自定义 Stream Collectors

自定义 Stream 的 Collectors

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126

import lombok.ToString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * Demo of a hand-written stream collector (supplier / accumulator / combiner)
 * that reproduces the behaviour of {@code Collectors.groupingBy}: devices are
 * grouped into lists keyed by their pid.
 *
 * <p>The identity hash codes printed along the way make it visible which map
 * and which element instances are handed to each callback.
 */
public class StreamCollectors {

    private static final Logger log = LoggerFactory.getLogger(StreamCollectors.class);

    /**
     * Builds four sample devices, groups them by pid with the custom
     * three-argument {@code collect} on a parallel stream, then prints the
     * result next to the one produced by {@code Collectors.groupingBy}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        List<Device> list = new ArrayList<>();
        list.add(newDevice("ss", "1"));
        list.add(newDevice("eeee", "2"));
        list.add(newDevice("eeee", "3"));
        list.add(newDevice("eeee", "4"));

        HashMap<String, List<Device>> collect = list.stream().parallel().collect(
                StreamCollectors::newGroupMap,
                StreamCollectors::accumulate,
                StreamCollectors::combine);
        System.out.println("result" + collect);

        System.out.println("--------官方功能实现-------");
        Map<String, List<Device>> collect1 = list.stream().collect(Collectors.groupingBy(Device::getPid));
        System.out.println(collect1);
    }

    /** Creates a device with the given name and pid and prints its identity hash code. */
    private static Device newDevice(String name, String pid) {
        Device e = new Device();
        e.setName(name);
        e.setPid(pid);
        System.out.println(e.getPid() + "--hashcode = " + System.identityHashCode(e));
        return e;
    }

    /** Supplier: creates a fresh, empty result container and prints its identity hash code. */
    private static HashMap<String, List<Device>> newGroupMap() {
        // Random tag printed next to the hash code, purely to tell the log lines apart.
        int i = new Random().nextInt(10000);
        HashMap<String, List<Device>> collect = new HashMap<>();
        System.out.println("初始对象map的hashcode = " + System.identityHashCode(collect) + "--------" + i);
        return collect;
    }

    /** Accumulator: appends one device to the list stored under its pid, creating the list on first use. */
    private static void accumulate(Map<String, List<Device>> accept, Device provider) {
        // The hash codes show that the map created by the supplier and the
        // original element instances are the ones passed in here.
        System.out.println("当前map的hashcode = " + System.identityHashCode(accept));
        System.out.println("当前对象的hashcode = " + System.identityHashCode(provider));
        accept.computeIfAbsent(provider.getPid(), pid -> new ArrayList<>()).add(provider);
    }

    /**
     * Combiner: folds the partial result {@code provider} into {@code accept}.
     *
     * <p>Lists are merged per key rather than copied with {@code putAll}:
     * {@code putAll} would replace the list already held by {@code accept}
     * whenever both partial maps contain the same pid, dropping devices.
     */
    private static void combine(Map<String, List<Device>> accept, Map<String, List<Device>> provider) {
        // The first argument is the container that keeps accumulating the merged result.
        log.info("只有在实现 parallel() 的时候才会调用");
        log.info("第一个参数是累加器");
        // Same random tag on all four lines so one combiner call can be told from another.
        int i = new Random().nextInt(10000);
        System.out.println("accept = " + accept + "--------" + i);
        System.out.println("provider = " + provider + "--------" + i);
        System.out.println("System.identityHashCode(accept) = " + System.identityHashCode(accept) + "--------" + i);
        System.out.println("System.identityHashCode(provider) = " + System.identityHashCode(provider) + "--------" + i);
        // Append right-hand groups after left-hand ones so encounter order is preserved.
        provider.forEach((pid, devices) ->
                accept.merge(pid, devices, (existing, incoming) -> {
                    existing.addAll(incoming);
                    return existing;
                }));
    }

}

/**
 * Mutable bean holding a device's pid and name; the pid is the key that
 * {@code StreamCollectors} groups by. {@code toString()} is generated by
 * Lombok's {@code @ToString}.
 */
@ToString
class Device {
    private String pid;
    private String name;

    // ---- pid ----

    /** Sets the pid. */
    public void setPid(final String pid) {
        this.pid = pid;
    }

    /** Returns the pid, or {@code null} if it has not been set. */
    public String getPid() {
        return this.pid;
    }

    // ---- name ----

    /** Sets the name. */
    public void setName(final String name) {
        this.name = name;
    }

    /** Returns the name, or {@code null} if it has not been set. */
    public String getName() {
        return this.name;
    }
}