Notes
Search…
⌃K

Fork/Join

分治任务

从上到下依次为简单并行任务、聚合任务、批量并行任务
上述三种情况可以覆盖日常生活中的大多数并发场景,但是还有一种任务模型,分治
分治指的是把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,知道子问题简单到可以直接求解。比如归并排序、快速排序、二分查找、MapReduce。
分治任务模型有两个阶段:
  • 任务分解:任何和分解后的子任务具有相似性,体现在解决算法是相同的,只是数据规模不同。一般采用递归算法。
  • 结果合并
Java 并发包提供了 Fork/Join 的并行框架来解决分治的任务模型。

Fork/Join 框架

Fork 对应任务的分解,Join 对应结果的合并。这个框架主要有两部分:
  • ForkJoinPool:工作的线程池。
  • ForkJoinTask:分治任务。
两者的关系可以类比为 ThreadPoolExecutor 和 Runnable。

ForkJoinTask

ForkJoinTask 类继承图
下面部分源码(Java 1.8):
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
}
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
protected abstract V compute();
}
public abstract class RecursiveAction extends ForkJoinTask<Void> {
protected abstract void compute();
}
  • fork() 方法会异步地执行一个子任务。
  • join() 方法会阻塞当前线程来等待子任务的执行结果。
  • 两个子类都是抽象类,在使用的时候需要实现 compute() 方法。
  • 根据子类的名称就可看出使用递归的方式来处理任务。

ForkJoinPool

ForkJoinPool 与 ThreadPoolExecutor 一样,本质上是一个生产者-消费者模型,只是更加智能。ThreadPoolExecutor 只有一个任务队列,而 ForkJoinPool 有多个任务队列。使用 invoke() 或 submit() 方法时,会根据一定的路由规则把任务给到其中的一个队列。任务在执行过程中创建的子任务会提交到工作线程对应的任务队列。
任务窃取:如果工作线程空闲了,那么它会窃取其它工作任务队列里面的任务。任务队列采用双端队列,工作线程正常获取任务和窃取任务是从不同的端拿的,以减少数据竞争。
ForkJoinPool 工作原理
Java 1.8 中的 parallelStream 也是以 ForkJoinPool 为基础的,默认情况下所有的并行流都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 线程数是 CPU 的核数。如果并行流是计算密集型的话没什么问题;若是 IO 密集型的话,很容易拖慢整个系统的性能。建议用不同的 ForkJoinPool 执行不同的计算任务。

案例

斐波那契数列

public static void main(String[] args) {
System.out.println(new ForkJoinPool(4).invoke(new FibonacciTask(30)));
}
private static final class FibonacciTask extends RecursiveTask<Integer> {
private final int n;
private FibonacciTask(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
return new FibonacciTask(n - 1).compute() + new FibonacciTask(n - 2).fork().join();
}
}