Notes
  • Introduce
  • Go
    • Grammar
      • Basic
      • Goroutines & Channels
      • Test
    • System Library
      • Module
      • sync
      • context
      • net
    • Concurrency in Go
    • The Go Memory Model
    • Code Snippet
  • Rust
    • The Rust Programming Language
    • Rust by Example
  • JAVA
    • Preface
    • Grammar
      • Basic
      • Data Types
      • Operator
      • Exceptions
    • Class Libraries
      • Collection
      • Stream
      • IO
      • NIO
      • RMI
    • Concurrency
      • Preface
      • JMM
      • Synchronized & CAS
      • Deadlock
      • Thread
      • Lock & Condition
      • Utility Class
      • Thread-safe Collection
      • Atomic Class
      • Fork/Join
      • Concurrency Design Patterns
        • Immutable
        • Copy-on-Write
        • ThreadLocal
        • Multitheading If
        • Division
    • JVM
      • Class & Instance Initialization
      • Runtime Data Area
      • Garbage Collection
    • Web Container
      • Tomcat Architecture
      • Jetty Architecture
    • Spring
    • Tuning
      • Programming
  • Computer Science
    • Computer Organization
    • Algorithm
      • Complexity
      • Linear List
      • Sort
      • Binary Search
      • Skip List
      • Hash Table
      • Tree
      • Graph
      • String Matching
      • Bloom Filter
      • Greedy Algorithm
      • Divide and Conquer
      • Back Tracking
      • Dynamic Programming
    • Network Protocol
      • Pysical Layer
      • Data Link Layer
      • Network Layer
      • Transport Layer
      • Application layer
      • HTTP
      • HTTP/2 in Action
    • Operating System
      • Basic
      • System Initialization
      • Diagnostic Tools
      • CPU Diagnosis
      • Memory Diagnosis
      • Disk Diagnosis
      • Network Diagnosis
      • Monitor System
    • Design Patterns
      • UML
      • OOP
      • Principle
      • Refactoring & Specification
      • Creational
        • Singleton
        • Factory
        • Builder
        • Prototype
      • Structural
        • Proxy
        • Bridge
        • Decorator
        • Adapter
        • Facade
        • Composite
        • FlyWeight
      • Behavioral
        • Observer
        • Template Method
        • Strategy
        • State
        • Iterator
        • Chain of Responsibility
    • Distributed System
      • Protocol & Algorithm
      • Transcation
      • Theory
      • Resource Management
      • Scheduling
      • Computing
      • Message Queue
      • Cache
      • Consistent Hashing
  • database
    • InfluxDB
      • In-Memory Index
      • Meta
    • MySQL
      • SQL
      • Architecture
      • Log
      • Transaction
      • Indexing
      • Lock
      • Storage
    • Redis
    • Elasticsearch
      • Local Debug
    • HBase
    • Kafka
    • ZooKeeper
  • Reading
    • RocketMQ
    • 演说之禅
    • So Good They Can't Ignore You
    • 学会提问
    • Lecture
  • Other
    • v2ray
    • Kubernetes
    • Git
    • Maven
    • Anaconda And Conda
    • Fuck! Shit!
      • Remove Final by Reflection
      • Ingress Host
      • ExecuterService submit
  • Open source contribution
Powered by GitBook
On this page
  • Guarded Suspension
  • 常规的 Guarded Suspension 模式
  • 扩展 Guarded Suspension 模式
  • Balking

Was this helpful?

  1. JAVA
  2. Concurrency
  3. Concurrency Design Patterns

Multitheading If

PreviousThreadLocalNextDivision

Last updated 5 years ago

Was this helpful?

Guarded Suspension

为了效率,现在异步调用使用越来越广泛。异步调用需要解决两个问题:

  1. 线程 T1 发送异步请求后,怎么等待结果返回?

  2. 多个线程发送异步请求,多个结果返回后,怎么对应到原来的请求?

常规的 Guarded Suspension 模式

对于问题 1,假设服务 A 异步调用服务 B,服务 A 发送请求的线程是 T1,但是接受服务 B 返回结果的线程是 T2,那么 T1 怎么等待返回结果呢?

