Notes
  • Introduce
  • Go
    • Grammar
      • Basic
      • Goroutines & Channels
      • Test
    • System Library
      • Module
      • sync
      • context
      • net
    • Concurrency in Go
    • The Go Memory Model
    • Code Snippet
  • Rust
    • The Rust Programming Language
    • Rust by Example
  • JAVA
    • Preface
    • Grammar
      • Basic
      • Data Types
      • Operator
      • Exceptions
    • Class Libraries
      • Collection
      • Stream
      • IO
      • NIO
      • RMI
    • Concurrency
      • Preface
      • JMM
      • Synchronized & CAS
      • Deadlock
      • Thread
      • Lock & Condition
      • Utility Class
      • Thread-safe Collection
      • Atomic Class
      • Fork/Join
      • Concurrency Design Patterns
        • Immutable
        • Copy-on-Write
        • ThreadLocal
        • Multitheading If
        • Division
    • JVM
      • Class & Instance Initialization
      • Runtime Data Area
      • Garbage Collection
    • Web Container
      • Tomcat Architecture
      • Jetty Architecture
    • Spring
    • Tuning
      • Programming
  • Computer Science
    • Computer Organization
    • Algorithm
      • Complexity
      • Linear List
      • Sort
      • Binary Search
      • Skip List
      • Hash Table
      • Tree
      • Graph
      • String Matching
      • Bloom Filter
      • Greedy Algorithm
      • Divide and Conquer
      • Back Tracking
      • Dynamic Programming
    • Network Protocol
      • Pysical Layer
      • Data Link Layer
      • Network Layer
      • Transport Layer
      • Application layer
      • HTTP
      • HTTP/2 in Action
    • Operating System
      • Basic
      • System Initialization
      • Diagnostic Tools
      • CPU Diagnosis
      • Memory Diagnosis
      • Disk Diagnosis
      • Network Diagnosis
      • Monitor System
    • Design Patterns
      • UML
      • OOP
      • Principle
      • Refactoring & Specification
      • Creational
        • Singleton
        • Factory
        • Builder
        • Prototype
      • Structural
        • Proxy
        • Bridge
        • Decorator
        • Adapter
        • Facade
        • Composite
        • FlyWeight
      • Behavioral
        • Observer
        • Template Method
        • Strategy
        • State
        • Iterator
        • Chain of Responsibility
    • Distributed System
      • Protocol & Algorithm
      • Transcation
      • Theory
      • Resource Management
      • Scheduling
      • Computing
      • Message Queue
      • Cache
      • Consistent Hashing
  • database
    • InfluxDB
      • In-Memory Index
      • Meta
    • MySQL
      • SQL
      • Architecture
      • Log
      • Transaction
      • Indexing
      • Lock
      • Storage
    • Redis
    • Elasticsearch
      • Local Debug
    • HBase
    • Kafka
    • ZooKeeper
  • Reading
    • RocketMQ
    • 演说之禅
    • So Good They Can't Ignore You
    • 学会提问
    • Lecture
  • Other
    • v2ray
    • Kubernetes
    • Git
    • Maven
    • Anaconda And Conda
    • Fuck! Shit!
      • Remove Final by Reflection
      • Ingress Host
      • ExecuterService submit
  • Open source contribution
Powered by GitBook
On this page
  • 分治任务
  • Fork/Join 框架
  • ForkJoinTask
  • ForkJoinPool
  • 案例
  • 斐波那契数列

Was this helpful?

  1. JAVA
  2. Concurrency

Fork/Join

PreviousAtomic ClassNextConcurrency Design Patterns

Last updated 5 years ago

Was this helpful?

分治任务

  • + :可以解决简单的并行任务。

  • :可以解决有聚合关系的任务,AND 聚合或 OR 聚合。

  • :可以解决批量的并行任务。

上述三种情况可以覆盖日常生活中的大多数并发场景,但是还有一种任务模型,分治。

分治指的是把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,知道子问题简单到可以直接求解。比如归并排序、快速排序、二分查找、MapReduce。

分治任务模型有两个阶段:

  • 任务分解:任何和分解后的子任务具有相似性,体现在解决算法是相同的,只是数据规模不同。一般采用递归算法。

  • 结果合并。

Java 并发包提供了 Fork/Join 的并行框架来解决分治的任务模型。

Fork/Join 框架

Fork 对应任务的分解,Join 对应结果的合并。这个框架主要有两部分:

  • ForkJoinPool:工作的线程池。

  • ForkJoinTask:分治任务。

两者的关系可以类比为 ThreadPoolExecutor 和 Runnable。

ForkJoinTask

下面部分源码(Java 1.8):

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
}

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    protected abstract V compute();
}

public abstract class RecursiveAction extends ForkJoinTask<Void> {
    protected abstract void compute();
}
  • fork() 方法会异步地执行一个子任务。

  • join() 方法会阻塞当前线程来等待子任务的执行结果。

  • 两个子类都是抽象类,在使用的时候需要实现 compute() 方法。

  • 根据子类的名称就可看出使用递归的方式来处理任务。

ForkJoinPool

ForkJoinPool 与 ThreadPoolExecutor 一样,本质上是一个生产者-消费者模型,只是更加智能。ThreadPoolExecutor 只有一个任务队列,而 ForkJoinPool 有多个任务队列。使用 invoke() 或 submit() 方法时,会根据一定的路由规则把任务给到其中的一个队列。任务在执行过程中创建的子任务会提交到工作线程对应的任务队列。

任务窃取:如果工作线程空闲了,那么它会窃取其它工作任务队列里面的任务。任务队列采用双端队列,工作线程正常获取任务和窃取任务是从不同的端拿的,以减少数据竞争。

Java 1.8 中的 parallelStream 也是以 ForkJoinPool 为基础的,默认情况下所有的并行流都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 线程数是 CPU 的核数。如果并行流是计算密集型的话没什么问题;若是 IO 密集型的话,很容易拖慢整个系统的性能。建议用不同的 ForkJoinPool 执行不同的计算任务。

案例

斐波那契数列

public static void main(String[] args) {
    System.out.println(new ForkJoinPool(4).invoke(new FibonacciTask(30)));
}

private static final class FibonacciTask extends RecursiveTask<Integer> {
    private final int n;
    private FibonacciTask(int n) {
        this.n = n;
    }
    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        }
        return new FibonacciTask(n - 1).compute() + new FibonacciTask(n - 2).fork().join();
    }
}
从上到下依次为简单并行任务、聚合任务、批量并行任务
ForkJoinTask 类继承图
ForkJoinPool 工作原理
线程池
Future
CompletableFuture
CompletionService