ForkJoinPool Example

The previous post briefly introduced how ForkJoinPool differs from ordinary threads.
This post walks through several ForkJoinPool usage examples.

Custom ForkJoinPool. This example behaves like invokeAll with Callable tasks: the caller blocks until the result is available.

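import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

// Assumes an enclosing method that declares `throws Exception`, because get() throws checked exceptions.
ConcurrentMap<String, String> map1 = new ConcurrentHashMap<>();
ForkJoinPool customThreadPool = new ForkJoinPool(10);

// Submitting the parallel stream as a task makes it run on this pool's workers
// instead of the common pool (observed behavior, not a documented guarantee).
int[] values1 = customThreadPool.submit(() -> IntStream.range(1, 1000000).parallel().filter(x -> {
    // Record which thread handled this element.
    map1.put(Thread.currentThread().toString(), String.valueOf(x));
    return (x % 2 == 1);
}).toArray()).get();

System.out.println(map1.keySet());
System.out.println("thread size:" + map1.keySet().size() + ", array size:" + values1.length);
customThreadPool.shutdown();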

Output:
[Thread[ForkJoinPool-1-worker-10,5,main], Thread[ForkJoinPool-1-worker-15,5,main], Thread[ForkJoinPool-1-worker-11,5,main], Thread[ForkJoinPool-1-worker-8,5,main], Thread[ForkJoinPool-1-worker-2,5,main], Thread[ForkJoinPool-1-worker-9,5,main], Thread[ForkJoinPool-1-worker-4,5,main], Thread[ForkJoinPool-1-worker-1,5,main], Thread[ForkJoinPool-1-worker-6,5,main], Thread[ForkJoinPool-1-worker-13,5,main]]
thread size:10, array size:500000
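
For comparison, here is a minimal sketch of what the invokeAll form of the same job might look like. The class name InvokeAllComparison, the method countOddsWithInvokeAll, and the chunk size are all hypothetical choices made for illustration; the original post does not show this version. The range is split by hand into Callable tasks, and invokeAll blocks until every task has finished, which is the behavior the example above is being compared to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

class InvokeAllComparison {
    static final int LIMIT = 1_000_000; // same exclusive upper bound as the stream example
    static final int CHUNK = 100_000;   // hypothetical chunk size, chosen for illustration

    // Counts the odd numbers in [1, LIMIT) with one Callable per chunk.
    static int countOddsWithInvokeAll(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int start = 1; start < LIMIT; start += CHUNK) {
                final int from = start;
                final int to = Math.min(start + CHUNK, LIMIT);
                tasks.add(() -> {
                    int odds = 0;
                    for (int x = from; x < to; x++) {
                        if (x % 2 == 1) {
                            odds++;
                        }
                    }
                    return odds;
                });
            }
            int total = 0;
            // invokeAll returns only after every Callable has completed.
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("odd count:" + countOddsWithInvokeAll(10)); // prints odd count:500000
    }
}
```

The stream version lets the library do the splitting, while this version has to pick chunk boundaries itself. Both hand the caller a complete result before moving on.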

Shared (common) ForkJoinPool. This example also behaves like invokeAll with Callable tasks.

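import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.IntStream;

ConcurrentMap<String, String> map2 = new ConcurrentHashMap<>();

// No pool is specified, so the parallel stream runs on the common pool plus the calling thread.
int[] values2 = IntStream.range(1, 1000000).parallel().filter(x -> {
    map2.put(Thread.currentThread().toString(), String.valueOf(x));
    return (x % 2 == 1);
}).toArray();

System.out.println(map2.keySet());
System.out.println("thread size:" + map2.keySet().size() + ", array size:" + values2.length);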

Output:
[Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[main,5,main], Thread[ForkJoinPool.commonPool-worker-1,5,main], Thread[ForkJoinPool.commonPool-worker-3,5,main]]
thread size:4, array size:500000
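
The thread count of 4 above (three common-pool workers plus main) is consistent with the common pool's default parallelism of one less than the number of available processors, with the calling thread joining in. The post does not state the hardware, so that is only a likely explanation. The following sketch prints the relevant numbers on whatever machine runs it; the class name CommonPoolThreads and the method threadsUsed are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CommonPoolThreads {

    // Returns the names of the threads that processed at least one element.
    static Set<String> threadsUsed() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(1, 1_000_000).parallel()
                 .forEach(x -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Values depend on the machine, so no expected output is given here.
        System.out.println("available processors:" + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism:" + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("threads used:" + threadsUsed().size());
    }
}
```

The common pool's parallelism can also be changed at JVM startup with the system property java.util.concurrent.ForkJoinPool.common.parallelism.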

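Shared (common) ForkJoinPool. This example behaves like execute with Runnable tasks in that nothing is returned, although forEach still blocks the calling thread until every element has been processed.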

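import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.IntStream;

ConcurrentMap<String, String> map3 = new ConcurrentHashMap<>();

// forEach produces no result; it only runs the side effect for each element.
IntStream.range(1, 1000000).parallel().forEach(x -> {
    map3.put(Thread.currentThread().toString(), String.valueOf(x));
});

System.out.println(map3.keySet());
System.out.println("thread size:" + map3.keySet().size());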

Output:
[Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[main,5,main], Thread[ForkJoinPool.commonPool-worker-1,5,main], Thread[ForkJoinPool.commonPool-worker-3,5,main]]
thread size:4
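
One way to convince yourself that a parallel forEach on the common pool has finished all of its work by the time it returns is to count the processed elements. This is a small sketch with hypothetical names (ForEachBlocks, processAll); it uses a LongAdder so that concurrent increments from several threads are safe.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

class ForEachBlocks {

    // Runs a parallel forEach and returns how many elements were processed.
    static long processAll() {
        LongAdder processed = new LongAdder();
        IntStream.range(1, 1_000_000).parallel().forEach(x -> processed.increment());
        // forEach has returned, so every element has already been handled.
        return processed.sum();
    }

    public static void main(String[] args) {
        System.out.println("processed:" + processAll()); // prints processed:999999
    }
}
```

No sleep or explicit wait is needed here, because the calling thread itself takes part in the work and only continues once the terminal operation completes.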

Custom ForkJoinPool. This example behaves like execute with a Runnable: the task is handed to the pool and the caller does not wait for it.

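import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

// Assumes an enclosing method that declares `throws InterruptedException`, because of Thread.sleep.
ForkJoinPool customThreadPool2 = new ForkJoinPool(10);
ConcurrentMap<String, String> map4 = new ConcurrentHashMap<>();

// execute() returns immediately; the stream runs asynchronously on the pool's workers.
customThreadPool2.execute(() -> {
    IntStream.range(1, 1000000).parallel().forEach(x -> {
        map4.put(Thread.currentThread().toString(), String.valueOf(x));
    });
});

// ForkJoinPool workers are daemon threads, so a short test program can exit
// before the task finishes. Wait 3 seconds to give it time to complete.
Thread.sleep(3000);
System.out.println(map4.keySet());
System.out.println("thread size:" + map4.keySet().size());
customThreadPool2.shutdown();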

Output:
[Thread[ForkJoinPool-1-worker-10,5,main], Thread[ForkJoinPool-1-worker-15,5,main], Thread[ForkJoinPool-1-worker-11,5,main], Thread[ForkJoinPool-1-worker-2,5,main], Thread[ForkJoinPool-1-worker-9,5,main], Thread[ForkJoinPool-1-worker-4,5,main], Thread[ForkJoinPool-1-worker-6,5,main], Thread[ForkJoinPool-1-worker-13,5,main]]
thread size:8
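
A fixed 3-second sleep is only a guess at how long the task takes. As a hedged alternative, the sketch below calls shutdown() and then awaitTermination(), which returns as soon as the already-submitted work is done or the timeout expires. The class name AwaitInsteadOfSleep, the method runAndWait, and the one-minute timeout are hypothetical choices and not part of the original example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

class AwaitInsteadOfSleep {

    // Submits the stream with execute(), then waits for the pool to finish
    // instead of sleeping. Returns the number of processed elements.
    static long runAndWait(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        LongAdder processed = new LongAdder();

        pool.execute(() ->
            IntStream.range(1, 1_000_000).parallel().forEach(x -> processed.increment()));

        // No new tasks are accepted after shutdown(), but the submitted one still runs.
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("pool did not terminate within the timeout");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.sum();
    }

    public static void main(String[] args) {
        System.out.println("processed:" + runAndWait(10)); // prints processed:999999
    }
}
```

If the pool needs to stay open for later tasks, submit() followed by get() or join() on the returned task is another option, at the cost of no longer being a pure fire-and-forget call.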