Java Fork/Join框架

  分而治之,一直是一个非常有效的处理大量数据的的方法。著名的MapReduce也是采取了分而治之的思想。简单来说,就是如果要处理10000个数据,用单线程一个一个遍历处理数据,效率肯定是很低的。但如果这1万个数据,分成10分,交给10个线程并行处理(线程数取决于CPU数量),这样就可以大大的提高处理效率。Fork/Join正式这样一个专为并行计算的框架。

Fork/Join框架简介

  Fork就是把一个大任务切分成若干个子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。
Fork/Join执行逻辑
  在JDK中,提供了ForkJoinPool的线程池,对于fork()方法并不是立即就开启线程,而是提交给ForkJoinPool线程池进行处理。由于线程池的优化,提交的任务和线程数并不是一对一的关系。在绝大多数情况下,一个物理线程实际上要处理多个逻辑任务。因此,每个线程必然需要拥有一个任务队列。在实际执行过程中,可能遇到这么一种情况:线程A已经把自己的任务都执行完成了,而线程B还有一堆任务等着处理,此时,线程A就会“帮助”线程B,从线程B的任务队列中拿一个任务过来处理,尽可能的达到平衡。
  与普通的线程池不同,使用Join操作让一个主任务等待它锁创建的子任务的完成,执行这个任务的线程称之为工作者线程。工作者线程寻找其他仍未被执行的任务,然后开始执行。通过这种方式,这些线程在运行时拥有所有的优点,进而提升应用程序的性能。
  为了达到这个目标,通过Fork/Join框架执行的任务有以下限制

  • 任务只能使用fork()和join()操作当作同步机制。如果使用其他的同步机制,工作者线程就不能执行其他任务,当然这些任务是在同步操作里时。比如,如果在Fork/Join框架中将一个任务休眠,正在穿行这个任务的工作者线程在休眠期内不能执行另一个任务。
  • 任务不能执行I/O操作,比如文件数据的读取和写入。
  • 任务不能抛出非运行时异常,必须在代码中晒出掉这些异常。

Fork/Join框架组成

Fork/Join框架的核心由下列两个类组成。

  • ForkJoinPool:这个类实现了ExecutorService接口和工作窃取算法。它管理工作者线程,并提供任务的状态信息,以及任务的执行信息
  • ForkJoinTask:这个类是一个将在ForkJoinPool中执行的任务的基类。

通常,为了实现Fork/Join任务,需要实现一个以下两个类之一的子类。

  • RecursiveAction:用于任务没有返回结果的场景。
  • RecursiveTask:用户任务有返回结果的场景。

一个Fork/Join示例

  使用Fork/Join框架对1到100的数列求和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Integer>{
private static final long serialVersionUID = 1L;
private int start;
private int end;
private int [] data;
public CountTask(int start,int end,int [] data) {
this.start = start;
this.end = end;
this.data = data;
}
@Override
protected Integer compute() {
int gap = end - start;
if (gap == 1) {
return data[start] + data[end];
} else if (gap == 0) {
return data[start];
} else if (gap > 1) {
int mid = (start + end)/2;
CountTask leftTask = new CountTask(start, mid, data);
CountTask rightTask = new CountTask(mid + 1 , end, data);
leftTask.fork();
rightTask.fork();
int leftResult = leftTask.join();
int rightResult = rightTask.join();
return leftResult + rightResult;
}
return null;
}
}

  测试类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import org.junit.Test;
public class FokrJoinTest {
@Test
public void testCount() throws Exception {
ForkJoinPool pool = new ForkJoinPool(4);
int [] data = new int[100];
for (int i = 0; i < 100; i++) {
data[i] = i+1;
}
CountTask task = new CountTask(0, 99, data);
Future<Integer> result = pool.submit(task);
System.out.println(result.get());
}
}

  把数组分成左右两个子数组,左边的子数组执行fork()操作,右边也执行fork()操作,最后通过join()方法获取两个子数组的计算结果,加起来就是最终结果。

参考资料

  • 《实战Java高并发程序设计》
  • 《Java7并发编程实战手册》
  • 《Java程序员修炼之道》