多线程编程核心技术(十五)CountDownLatch和CyclicBarrier

题目:有个对账系统需要进行优化,用户通过在线商城下单,会生成电子订单,保存在订单库;之后物流会生成派送单给用户发货,派送单保存在派送单库。为了防止漏派送或者重复派送,对账系统每天还会校验是否存在异常订单。

首先查询订单,然后查询派送单,之后对比订单和派送单,将差异写入差异库。

 核心代码如下,就是在一个单线程里面循环查询订单、派送单,然后执行对账,最后将写入差异库。

while(存在未对账订单){
  // 查询未对账订单
  pos = getPOrders();
  // 查询派送单
  dos = getDOrders();
  // 执行对账操作
  diff = check(pos, dos);
  // 差异写入差异库
  save(diff);
} 

假设目前是串行的,那应该如何进行优化呢?思路如下图所示,利用多线程来提升吞吐量。

 在业务逻辑上getPO和getDO是没有任何关联度的,时间上的前后也不会影响后续的代码执行。伪代码也很简单

while(存在未对账订单){
  // 查询未对账订单
  Thread T1 = new Thread(()->{
    pos = getPOrders();
  });
  T1.start();
  // 查询派送单
  Thread T2 = new Thread(()->{
    dos = getDOrders();
  });
  T2.start();
  // 等待T1、T2结束
  T1.join();
  T2.join();
  // 执行对账操作
  diff = check(pos, dos);
  // 差异写入差异库
  save(diff);
} 

  这个优化方案的不足就是每次都是新建一个新线程,这个就是一种浪费,但也基本上满足了需要,但是如果进一步优化就是加入线程池技术。

// 创建2个线程的线程池
Executor executor = 
  Executors.newFixedThreadPool(2);
while(存在未对账订单){
  // 查询未对账订单
  executor.execute(()-> {
    pos = getPOrders();
  });
  // 查询派送单
  executor.execute(()-> {
    dos = getDOrders();
  });
  
  /* ??如何实现等待??*/
  
  // 执行对账操作
  diff = check(pos, dos);
  // 差异写入差异库
  save(diff);
}   

  拉入线程池之后,就出现了无法等待,那肯定要解决嘛。而且肯定有方法解决,不然大厂为什么提倡使用线程池?

最简单的方法就是加入一个计数器,两个线程执行完之后进行累加,差异写入差异库之后再重置。

但是Java 并发包里已经提供了实现类似功能的工具类:CountDownLatch。

首先创建了一个 CountDownLatch,计数器的初始值等于 2,之后在pos = getPOrders();和dos = getDOrders();两条语句的后面对计数器执行减 1 操作,这个对计数器减 1 的操作是通过调用 latch.countDown(); 来实现的。在主线程中,我们通过调用 latch.await() 来实现对计数器等于 0 的等待。

// 创建2个线程的线程池
Executor executor = 
  Executors.newFixedThreadPool(2);
while(存在未对账订单){
  // 计数器初始化为2
  CountDownLatch latch = 
    new CountDownLatch(2);
  // 查询未对账订单
  executor.execute(()-> {
    pos = getPOrders();
    latch.countDown();
  });
  // 查询派送单
  executor.execute(()-> {
    dos = getDOrders();
    latch.countDown();
  });
  
  // 等待两个查询操作结束
  latch.await();
  
  // 执行对账操作
  diff = check(pos, dos);
  // 差异写入差异库
  save(diff);
}
latch.await();实现了线程之间的等待。
再进一步优化的话,就要按照下图所示的进行了

 这样的话之前的代码需要修改的地方其实多,就是在执行check和save的时候再进行上面的线程。两次查询操作能够和对账操作并行,对账操作还依赖查询操作的结果。

很明显的逻辑关系就是save依赖check,check依赖查询的情况。

先看修改后的代码逻辑

// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池 
Executor executor = 
  Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
  new CyclicBarrier(2, ()->{
    executor.execute(()->check());
  });
  
void check(){
  P p = pos.remove(0);
  D d = dos.remove(0);
  // 执行对账操作
  diff = check(p, d);
  // 差异写入差异库
  save(diff);
}
  
void checkAll(){
  // 循环查询订单库
  Thread T1 = new Thread(()->{
    while(存在未对账订单){
      // 查询订单库
      pos.add(getPOrders());
      // 等待
      barrier.await();
    }
  });
  T1.start();  
  // 循环查询运单库
  Thread T2 = new Thread(()->{
    while(存在未对账订单){
      // 查询运单库
      dos.add(getDOrders());
      // 等待
      barrier.await();
    }
  });
  T2.start();
}

  

 整理出的思路差不多类似这个。

CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,这两个工具类用法的区别在这里还是有必要再强调一下:CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;而 CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但 CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。

smartcat.994
原文地址:https://www.cnblogs.com/SmartCat994/p/14212111.html