线程工具类

                                                  同步并发工具类

同步屏障CyclicBarrier

简介:CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

 1 package com.cattsoft.sync;
 2 
 3 import java.util.concurrent.CyclicBarrier;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 
 7 /**
 8  * 表示大家彼此等待,集合后才开始出发(张家界旅游,早上等车,大峡谷进去集合(范乾),大峡谷出来集合)
 9  * 工作中没准就有这种应用,当以后的工作中做什么项目,可能用到这个技术,就要联想起来
10  * 理论上是一定有这种需求,jdk开发者才会搞这种技术,只是目前还没碰到。
11  */
12 public class CyclicBarrierTest {
13 
14     public static void main(String[] args) {
15         ExecutorService executor = Executors.newCachedThreadPool();
16         final CyclicBarrier cb = new CyclicBarrier(15);
17         for (int i = 0; i <15; i++) {
18             Runnable runnable = new Runnable() {
19                 @Override
20                 public void run() {
21                     try {
22                         // 获取一个信号灯
23                         Thread.sleep((long) (Math.random() * 10000));
24                         System.out.println("线程: " + Thread.currentThread().getName() + "即将到达集合地址1,当前已有"
25                                 + (cb.getNumberWaiting() + 1) + "个已经到达。"
26                                 + (cb.getNumberWaiting() == 14 ? "都到齐了,继续往下走" : "正在等候"));
27                         cb.await(); // 在这里集合
28                         
29                         Thread.sleep((long) (Math.random() * 10000));
30                         System.out.println("线程: " + Thread.currentThread().getName() + "即将到达集合地址2,当前已有"
31                                 + (cb.getNumberWaiting() + 1) + "个已经到达。"
32                                 + (cb.getNumberWaiting() == 14 ? "都到齐了,继续往下走" : "正在等候"));
33                         cb.await(); // 在这里集合
34                         
35                         Thread.sleep((long) (Math.random() * 10000));
36                         System.out.println("线程: " + Thread.currentThread().getName() + "即将到达集合地址3,当前已有"
37                                 + (cb.getNumberWaiting() + 1) + "个已经到达。"
38                                 + (cb.getNumberWaiting() == 14 ? "都到齐了,继续往下走" : "正在等候"));
39                         cb.await(); // 在这里集合
40                     } catch (Exception e) {
41                         e.printStackTrace();
42                     }
43                 }
44             };
45             executor.execute(runnable);
46         }
47         executor.shutdown();
48     }
49 }

如果把new CyclicBarrier(15)修改成new CyclicBarrier(16)则主线程和子线程会永远等待,因为没有第16个线程执行await方法,即没有第16个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。

CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。代码如下:

 1 public class CyclicBarrierTest2 {
 2 
 3     static CyclicBarrier c = new CyclicBarrier(2, new A());
 4 
 5     public static void main(String[] args) {
 6         new Thread(new Runnable() {
 7 
 8             @Override
 9             public void run() {
10                 try {
11                     c.await();
12                 } catch (Exception e) {
13 
14                 }
15                 System.out.println(1);
16             }
17         }).start();
18 
19         try {
20             c.await();
21         } catch (Exception e) {
22 
23         }
24         System.out.println(2);
25     }
26 
27     static class A implements Runnable {
28 
29         @Override
30         public void run() {
31             System.out.println(3);
32         }
33 
34     }
35 
36 }

输出:

1 3
2 1
3 2

CyclicBarrier的应用场景

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

同步信号量Semaphore

控制并发线程数

Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。

以一个停车场是运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。
在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用。
更进一步,信号量的特性如下:信号量是一个非负整数(车位数),所有通过它的线程(车辆)都会将该整数减一(通过它当然是为了使用资源),当该整数值为零时,所有试图通过它的线程都将处于等待状态。在信号量上我们定义两种操作: Wait(等待) 和 Release(释放)。 当一个线程调用Wait(等待)操作时,它要么通过然后将信号量减一,要么一直等下去,直到信号量大于一或超时。Release(释放)实际上是在信号量上执行加操作,对应于车辆离开停车场,该操作之所以叫做“释放”是因为加操作实际上是释放了由信号量守护的资源。
在java中,还可以设置该信号量是否采用公平模式,如果以公平方式执行,则线程将会按到达的顺序(FIFO)执行,如果是非公平,则可以后请求的有可能排在队列的头部。
 1 package com.cattsoft.sync;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 import java.util.concurrent.Semaphore;
 6 
 7 /**
 8  * 信号灯同步工具
 9  * 与互斥锁对比,信号灯可以由其它线程释放,互斥锁只能由自己释放
10  * 模拟场景:现在有3个厕所坑,有5个人要进去,先进去3个人,其它2个等待,有人出来,等待的人再进去
11  * <p>
12  * TTP:
13  * 知识很容易就学会,最大的困难是在以后的工作中遇到困难了,正好能想到用这个工具
14  * 每个药能治什么病,很容易学会,难就难在当看到一个病人以后,能想起来就这个药,能想到这一步你就是华佗。
15  */
16 public class SemaphoreTest {
17     public static void main(String[] args) {
18         ExecutorService executor = Executors.newFixedThreadPool(5);
19 
20         //可使用的公共资源数量
21         final Semaphore sp = new Semaphore(3);
22 
23         for (int i = 0; i < 5; i++) {
24             Runnable runnable = new Runnable() {
25                 @Override
26                 public void run() {
27                     try {
28                         // 获取一个信号灯
29                         sp.acquire();
30                     } catch (InterruptedException e) {
31                         e.printStackTrace();
32                     }
33                     System.out.println("线程: " + Thread.currentThread().getName() + "进入,当前已有"
34                             + (3 - sp.availablePermits()) + "个并发。");
35                     try {
36                         Thread.sleep((long) (Math.random() * 10000));
37                     } catch (InterruptedException e) {
38                         e.printStackTrace();
39                     }
40                     System.out.println("线程" + Thread.currentThread().getName() + "即将离开。");
41                     sp.release();
42                     System.out.println("线程" + Thread.currentThread().getName() + "已离开,当前已有"
43                             + (3 - sp.availablePermits()) + "个并发");
44                 }
45             };
46             executor.execute(runnable);
47         }
48         executor.shutdown();
49     }
50 }

