Utility Class
Semaphore
java.util.concurrent.Semaphore是对信号量模型的实现。
用于控制同时访问的线程个数。
底层基于 AQS,state 变量存储的是可用的剩余资源。
/**
* 5台机器,8个工人,一台机器只能被一个工人使用。
*/
public class Test {
public static void main(String[] args) {
int N = 8; //工人数
Semaphore semaphore = new Semaphore(5); //机器数目
for(int i=0;i<N;i++)
new Worker(i,semaphore).start();
}
static class Worker extends Thread{
private int num;
private Semaphore semaphore;
public Worker(int num,Semaphore semaphore){
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人"+this.num+"占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人"+this.num+"释放出机器");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}CountDownLatch
主要用来解决一个线程等待多个线程的场景。
计数器不能循环利用。当计数器减到0时,再调用 await 会直接通过。
底层用 AQS 实现,state 变量即为设置的 N。
CyclicBarrier
CyclicBarrier 是一组线程之间互相等待。
计数器可以循环利用,而且能够自动重置,一旦计数器减到0,会自动重置到你设置的初始值。
ThreadPoolExecutor
Java 创建线程需要调用操作系统内核 API,然后操作系统分配一系列资源,所以创建线程是一个重量级操作,应该避免频繁创建于销毁。所以需要线程池来解决上述问题。
构造函数
corePoolSize:最小线程数。
maximumPoolSize:最大线程数。
keepAliveTiTime & unit:若一个线程空闲了一段时间,并且线程数量大于 corePoolSize,那么空闲的线程就能被回收。
workQueue:阻塞的工作队列。
threadFactory:自定义如何创建线程,可以指定有意义的线程名字
handler:工作队列满了后,提交任务时的拒绝策略。
CallerRunsPolicy:提交任务的线程自己去执行。
AbortPolicy:默认,直接拒绝,抛出 RejectedExecutionException。
DiscardPolicy:直接丢弃。
DiscardOldestPolicy:丢弃最老的任务。
默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。
核心逻辑
ThreadPoolExecutor 有个内部类 Worker,轮询去任务队列拿任务执行。
submit
ThreadPoolExecutor 的 execute() 方法不能取得任务结果,但是 submit() 方法可以。
当使用 submit 方法时,若提交的任务抛出了异常,只能通过返回的 Future 获取异常。所以如果即使用 submit 方法,又没有保留返回的 Future,则必须保证提交的任务没有抛出异常,不然异常会被吃掉,不会打印出异常栈。
原因:使用 submit 后,会把任务包装成 FutureTask,在执行时会捕获异常,保存在 Future 里面,自然就不会再抛出异常了;而使用 execute 时,任务抛出的异常会直接再次抛出。
所以:若不需要返回结果,建议使用 execute 方法。
Executors
Java 并发包提供了 Executors 可以快速创建线程池。
目前大厂的编码规范中基本上都不建议使用 Executors 了。因为 Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue。
Future
Future 接口有如下方法:
FutureTask
FutureTask实现了Runnable和Future 接口。所以 FutureTask 可以很容易获取线程的执行结果。
以华罗庚的烧水泡茶为例,可以有很多方法解决,比如 Thread.join()、CountDownLatch 等。用 FutureTask 来解决的方法如下:

CompletableFuture
概念
异步化:是利用多线程优化性能这个核心方案得以实施的基础。
Java 8 提供了 CompletableFuture来支持异步编程,Java 9 提供了更加完备的 Flow API,ReactiveX 的 Java 实现是 RxJava,使得在 Java 6就能使用异步编程。
重新实现一遍上一节的烧水泡茶:

创建 CompletableFuture
主要使用下面四种静态方法。
可以指定线程池,若不指定,则使用公用的
ForkJoinPool线程池。ForkJoinPool 默认线程数为 CPU 核数,可以通过参数
-Djava.util.concurrent.ForkJoinPool.common.parallelism设置。若是 IO 较重的异步操作,建议使用自定义线程池。
runAsync 使用 Runnable 接口,没有返回值。
supplyAsync 使用 Supplier 接口,有返回值。
CompletionStage
CompletableFuture 实现了 CompletionStage 接口。CompletionStage 定义了工作流的关系,工作流的关系主要有:串行关系、并行关系、汇聚关系。汇聚关系又分为:
AND 汇聚:所有任务依赖都完成之后才开始执行当前任务。
OR 汇聚:依赖的任务只要有一个完成就可以执行当前任务。
CompletionStage 除了定义工作流的关系之外,还要能够处理异常。
串行关系
thenApply(Function):既能接受参数,也支持返回值。
thenAccept(Consumer):能接受参数,不支持返回值。
thenRun(Runnable):不能接受参数,也不支持返回值。
AND 汇聚关系
三者的差别与上文 Function、Consumer、Runnable 的差别一致。
OR 汇聚关系
三者的差别与上文 Function、Consumer、Runnable 的差别一致。
异常处理
exceptionally:相当于catch{}代码块。whenComplete:相当于finally{}代码块,无论异常是否发生都会执行,不支持返回结果。handleAsync:相当于finally{}代码块,支持返回结果,
CompletionService
实现原理:内部维护一个阻塞队列,任务执行结束就把执行结果加入到阻塞队列。
实现类是 ExecutorCompletionService:
CompletionService 有 5 个方法:
Last updated
Was this helpful?