使用 Guarded Suspension 模式可以解决上述问题,使用一个 GuardedObject 对象,内部有一个成员变量,成员方法 get(Predicate<T> p) 中的 p 就是 T1 用来检验结果是否已经返回,成员方法 onChanged() 就是 T2 获取到返回结果后调用。

public final class GuardedObject<T> {
    private String obj;

    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();

    private static final int TIMEOUT = 1;

    public String get(Predicate<String> p) {
        lock.lock();
        try {
            while (!p.test(obj)) {
                done.await(TIMEOUT, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException();
        } finally {
            lock.unlock();
        }
        return obj;
    }

    public void onChanged(String obj) {
        lock.lock();
        try {
            this.obj = obj;
            done.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

/**
 * 服务 A 发送请求给服务 B,然后等待请求返回
 */
public final class Sender {
    public void send() {
        String req = "request";
        System.out.println("Service A send request to service B: " + req);

        GuardedObject go = new GuardedObject<>();
        go.get(Objects::nonNull);
    }
}

/**
 * 服务 A 另外一个线程收到服务 B 的返回后,把结果反映到 GuardedObject
 */
public final class Receiver {
    public void handle(String resp) {
        GuardedObject go = ???
        go.onChanged(resp);
    }
}

扩展 Guarded Suspension 模式

Guarded Suspension 模式有一个问题,由于 Sender 会发送多条个请求,在 Receiver 中,怎么对应上 Sender 中的 GuardedObject 呢?我们可以维护一个请求 Id 与 GuardedObject 的关系表,即扩展的 Guarded Suspension 实现。

public final class GuardedObject {
    private String obj;

    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();

    private static final int TIMEOUT = 1;

    private final static Map<String, GuardedObject> gos = new ConcurrentHashMap<>();

    public static GuardedObject create(String key) {
        GuardedObject go = new GuardedObject();
        gos.put(key, go);
        return go;
    }

    public static  void handle(String key, String obj) {
        GuardedObject go = gos.remove(key);
        if (go != null) {
            go.onChanged(obj);
        }
    }

    public String get(Predicate<String> p) {
        lock.lock();
        try {
            while (!p.test(obj)) {
                done.await(TIMEOUT, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException();
        } finally {
            lock.unlock();
        }
        return obj;
    }

    private void onChanged(String obj) {
        lock.lock();
        try {
            this.obj = obj;
            done.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

/**
 * 服务 A 发送请求给服务 B,然后等待请求返回
 */
public final class Sender {
    public void send() {
        String req = "request", id ="id";
        System.out.println("Service A send request to service B: " + req);

        GuardedObject go = GuardedObject.create(id);
        go.get(Objects::nonNull);
    }
}

/**
 * 服务 A 另外一个线程收到服务 B 的返回后,把结果反映到 GuardedObject
 */
public final class Receiver {
    public void handle(String id, String resp) {
        GuardedObject.handle(id, resp);
    }
}

Balking

Guarded Suspension 模式本质是有一个状态变量,一个线程会判断这个状态变量是否已经满足要求,若没有满足,则等待直到满足再进行接下来的逻辑。

Balking 模式的本质有点相似,同样有一个状态变量,一个线程会判断这个状态变量是否已经满足要求,若没有满足,则直接返回;若满足,则执行接下来的逻辑。与 Guarded Suspension 模式的差别就在于状态变量没有满足要求时,是直接返回还是等待直到满足要求。

所以 Guarded Suspension 模式通常也被称为“多线程版本的 if”,只是这个 if 是需要等待的,而且很执着。Balking 也是多线程版本的 if,这个 if 不需要等待。

以下是一个自动存盘的逻辑,采用了 Balking 模式:

public final class AutoSave {
    private boolean changed = false;

    private ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();

    private void startAutoSave() {
        ses.scheduleWithFixedDelay(this::autoSave, 5, 5, TimeUnit.SECONDS);
    }

    private void autoSave() {
        synchronized (this) {
            if (!changed) {
                return;
            }
            changed = false;
        }

        execSave();
    }

    private void execSave() {
        System.out.println("save");
    }

    public void edit() {
        System.out.println("edit");

        change();
    }

    private void change() {
        synchronized (this) {
            changed = true;
        }
    }
}

本质也是采用了 Balking 模式。

单例模式的双重检测方式