小言_互联网的博客

阿里P8级大佬详解并发编程里的设计模式之Guarded Suspension

254人阅读  评论(0)

假设有个文件浏览器系统,它依赖文件浏览服务,而文件浏览服务只支持MQ方式接入。
这样的接入方式,发消息和消费这俩操作之间就是异步的

  • MQ示意图

    用户通过浏览器发过来一个请求,会被转换成一个异步消息发送给MQ,等MQ返回结果后,再将这个结果返回至浏览器。
    给MQ发送消息的线程是处理Web请求的线程T1,但消费MQ结果的线程并不是线程T1,那线程T1如何等待MQ的返回结果呢?
class Message{
   
  String id;
  String content;
}
//该方法可以发送消息
void send(Message msg){
   
  //省略相关代码
}
//MQ消息返回后会调用该方法
//该方法的执行线程不同于
//发送消息的线程
void onMessage(Message msg){
   
  //省略相关代码
}
//处理浏览器发来的请求
Respond handleWebReq(){
   
  //创建一消息
  Message msg1 = new 
    Message("1","{...}");
  //发送消息
  send(msg1);
  //如何等待MQ返回的消息呢?
  String result = ...;
}

异步转同步问题。

Guarded Suspension模式

团建要外出聚餐,预订了一个包间,然后兴冲冲奔过去,到那儿后大堂经理看了一眼包间,发现服务员正在收拾,告诉我们:“预订的包间服务员正在收拾,请稍等。”过了一会,大堂经理发现包间已经收拾完了,于是马上带我们去包间就餐。

等待包间收拾完这个过程和小灰遇到的等待MQ返回消息本质上是一样的,都是等待一个条件满足:就餐需要等待包间收拾完,小灰的程序里要等待MQ返回消息。

现实如何解决这类问题呢?
现实里大堂经理很重要,我们是否等待,完全是由他来协调。程序也需要这样一个大堂经理,即设计模式:Guarded Suspension,“保护性地暂停”。

一个对象GuardedObject,内部有一个成员变量,受保护的对象,以及两个成员方法

  • get(Predicate p)
  • onChanged(T obj)

GuardedObject就是大堂经理
受保护对象就是餐厅的包间
受保护对象的get()是我们的就餐
就餐的前提条件是包间已经收拾好了,即参数p
受保护对象的onChanged()是服务员把包间收拾好了,通过onChanged()方法可以fire一个事件,而这个事件往往能改变前提条件p的计算结果。

左侧的绿色线程就是需要就餐的顾客,而右侧的蓝色线程就是收拾包间的服务员。

Guarded Suspension模式结构图

GuardedObject的内部实现是管程的一个经典用法:

class GuardedObject<T>{
   
  //受保护的对象
  T obj;
  final Lock lock = 
    new ReentrantLock();
  final Condition done =
    lock.newCondition();
  final int timeout=1;
  //获取受保护对象  
  T get(Predicate<T> p) {
   
    lock.lock();
    try {
   
      //MESA管程推荐写法
      while(!p.test(obj)){
   
      	// 通过条件变量的await()方法实现等待
        done.await(timeout, 
          TimeUnit.SECONDS);
      }
    }catch(InterruptedException e){
   
      throw new RuntimeException(e);
    }finally{
   
      lock.unlock();
    }
    //返回非空的受保护对象
    return obj;
  }
  //事件通知方法
  // 通过条件变量的signalAll()方法实现唤醒功能
  void onChanged(T obj) {
   
    lock.lock();
    try {
   
      this.obj = obj;
      done.signalAll();
    } finally {
   
      lock.unlock();
    }
  }
}

扩展Guarded Suspension模式

Guarded Suspension“大堂经理”能否解决小灰同学遇到的问题。

在处理Web请求的方法handleWebReq()中,可以调用GuardedObject#get()实现等待;在MQ消息的消费方法onMessage()中,可调用GuardedObject#onChanged()实现唤醒。

// 处理浏览器请求
Respond handleWebReq(){
   
  // 创建一消息
  Message msg1 = new Message("1","{...}");
  // 发送消息
  send(msg1);
  // 利用GuardedObject实现等待
  GuardedObject<Message> go = new GuardObjec<>();
  Message r = go.get(t->t != null);
}

void onMessage(Message msg){
   
  // 如何找到匹配的go?
  GuardedObject<Message> go=???
  go.onChanged(msg);
}

但问题是,handleWebReq()里创建了GuardedObject对象的实例go,并调用get()等待结果,那在onMessage()方法中,如何才能够找到匹配的GuardedObject对象呢?
这类似服务员告诉大堂经理某某包间已经收拾好了,大堂经理如何根据包间找到就餐的人。现实世界里,大堂经理的头脑中,有包间和就餐人之间的关系图,所以服务员说完之后大堂经理立刻就能把就餐人找出来。

