Division

并发编程领域的核心问题可分为:分工、同步、互斥。同步和互斥问题多源自微观,而分工往往来自宏观。

解决分工问题也有一系列的设计模式,常用的有 Thread-Per-Message、Work Thread、生产者-消费者。

Thread-Per-Message

简单来说,就是为每个任务分配一个独立的线程去处理。

比如一个 Http Server,在主线程中只接受请求,处理请求通过创建一个子线程处理,不然整个服务同一时间只能处理一个请求。

Thread 实现

Thread-Per-Message 在网络编程的服务端使用非常广泛,最简单的服务端就是 echo 服务,即客户端发送什么,服务端就响应相同的内容。

public static void main(String[] args) throws IOException {
    try (ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080))) {
        try {
            while (true) {
                SocketChannel sc = ssc.accept();
                new Thread(() -> {
                    try {
                        ByteBuffer rb = ByteBuffer.allocate(2048);
                        sc.read(rb);
                        rb.flip();
                        sc.write(rb);
                        sc.close();
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }).start();
            }
        } finally {
            ssc.close();
        }
    }
}

Fiber 实现

上面 Thread 实现有一个非常严重的问题,原因是 Java 的线程是很重量的,Java 的线程对应操作系统的线程,优点是技术很成熟,缺点是创建成本高。

所以 Thread-Per-Message 这种设计模式对 Java 语言确实不友好,但是其它语言就不一样了,比如 Go、Lua 有轻量级线程的概念,Kotlin 也有 coroutine。

Java 也有一个 Loom 项目,解决轻量级线程的问题,在 Loom 项目中,轻量级线程叫做 Fiber。

public static void main(String[] args) throws IOException {
    try (ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080))) {
        try {
            while (true) {
                SocketChannel sc = ssc.accept();
                Fiber.schedule(() -> {
                    try {
                        ByteBuffer rb = ByteBuffer.allocate(2048);
                        sc.read(rb);
                        rb.flip();
                        sc.write(rb);
                        sc.close();
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
            }
        } finally {
            ssc.close();
        }
    }
}

Java 还有一个开源的协程库叫 Quasar

Work Thread

上一节的 Thread-Per-Message 模式不适合高并发的场景,原因在于频繁的创建、销毁现场非常消耗性能。可以使用 Work-Thread 模式来避免上述问题,原理就是用阻塞队列做任务池,创建固定数量的线程来消费任务。可以看出,这就是常见的线程池的模式。

ExecutorService es = Executors.newFixedThreadPool(500);
try (ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080))) {
    try {
        while (true) {
            SocketChannel sc = ssc.accept();
            es.execute(() -> {
                try {
                    ByteBuffer rb = ByteBuffer.allocate(2048);
                    sc.read(rb);
                    rb.flip();
                    sc.write(rb);
                    sc.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    } finally {
        ssc.close();
        es.shutdown();
    }
}

创建线程池的建议

  • 使用有界队列来接受任务。

  • 清晰地指明拒绝策略。

  • 给线程赋予一个业务相关的名字。

避免线程死锁

线程池有一种线程死锁的场景:如果提交到线程池的任务有依赖关系,那么有可能导致线程死锁。如下例子:

ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(() -> {
    try {
        String qq = es.submit(() -> "QQ").get();
        System.out.println(qq);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
});

下面例子,线程全部阻塞在 l2.await():

ExecutorService es = Executors.newFixedThreadPool(2);
CountDownLatch l1 = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    es.execute(() -> {
        CountDownLatch l2 = new CountDownLatch(2);
        for (int j = 0; j < 2; j++) {
            es.execute(l2::countDown);
        }
        try {
            l2.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        l1.countDown();
    });
}
l1.await();
es.shutdown();

解决办法:

  1. 增大线程池数量,但是无法确定任务数量或任务数量很多,此方法不可用。

  2. 为不同的任务创建各自的线程池。

提交到线程池的任务一定要相互独立!

生产者-消费者

原理:一组生产者线程负责生产任务,把任务放入任务队列,一组消费者线程从任务队列中获取任务执行。

Java 的线程池即是生产者-消费者模式。

优点:

  • 解耦:生产者和消费者没有依赖关系。

  • 异步:生产者无需等待任务执行完成。

  • 平衡生产者和消费者的速度:如果生产者和消费者的速度是 1:3,则消费者只需要 1/3 的线程数量,减少开销。

  • 支持批量执行:比如生产者生产的每个任务都是往数据库中插入一条数据,那么消费者可以一次性从任务队列中取出多个任务,然后组合成一个批量插入的 SQL 语句,这样就大量减少了对数据库的写入操作。

private BlockingQueue<Task> bq = new LinkedBlockingQueue<>(2000);

public void start() {
    ExecutorService es = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 5; i++) {
        es.execute(() -> {
            while (true) {
                try {
                    List<Task> tasks = pollTasks();
                    // 执行任务
                } catch (InterruptedException e) { }
            }
        });
    }
}

private List<Task> pollTasks() throws InterruptedException {
    List<Task> ts = new LinkedList<>();
    Task t = bq.take();
    while (t != null) {
        ts.add(t);
        t = bq.poll();
    }
    return ts;
}

Last updated