# Division

并发编程领域的核心问题可分为：[分工、同步、互斥](/notes/java/concurrency/preface.md#fen-gong-tong-bu-hu-chi)。同步和互斥问题多源自微观，而分工往往来自宏观。

解决分工问题也有一系列的设计模式，常用的有 Thread-Per-Message、Work Thread、生产者-消费者。

## Thread-Per-Message

简单来说，就是为每个任务分配一个独立的线程去处理。

比如一个 Http Server，在主线程中只接受请求，处理请求通过创建一个子线程处理，不然整个服务同一时间只能处理一个请求。

### Thread 实现

Thread-Per-Message 在网络编程的服务端使用非常广泛，最简单的服务端就是 echo 服务，即客户端发送什么，服务端就响应相同的内容。

```java
public static void main(String[] args) throws IOException {
    try (ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080))) {
        try {
            while (true) {
                SocketChannel sc = ssc.accept();
                new Thread(() -> {
                    try {
                        ByteBuffer rb = ByteBuffer.allocate(2048);
                        sc.read(rb);
                        rb.flip();
                        sc.write(rb);
                        sc.close();
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }).start();
            }
        } finally {
            ssc.close();
        }
    }
}
```

### Fiber 实现

上面 Thread 实现有一个非常严重的问题，原因是 Java 的线程是很重量的，Java 的线程对应操作系统的线程，优点是技术很成熟，缺点是创建成本高。

所以 Thread-Per-Message 这种设计模式对 Java 语言确实不友好，但是其它语言就不一样了，比如 Go、Lua 有轻量级线程的概念，Kotlin 也有 coroutine。

Java 也有一个 Loom 项目，解决轻量级线程的问题，在 Loom 项目中，轻量级线程叫做 Fiber。

```java
public static void main(String[] args) throws IOException {
    try (ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080))) {
        try {
            while (true) {
                SocketChannel sc = ssc.accept();
                Fiber.schedule(() -> {
                    try {
                        ByteBuffer rb = ByteBuffer.allocate(2048);
                        sc.read(rb);
                        rb.flip();
                        sc.write(rb);
                        sc.close();
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
            }
        } finally {
            ssc.close();
        }
    }
}
```

Java 还有一个开源的协程库叫 [Quasar](https://github.com/puniverse/quasar)。

## Work Thread

上一节的 Thread-Per-Message 模式不适合高并发的场景，原因在于频繁的创建、销毁现场非常消耗性能。可以使用 **Work-Thread** 模式来避免上述问题，原理就是用阻塞队列做任务池，创建固定数量的线程来消费任务。可以看出，这就是常见的**线程池**的模式。

```java
ExecutorService es = Executors.newFixedThreadPool(500);
try (ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080))) {
    try {
        while (true) {
            SocketChannel sc = ssc.accept();
            es.execute(() -> {
                try {
                    ByteBuffer rb = ByteBuffer.allocate(2048);
                    sc.read(rb);
                    rb.flip();
                    sc.write(rb);
                    sc.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    } finally {
        ssc.close();
        es.shutdown();
    }
}
```

### 创建线程池的建议

* 使用**有界队列**来接受任务。
* 清晰地指明拒绝策略。
* 给线程赋予一个业务相关的名字。

### 避免线程死锁

线程池有一种线程死锁的场景：如果提交到线程池的任务有依赖关系，那么有可能导致线程死锁。如下例子：

```java
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(() -> {
    try {
        String qq = es.submit(() -> "QQ").get();
        System.out.println(qq);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
});
```

下面例子，线程全部阻塞在 l2.await()：

```java
ExecutorService es = Executors.newFixedThreadPool(2);
CountDownLatch l1 = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    es.execute(() -> {
        CountDownLatch l2 = new CountDownLatch(2);
        for (int j = 0; j < 2; j++) {
            es.execute(l2::countDown);
        }
        try {
            l2.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        l1.countDown();
    });
}
l1.await();
es.shutdown();
```

解决办法：

1. 增大线程池数量，但是无法确定任务数量或任务数量很多，此方法不可用。
2. 为不同的任务创建各自的线程池。

{% hint style="warning" %}
提交到线程池的任务一定要相互独立！
{% endhint %}

## 生产者-消费者

**原理**：一组**生产者线程**负责生产任务，把任务放入**任务队列**，一组**消费者线程**从任务队列中获取任务执行。

Java 的线程池即是生产者-消费者模式。

优点：

* **解耦**：生产者和消费者没有依赖关系。
* **异步**：生产者无需等待任务执行完成。
* **平衡生产者和消费者的速度**：如果生产者和消费者的速度是 1:3，则消费者只需要 1/3 的线程数量，减少开销。
* **支持批量执行**：比如生产者生产的每个任务都是往数据库中插入一条数据，那么消费者可以一次性从任务队列中取出多个任务，然后组合成一个批量插入的 SQL 语句，这样就大量减少了对数据库的写入操作。

```java
private BlockingQueue<Task> bq = new LinkedBlockingQueue<>(2000);

public void start() {
    ExecutorService es = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 5; i++) {
        es.execute(() -> {
            while (true) {
                try {
                    List<Task> tasks = pollTasks();
                    // 执行任务
                } catch (InterruptedException e) { }
            }
        });
    }
}

private List<Task> pollTasks() throws InterruptedException {
    List<Task> ts = new LinkedList<>();
    Task t = bq.take();
    while (t != null) {
        ts.add(t);
        t = bq.poll();
    }
    return ts;
}
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://yunzhao.gitbook.io/notes/java/concurrency/concurrency-design-patterns/division.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
