第二部分:并发工具类15->Lock和condition(下)

Dubbo用管程实现异步转同步

Lock可以响应中断,支持超时,以及非阻塞获取锁

Condition实现了管程里面的条件变量

synchronized里的条件变量只有1个,而Lock和Condition的组合可以有多个条件变量

1.利用两个条件变量实现阻塞队列

阻塞队列,2个条件变量,1个是队列不空(空队列不允许出队),1个是队列不满(队列满不允许入队)


public class BlockedQueue<T>{
  final Lock lock =
    new ReentrantLock();
  // 条件变量:队列不满  
  final Condition notFull =
    lock.newCondition();
  // 条件变量:队列不空  
  final Condition notEmpty =
    lock.newCondition();

  // 入队
  void enq(T x) {
    lock.lock();
    try {
      while (队列已满){
        // 等待队列不满
        notFull.await();
      }  
      // 省略入队操作...
      //入队后,通知可出队
      notEmpty.signal();
    }finally {
      lock.unlock();
    }
  }
  // 出队
  void deq(){
    lock.lock();
    try {
      while (队列已空){
        // 等待队列不空
        notEmpty.await();
      }  
      // 省略出队操作...
      //出队后,通知可入队
      notFull.signal();
    }finally {
      lock.unlock();
    }  
  }
}

线程等待和通知是要调用await(),singal(),singAll()
语意与wait(),notify(),notifyAll()相同,但是wait(),notify(),notifyAll()只有在synchronized里才能使用。lock和condition里不能用。

2.同步与异步

调用方是否需要等待结果,需要等待就是同步
如果调用非不需要等待结果,就是异步


// 计算圆周率小说点后100万位 
String pai1M() {
  //省略代码无数
}

pai1M()
printf("hello world")

如果pai1M方法要执行2个礼拜,线程一直等着计算结果,2个结果后返回,就可以执行hello world了。这就是同步
如果执行pai1M之后,线程不用等待结果,就立刻执行hello world了,就属于异步了

同步是java默认的处理方式
如果要异步,怎么做?

  1. 调用方创建一个子线程,在子线程中执行方法调用,异步调用
  2. 方法实现的时候,创建一个新的线程执行主要逻辑,主线程直接return,这种方法我们成为异步方法。

3.Dubbo源码分析

TCP协议本身就是异步的,RPC调用时,TCP层面,发送完RPC请求后,线程不会等待RPC的响应结果的

可是平时工作中RPC调用大多数是同步的啊?怎么回事

很简单,就是做了异步转同步。RPC框架Dubbo就是做了异步转同步的事情。

sayHello方法就是同步方法,执行sayHello时候,线程会停下来等结果


DemoService service = 初始化部分省略
String message = 
  service.sayHello("dubbo");
System.out.println(message);

此时打印出线程栈信息,会看到线程的状态为TIMED_WAITING
本来发送请求是异步的,但是调用线程却阻塞了,dubbo做了异步转同步的事情
DefaultFuturn.get(),推断Dubbo异步转同步的功能通过DefaultFuture这个类实现的

在DubblInvoker的108行调用的DefaultFuture.get(),但是也是先调用了request(inv,timeout)方法,这个方法其实就是发送rpc请求,然后之后调用get方法返回rpc结果


public class DubboInvoker{
  Result doInvoke(Invocation inv){
    // 下面这行就是源码中108行
    // 为了便于展示,做了修改
    return currentClient 
      .request(inv, timeout)
      .get();
  }
}

当RPC返回结果前,阻塞调用线程,让调用线程等待,当rpc返回结果后,唤醒调用线程,让调用线程重新执行
这也就是等待-通知机制啊。
接下来看看dubbo如何用等待通知机制,实现异步转同步操作


// 创建锁与条件变量
private final Lock lock 
    = new ReentrantLock();
private final Condition done 
    = lock.newCondition();

// 调用方通过该方法等待结果
Object get(int timeout){
  long start = System.nanoTime();
  lock.lock();
  try {
  while (!isDone()) {
    done.await(timeout);
      long cur=System.nanoTime();
    if (isDone() || 
          cur-start > timeout){
      break;
    }
  }
  } finally {
  lock.unlock();
  }
  if (!isDone()) {
  throw new TimeoutException();
  }
  return returnFromResponse();
}
// RPC结果是否已经返回
boolean isDone() {
  return response != null;
}
// RPC结果返回时调用该方法   
private void doReceived(Response res) {
  lock.lock();
  try {
    response = res;
    if (done != null) {
      done.signal();
    }
  } finally {
    lock.unlock();
  }
}

调用get()方法等待rpc结果时,看到的都是熟悉面孔,lock获取锁,finally释放锁,获取锁后通过循环调用await来实现等待
rpc结果返回时,调用doReceived方法,这个方法lock获取锁,finally释放锁,通过singal通知调用线程,然后await那收到,就不会继续等待了

dubbo的异步转同步就分析完了,
最近异步编程大火,有好多api都是异步的,例如公有云api本身是异步的,创建云主机的异步api,调用成功了,需要掉另一个api轮训主机状态。
项目内部封装创建云主机的api接口,面临异步转同步的问题。同步API更易用。

原创:做时间的朋友
原文地址:https://www.cnblogs.com/PythonOrg/p/14963284.html