java一些常用并发工具示例

最近把《java并发编程实战》-Java Consurrency in Practice 重温了一遍,把书中提到的一些常用工具记录于此:

一、闭锁(门栓)- CountDownLatch

适用场景:多线程测试时,通常为了精确计时,要求所有线程都ready后,才开始执行,防止有线程先起跑,造成不公平,类似的,所有线程执行完,整个程序才算运行完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * 闭锁测试(菩提树下的杨过 http://yjmyzz.cnblogs.com/)
 *
 * @throws InterruptedException
 */
@Test
public void countdownLatch() throws InterruptedException {
    CountDownLatch startLatch = new CountDownLatch(1); //类似发令枪
    CountDownLatch endLatch = new CountDownLatch(10);//这里的数量,要与线程数相同
 
    for (int i = 0; i < 10; i++) {
        Thread t = new Thread(() -> {
            try {
                startLatch.await(); //先等着,直到发令枪响,防止有线程先run
                System.out.println(Thread.currentThread().getName() + " is running...");
                Thread.sleep(10);
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            finally {
                endLatch.countDown(); //每个线程执行完成后,计数
            }
        });
        t.setName("线程-" + i);
        t.start();
    }
    long start = System.currentTimeMillis();
    startLatch.countDown();//发令枪响,所有线程『开跑』
    endLatch.await();//等所有线程都完成
    long end = System.currentTimeMillis();
    System.out.println("done! exec time => " + (end - start) + " ms");
}  

执行结果:

线程-1 is running...
线程-5 is running...
线程-8 is running...
线程-4 is running...
线程-3 is running...
线程-0 is running...
线程-2 is running...
线程-9 is running...
线程-7 is running...
线程-6 is running...
done! exec time => 13 ms

注:大家可以把第14行注释掉,再看看运行结果有什么不同。

二、信号量(Semaphore)

适用场景:用于资源数有限制的并发访问场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class BoundedHashSet<T> {
     private final Set<T> set;
     private final Semaphore semaphore;
 
     public BoundedHashSet(int bound) {
         this.set = Collections.synchronizedSet(new HashSet<T>());
         this.semaphore = new Semaphore(bound);
     }
 
     public boolean add(T t) throws InterruptedException {
         if (!semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
             return false;
         }
         ;
         boolean added = false;
         try {
             added = set.add(t);
             return added;
         finally {
             if (!added) {
                 semaphore.release();
             }
         }
     }
 
     public boolean remove(Object o) {
         boolean removed = set.remove(o);
         if (removed) {
             semaphore.release();
         }
         return removed;
     }
 }
 
 @Test
 public void semaphoreTest() throws InterruptedException {
 
     BoundedHashSet<String> set = new BoundedHashSet<>(5);
     for (int i = 0; i < 6; i++) {
         if (set.add(i + "")) {
             System.out.println(i + " added !");
         else {
             System.out.println(i + " not add to Set!");
         }
     }
 }

上面的示例将一个普通的Set变成了有界容器。执行结果如下:

0 added !
1 added !
2 added !
3 added !
4 added !
5 not add to Set!

三、栅栏CyclicBarrier 

这个跟闭锁类似,可以通过代码设置一个『屏障』点,其它线程到达该点后才能继续,常用于约束其它线程都到达某一状态后,才允许做后面的事情。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Worker extends Thread {
 
    private CyclicBarrier cyclicBarrier;
 
    public Worker(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }
 
    private void step1() {
        System.out.println(this.getName() + " step 1 ...");
    }
 
    private void step2() {
        System.out.println(this.getName() + " step 2 ...");
    }
 
    public void run() {
        step1();
        try {
            cyclicBarrier.await();
        catch (InterruptedException e) {
            e.printStackTrace();
        catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        step2();
    }
}
 
@Test
public void cyclicBarrierTest() throws InterruptedException, BrokenBarrierException {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(11);
    for (int i = 0; i < 10; i++) {
        Worker w = new Worker(cyclicBarrier);
        w.start();
    }
    cyclicBarrier.await();
 
}

这里我们假设有一个worder线程,里面有2步操作,要求所有线程完成step1后,才能继续step2. 执行结果如下:

Thread-0 step 1 ...
Thread-1 step 1 ...
Thread-2 step 1 ...
Thread-3 step 1 ...
Thread-4 step 1 ...
Thread-5 step 1 ...
Thread-6 step 1 ...
Thread-7 step 1 ...
Thread-8 step 1 ...
Thread-9 step 1 ...
Thread-9 step 2 ...
Thread-0 step 2 ...
Thread-3 step 2 ...
Thread-4 step 2 ...
Thread-6 step 2 ...
Thread-2 step 2 ...
Thread-1 step 2 ...
Thread-8 step 2 ...
Thread-7 step 2 ...
Thread-5 step 2 ...

四、Exchanger

如果2个线程需要交换数据,Exchanger就能派上用场了,见下面的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Test
public void exchangerTest() {
    Exchanger<String> exchanger = new Exchanger<>();
 
    Thread t1 = new Thread(() -> {
        String temp = "AAAAAA";
        System.out.println("thread 1 交换前:" + temp);
        try {
            temp = exchanger.exchange(temp);
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("thread 1 交换后:" + temp);
    });
 
    Thread t2 = new Thread(() -> {
        String temp = "BBBBBB";
        System.out.println("thread 2 交换前:" + temp);
        try {
            temp = exchanger.exchange(temp);
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("thread 2 交换后:" + temp);
    });
 
    t1.start();
    t2.start();
}

 执行结果:

thread 1 交换前:AAAAAA
thread 2 交换前:BBBBBB
thread 2 交换后:AAAAAA
thread 1 交换后:BBBBBB

五、FutureTask/Future

一些很耗时的操作,可以用Future转化成异步,不阻塞后续的处理,直到真正需要返回结果时调用get拿到结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Test
public void futureTaskTest() throws ExecutionException, InterruptedException, TimeoutException {
 
    Callable<String> callable = () -> {
        System.out.println("很耗时的操作处理中。。。");
        Thread.sleep(5000);
        return "done";
    };
 
    FutureTask<String> futureTask = new FutureTask<>(callable);
 
    System.out.println("就绪。。。");
    new Thread(futureTask).start();
    System.out.println("主线程其它处理。。。");
    System.out.println(futureTask.get());
    System.out.println("处理完成!");
 
    System.out.println("-----------------");
 
    System.out.println("executor 就绪。。。");
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    Future<String> future = executorService.submit(callable);
    System.out.println(future.get(10, TimeUnit.SECONDS));
}

 执行结果:

就绪。。。
主线程其它处理。。。
很耗时的操作处理中。。。
done
处理完成!
-----------------
executor 就绪。。。
很耗时的操作处理中。。。
done

六、阻塞队列BlockingQueue

阻塞队列可以在线程间实现生产者-消费者模式。比如下面的示例:线程producer模拟快速生产数据,而线程consumer模拟慢速消费数据,当达到队列的上限时(即:生产者产生的数据,已经放不下了),队列就堵塞住了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Test
    public void blockingQueueTest() throws InterruptedException {
        final BlockingQueue<String> blockingDeque = new ArrayBlockingQueue<>(5);
 
        Thread producer = new Thread() {
            public void run() {
                Random rnd = new Random();
                while (true) {
                    try {
                        int i = rnd.nextInt(10000);
                        blockingDeque.put(i + "");
                        System.out.println(this.getName() + " 产生了一个数字:" + i);
                        Thread.sleep(rnd.nextInt(50));//模拟生产者快速生产
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        };
        producer.setName("producer 1");
 
 
        Thread consumer = new Thread() {
            public void run() {
                while (true) {
                    Random rnd = new Random();
                    try {
 
                        String i = blockingDeque.take();
                        System.out.println(this.getName() + " 消费了一个数字:" + i);
                        Thread.sleep(rnd.nextInt(10000));//消费者模拟慢速消费
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        };
        consumer.setName("consumer 1");
 
        producer.start();
        consumer.start();
 
        while (true) {
            Thread.sleep(100);
        }
    }

执行结果:

producer 1 产生了一个数字:6773
consumer 1 消费了一个数字:6773
producer 1 产生了一个数字:4456
producer 1 产生了一个数字:8572
producer 1 产生了一个数字:5764
producer 1 产生了一个数字:2874
producer 1 产生了一个数字:780 # 注意这里就已经堵住了,直到有消费者消费一条数据,才能继续生产
consumer 1 消费了一个数字:4456
producer 1 产生了一个数字:4193

原文地址:https://www.cnblogs.com/wojiaochuichui/p/8533898.html