同步工具类

介绍几个同步工具类,很简单、常用。

(说些废话啊,最近学习的过程中,用google搜索信息,查class的时候,看Java api 8 的英文文档,收获很多)

1.CyclicBarrier

它允许一组线程在到达一个共同栅栏之前,相互等待,全都到达之后,再一起前进。与Countdown的去别是,它可以设置多个共同的栅栏,而CountDown只能设置一次。

我们看看api中的构造方法:

CyclicBarrier(int parties): parties表示这组线程的数量

CyclicBarrier(int parties, Runnable barrierAction):barrierAction表示的是,在这组线程全都达到共同栅栏后,所触发的任务。这个任务是由最后到达共同栅栏的线程来执行。

下面看代码的例子:

 1 import java.util.ArrayList;
 2 import java.util.Collections;
 3 import java.util.List;
 4 import java.util.Random;
 5 import java.util.concurrent.BrokenBarrierException;
 6 import java.util.concurrent.CyclicBarrier;
 7 
// 这个demo,是多个线程随机产生数字,放在list中,等到所有线程都产生完数字后(共同栅栏),再对所有数字进行求和
// 测试结果,自己运行一下就好!
8 public class CyclicBarrierDemo { 9 10 private CyclicBarrier cyclicBarrier; 11 private List<List<Integer>> partialResults 12 = Collections.synchronizedList(new ArrayList<>()); 13 private Random random = new Random(); 14 private int NUM_PARTIAL_RESULTS; 15 private int NUM_WORKERS; 16 17 class NumberCruncherThread implements Runnable { 18 19 @Override 20 public void run() { 21 String thisThreadName = Thread.currentThread().getName(); 22 List<Integer> partialResult = new ArrayList<>(); 23 24 // Crunch some numbers and store the partial result 25 for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) { 26 Integer num = random.nextInt(10); 27 System.out.println(thisThreadName 28 + ": Crunching some numbers! Final result - " + num); 29 partialResult.add(num); 30 } 31 32 partialResults.add(partialResult); 33 try { 34 System.out.println(thisThreadName 35 + " waiting for others to reach barrier."); 36 cyclicBarrier.await(); 37
// 红色部分添加的目的就是,纯粹是为了展示cyclicbarrier可以多次重复设置共同栅栏 38 System.out.println("No.2 thread name " + thisThreadName); 39 cyclicBarrier.await(); 40 41 System.out.println("No.3 thread name " + thisThreadName); 42 cyclicBarrier.await(); 43 } catch (InterruptedException e) { 44 // ... 45 } catch (BrokenBarrierException e) { 46 // 当await超时,所示await阻塞的线程被中断时,那么所有阻塞的线程将被终止,并报出BrokenBarrierException 47 } 48 } 49 } 50 51 class AggregatorThread implements Runnable { 52 53 @Override 54 public void run() { 55 56 String thisThreadName = Thread.currentThread().getName(); 57 58 System.out.println( 59 thisThreadName + ": Computing sum of " + NUM_WORKERS 60 + " workers, having " + NUM_PARTIAL_RESULTS + " results each."); 61 int sum = 0; 62 63 for (List<Integer> threadResult : partialResults) { 64 System.out.print("Adding "); 65 for (Integer partialResult : threadResult) { 66 System.out.print(partialResult+" "); 67 sum += partialResult; 68 } 69 System.out.println(); 70 } 71 System.out.println(thisThreadName + ": Final result = " + sum); 72 } 73 } 74 75 public void runSimulation(int numWorkers, int numberOfPartialResults) { 76 NUM_PARTIAL_RESULTS = numberOfPartialResults; 77 NUM_WORKERS = numWorkers; 78 79 cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread()); 80 81 System.out.println("Spawning " + NUM_WORKERS 82 + " worker threads to compute " 83 + NUM_PARTIAL_RESULTS + " partial results each"); 84 85 for (int i = 0; i < NUM_WORKERS; i++) { 86 Thread worker = new Thread(new NumberCruncherThread()); 87 worker.setName("Thread " + i); 88 worker.start(); 89 } 90 } 91 92 public static void main(String[] args) { 93 CyclicBarrierDemo demo = new CyclicBarrierDemo(); 94 demo.runSimulation(5, 3); 95 } 96 }

2. CountdownLatch

 countdownLatch有个计时器,每次会按照我们的要求进行递减,我们可以阻塞某个线程直到它递减到0为止。

 1 public class Worker implements Runnable {
 2     private List<String> outputScraper;
 3     private CountDownLatch countDownLatch;
 4  
 5     public Worker(List<String> outputScraper, CountDownLatch countDownLatch) {
 6         this.outputScraper = outputScraper;
 7         this.countDownLatch = countDownLatch;
 8     }
 9  
10     @Override
11     public void run() {
12         doSomeWork();
13         outputScraper.add("Counted down");
14         countDownLatch.countDown();
15     }
16 }

我认为,countdownLatch阻塞的是主线程,等countdown为0时,再继续执行主线程的操作。

 1 @Test
 2 public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion()
 3   throws InterruptedException {
 4  
 5     List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
 6     CountDownLatch countDownLatch = new CountDownLatch(5);
// Stream. 这是java8中添加的新特性
7 List<Thread> workers = Stream 8 .generate(() -> new Thread(new Worker(outputScraper, countDownLatch))) 9 .limit(5) 10 .collect(toList()); 11 12 workers.forEach(Thread::start);
// await阻塞线程,直到countdown为0
13 countDownLatch.await(); 14 outputScraper.add("Latch released"); 15 16 assertThat(outputScraper) 17 .containsExactly( 18 "Counted down", 19 "Counted down", 20 "Counted down", 21 "Counted down", 22 "Counted down", 23 "Latch released" 24 ); 25 }

3.Semaphore

它用来控制同时访问某个特定资源的操作数量或是同时执行某个指定操作的数量。Semaphore里管理着一组虚拟的许可,acquire()会阻塞到获得许可为止,每个release()会增加一个许可。许可对象并不是真实存在的,并且Semeaphore也不会将许可与线程关联起来,因此,在一个线程中得到的许可,可以在另外线程中释放。

我们看一下它的构造函数:

Semaphore(int permits)                permits:许可的数量
Semaphore(int permits, boolean fair)        fair:true表示在竞争中FIFO的许可授权

接下来看个例子,加深理解(通过Semaphore将容器变成有界阻塞容器):

 1 public class BoundedhashSet<T> {
 2     private final Set<T> set;
 3     private final Semaphore semaphore;
 4     
 5     public BoundedhashSet(int bound) {
 6         this.set = Collections.synchronizedSet(new HashSet<T>());
 7         semaphore = new Semaphore(bound);
 8     }
 9     
10     public boolean add(T o) throws InterruptedException{
11         semaphore.acquire();
12         boolean wasAdded = false;
13         try {
14             set.add(o);
15             return wasAdded;
16         } finally {
17             if (!wasAdded)
18                 semaphore.release();
19         }
20     }
21     
22     public boolean remove(Object o) {
23         boolean wasRemoved = set.remove(o);
24         if (wasRemoved)
25             semaphore.release();
26         return wasRemoved;
27     }
28 }

4.Exchanger

它是一种Two-party栅栏,双方在栅栏位置交换数据。当双方执行不对称的操作时,这个类很有用。例如当一个线程想缓冲区写入数据,而另一个线程从缓冲区中读取数据。这些线程可以使用Exchanger来汇合,并将满的缓冲区与空的缓冲区交换。Exchanger可以看做是双向的SynchronousQueue。Exchangers在管道设计方面的系统中会有很大用处。

下面看个例子,理解一下其用法(这个Exchanger类或许在平常工作中并不常见):

 1 class FillAndEmpty {
 2    Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
 3    DataBuffer initialEmptyBuffer = ... a made-up type
 4    DataBuffer initialFullBuffer = ...
 5 
 6    class FillingLoop implements Runnable {
 7      public void run() {
 8        DataBuffer currentBuffer = initialEmptyBuffer;
 9        try {
10          while (currentBuffer != null) {
11            addToBuffer(currentBuffer);
12            if (currentBuffer.isFull())
13              currentBuffer = exchanger.exchange(currentBuffer);
14          }
15        } catch (InterruptedException ex) { ... handle ... }
16      }
17    }
18 
19    class EmptyingLoop implements Runnable {
20      public void run() {
21        DataBuffer currentBuffer = initialFullBuffer;
22        try {
23          while (currentBuffer != null) {
24            takeFromBuffer(currentBuffer);
25            if (currentBuffer.isEmpty())
26              currentBuffer = exchanger.exchange(currentBuffer);
27          }
28        } catch (InterruptedException ex) { ... handle ...}
29      }
30    }
31 
32    void start() {
33      new Thread(new FillingLoop()).start();
34      new Thread(new EmptyingLoop()).start();
35    }
36  }

后续陆续补充。。。

参考文献:

java api 8

《Java 并发编程实战》

原文地址:https://www.cnblogs.com/lihao007/p/10311783.html