CountDownLatch

等待多线程完成,CountDownLatch 允许一个或多个线程等待其他线程完成操作。

应用场景

假如有这样一个需求,当我们需要解析一个Excel里多个sheet的数据时,可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的做法是使用join。代码如下:

 1 public class JoinCountDownLatchTest {
 2 
 3     public static void main(String[] args) throws InterruptedException {
 4         Thread parser1 = new Thread(new Runnable() {
 5             @Override
 6             public void run() {
 7             }
 8         });
 9 
10         Thread parser2 = new Thread(new Runnable() {
11             @Override
12             public void run() {
13                 System.out.println("parser2 finish");
14             }
15         });
16 
17         parser1.start();
18         parser2.start();
19         parser1.join();
20         parser2.join();
21         System.out.println("all parser finish");
22     }
23 
24 }
 1 package com.cattsoft.sync;
 2 
 3 import java.util.concurrent.CountDownLatch;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 
 7 /**
 8  * 倒计时计数器
 9  * 模拟场景:
10  *  三个运动员跑步比赛,听到一听哨响,开始跑,全部跑到了,一起公布成绩。
11  */
12 public class CoutDownLatchTest {
13     public static void main(String[] args) {
14         ExecutorService executor = Executors.newCachedThreadPool();
15         final CountDownLatch cdOrder = new CountDownLatch(1);
16         final CountDownLatch cdAnswer = new CountDownLatch(3);
17 
18         for (int i = 0; i < 3; i++) {
19             Runnable runnable = new Runnable() {
20                 @Override
21                 public void run() {
22                     try {
23                         System.out.println("线程"+ Thread.currentThread().getName() + "正在准备接受命令");
24                         cdOrder.await();
25                         System.out.println("线程" + Thread.currentThread().getName() + "已经接受命令");
26 
27                         Thread.sleep((long) (Math.random() * 10000));
28                         System.out.println("线程"+ Thread.currentThread().getName() + "回应此命令的处理结果");
29                         cdAnswer.countDown();
30 
31                     } catch (Exception e) {
32                         e.printStackTrace();
33                     }
34                 }
35             };
36             executor.execute(runnable);
37         }
38 
39         try {
40             Thread.sleep((long) (Math.random() * 10000));
41             System.out.println("线程"+ Thread.currentThread().getName() + "即将发布命令");
42             cdOrder.countDown();
43             System.out.println("线程"+ Thread.currentThread().getName() + "已经发送命令,正在等待结果");
44             cdAnswer.await();
45             System.out.println("线程"+ Thread.currentThread().getName() + "已收到所有响应结果");
46         } catch (Exception e) {
47             e.printStackTrace();
48         }
49         executor.shutdown();
50     }
51 }
52 
53 
54   Map<String,Integer> linkMap = new LinkedHashMap<>();
55              for(Entry<String,Integer> newEntry :result){
56                  linkMap.put(newEntry.getKey(), newEntry.getValue());            
57              }
58              for(Map.Entry<String, Integer> mapEntry : linkMap.entrySet()){
59                  System.out.println("key:"+mapEntry.getKey()+"  value:"+mapEntry.getValue());
60              }
61          } 

CyclicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
  • CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。比如以下代码执行完之后会返回true。

isBroken的使用代码如下:

 1 import java.util.concurrent.BrokenBarrierException;
 2 import java.util.concurrent.CyclicBarrier;
 3 
 4 public class CyclicBarrierTest3 {
 5 
 6     static CyclicBarrier c = new CyclicBarrier(2);
 7 
 8     public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
 9         Thread thread = new Thread(new Runnable() {
10 
11             @Override
12             public void run() {
13                 try {
14                     c.await();
15                 } catch (Exception e) {
16                 }
17             }
18         });
19         thread.start();
20         thread.interrupt();
21         try {
22             c.await();
23         } catch (Exception e) {
24             System.out.println(c.isBroken());
25         }
26     }
27 }
原文地址:https://www.cnblogs.com/tangjiang-code/p/9867512.html