参考大堂经理识别就餐人的办法,来扩展一下Guarded Suspension模式,从而使它能够很方便地解决小灰同学的问题。在小灰的程序中,每个发送到MQ的消息,都有一个唯一性的属性id,所以我们可以维护一个MQ消息id和GuardedObject对象实例的关系,这个关系可以类比大堂经理大脑里维护的包间和就餐人的关系。

如何实现呢?下面代码是扩展Guarded Suspension,扩展后的

GuardedObject内部维护了一个Map:

  • Key是MQ消息id
  • Value是GuardedObject对象实例

静态方法:

  • create()
    创建一个GuardedObject对象实例,并根据key值将其加入到Map中
  • fireEvent()
    模拟的大堂经理根据包间找就餐人的逻辑。
class GuardedObject<T>{
   
  //受保护的对象
  T obj;
  final Lock lock = 
    new ReentrantLock();
  final Condition done =
    lock.newCondition();
  final int timeout=2;
  //保存所有GuardedObject
  final static Map<Object, GuardedObject> 
  gos=new ConcurrentHashMap<>();
  //静态方法创建GuardedObject
  static <K> GuardedObject 
      create(K key){
   
    GuardedObject go=new GuardedObject();
    gos.put(key, go);
    return go;
  }
  static <K, T> void 
      fireEvent(K key, T obj){
   
    GuardedObject go=gos.remove(key);
    if (go != null){
   
      go.onChanged(obj);
    }
  }
  //获取受保护对象  
  T get(Predicate<T> p) {
   
    lock.lock();
    try {
   
      //MESA管程推荐写法
      while(!p.test(obj)){
   
        done.await(timeout, 
          TimeUnit.SECONDS);
      }
    }catch(InterruptedException e){
   
      throw new RuntimeException(e);
    }finally{
   
      lock.unlock();
    }
    //返回非空的受保护对象
    return obj;
  }
  //事件通知方法
  void onChanged(T obj) {
   
    lock.lock();
    try {
   
      this.obj = obj;
      done.signalAll();
    } finally {
   
      lock.unlock();
    }
  }
}

这样利用扩展后的GuardedObject来解决小灰同学的问题就很简单了,代码如下:

//处理浏览器发来的请求
Respond handleWebReq(){
   
  int id=序号生成器.get();
  //创建一消息
  Message msg1 = new 
    Message(id,"{...}");
  //创建GuardedObject实例
  GuardedObject<Message> go=
    GuardedObject.create(id);  
  //发送消息
  send(msg1);
  //等待MQ消息
  Message r = go.get(
    t->t != null);  
}
void onMessage(Message msg){
   
  //唤醒等待的线程
  GuardedObject.fireEvent(
    msg.id, msg);
}

总结

Guarded Suspension模式本质上是一种等待唤醒机制,只不过Guarded Suspension模式将其规范化了。
规范化好处是你无需重头思考如何实现,也无需担心实现程序的可理解性问题,同时也能避免一不小心写出个Bug来。但Guarded Suspension模式在解决实际问题的时候,往往还是需要扩展的,扩展的方式有很多,本篇文章就直接对GuardedObject的功能进行了增强,Dubbo中DefaultFuture这个类也是采用的这种方式。

Guarded Suspension模式也常被称作Guarded Wait模式、Spin Lock模式(因为使用了while循环去等待),这些名字都很形象,不过它还有一个更形象的非官方名字:多线程版本的if。单线程场景中,if语句是不需要等待的,因为在只有一个线程的条件下,如果这个线程被阻塞,那就没有其他活动线程了,这意味着if判断条件的结果也不会发生变化了。但是多线程场景中,等待就变得有意义了,这种场景下,if判断条件的结果是可能发生变化的。所以,用“多线程版本的if”来理解这个模式会更简单。

有同学觉得用done.await()还要加锁,太啰嗦,还不如直接使用sleep()方法,下面的写法正确吗?

//获取受保护对象  
T get(Predicate<T> p) {
   
  try {
   
    while(!p.test(obj)){
   
      TimeUnit.SECONDS
        .sleep(timeout);
    }
  }catch(InterruptedException e){
   
    throw new RuntimeException(e);
  }
  //返回非空的受保护对象
  return obj;
}
//事件通知方法
void onChanged(T obj) {
   
  this.obj = obj;
}

转载:https://blog.csdn.net/qq_33589510/article/details/116105711
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场