并发编程--Concurrent-工具类介绍

并发编程--Concurrent-工具类介绍

并发编程--Concurrent-工具类介绍

  • CountDownLatch
  • CylicBarrier
  • Semaphore
  • Condition 对象监视器下个篇幅引入
  • Callable 不单独说明 Demo中会使用
  • Future

Java 5 添加了一个新的包到 Java 平台,java.util.concurrent 包
这个包包含有一系列能够让 Java 的并发编程变得更加简单轻松的类。在这个包被添加以前,你需要自己去动手实现自己的相关工具类;

  • 本文我将带你一一认识 java.util.concurrent 包里的这些类,然后你可以尝试着如何在项目中使用它们。本文中我将使用 Java 6 版本,我不确定这和 Java 5 版本里的是否有一些差异
  • 我不会去解释关于 Java 并发的核心问题 其背后的原理,也就是说,如果你对那些东西感兴趣,参考《Java 并发指南》;

1. CountDownLatch(计时器) 

简介

官方介绍

  • > A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.同步协助,允许一个或多个线程等待,直到其他一组线程执行的操作完成,再继续执行。

理解:

  • CountDownLatch是一个同步的辅助类,它可以允许一个或多个线程等待,直到一组在其它线程中的操作执行完成。
  • 一个CountDownLatch会通过一个给定的count数来被初始化。其中await()方法会一直阻塞,来作为一个开/关的门或门闩:直到当前的count被减到0,而这个过程是通过调用countDown()方法来实现的。在await()方法不再阻塞以后,所有等待的线程都会被释放,并且任何await()的子调用都会立刻返回。这是一次性的--count不能被重置。如果你需要一种能重置count的版本,请考虑使用CyclicBarrier。
  • CountDownLatch一个有用的属性就是它不需要线程们在继续执行之前,调用countDown来等待count被减到0。它简单地阻止了任何调用了await()的线程继续,直到所有的线程都能够通过。

作用

个人理解:CountDownLatch:我把他理解成倒计时锁。

场景:常用于监听某些初始化操作,等待初始化操作完成后,通知主线程继续工作。

示例:MyCountDownLatch.Java

 1 public class MyCountDownLatch {
 2 
 3     public static void main(String[] args) {
 4 
 5       final CountDownLatch countDown = new CountDownLatch(2);
 6 
 7       Thread t1 = new Thread(new Runnable() {
 8         @Override
 9         public void run() {
10           try {
11             System.out.println("进入线程t1" + "等待其他线程处理完成...");
12             countDown.await();
13             System.out.println("t1线程继续执行...");
14           } catch (InterruptedException e) {
15             e.printStackTrace();
16           }
17         }
18       },"t1");
19 
20       Thread t2 = new Thread(new Runnable() {
21         @Override
22         public void run() {
23           try {
24             System.out.println("t2线程进行初始化操作...");
25             Thread.sleep(3000);
26             System.out.println("t2线程初始化完毕,通知t1线程继续...");
27             countDown.countDown();
28           } catch (InterruptedException e) {
29             e.printStackTrace();
30           }
31         }
32       });
33       Thread t3 = new Thread(new Runnable() {
34         @Override
35         public void run() {
36           try {
37             System.out.println("t3线程进行初始化操作...");
38             Thread.sleep(4000);
39             System.out.println("t3线程初始化完毕,通知t1线程继续...");
40             countDown.countDown();
41           } catch (InterruptedException e) {
42             e.printStackTrace();
43           }
44         }
45       });
46 
47       t1.start();
48       t2.start();
49       t3.start();
50     }
51 }

执行结果

进入线程t1等待其他线程处理完成...
t2线程进行初始化操作...
t3线程进行初始化操作...
t2线程初始化完毕,通知t1线程继续...
t3线程初始化完毕,通知t1线程继续...
t1线程继续执行...

2. CylicBarrier(栅栏)

CylicBarrier 的作用

  • CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
  • CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

个人理解:CyclicBarrier:可看成是个障碍,所有的线程必须到齐后才能一起通过这个障碍。

示例:

 1 public class MyCyclicBarrier {
 2 
 3    static class Runner implements Runnable {  
 4        private CyclicBarrier barrier;  
 5        private String name;  
 6 
 7        public Runner(CyclicBarrier barrier, String name) {  
 8            this.barrier = barrier;  
 9            this.name = name;  
10        }  
11        @Override  
12        public void run() {  
13            try {  
14                Thread.sleep(1000 * (new Random()).nextInt(5));  
15                System.out.println(new SimpleDateFormat("YYYY-HH-MM hh:dd:ss").format(new Date()) +" " + name + " 准备OK.");
16                barrier.await();  
17            } catch (InterruptedException e) {  
18                e.printStackTrace();  
19            } catch (BrokenBarrierException e) {  
20                e.printStackTrace();  
21            }  
22            System.out.println(name + " Go!!");  
23        }  
24    }
25 
26    public static void main(String[] args) throws IOException, InterruptedException {  
27        CyclicBarrier barrier = new CyclicBarrier(3);  // 3
28        ExecutorService executor = Executors.newFixedThreadPool(3);  
29 
30        executor.submit(new Thread(new Runner(barrier, "zhangsan")));  
31        executor.submit(new Thread(new Runner(barrier, "lisi")));  
32        executor.submit(new Thread(new Runner(barrier, "wangwu")));  
33 
34        executor.shutdown();  
35    }  
36 
37 }  

输出结果:

2018-02-06 02:30:06 lisi 准备OK.
2018-02-06 02:30:06 zhangsan 准备OK.
2018-02-06 02:30:07 wangwu 准备OK.
2018-02-06 02:30:07 wangwu Go!!
2018-02-06 02:30:07 lisi Go!!
2018-02-06 02:30:07 zhangsan Go!!

CyclicBarrier的应用场景 

  • CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。
  • 比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

3. Semaphore(信号量)

Semaphore的作用

  • 在java中,使用了synchronized关键字和Lock锁实现了资源的并发访问控制,在同一时间只允许唯一了线程进入临界区访问资源(读锁除外),这样子控制的主要目的是为了解决多个线程并发同一资源造成的数据不一致的问题。
  • Semaphore主要是在多线程中可以轻松控制信号量,针对某个资源可被并发访问的个数。acquire()方法可以或得一个访问的许可,release()方法释放一个许可。提供同步机制,控制同时访问的个数。

示例:

 1 public class MySemaphore {  
 2  public static void main(String[] args) {  
 3    // 线程池  
 4    ExecutorService exec = Executors.newCachedThreadPool();  
 5    // 只能2个线程同时访问  
 6    final Semaphore semp = new Semaphore(2);  
 7    // 模拟10个客户端访问  
 8    for (int index = 0; index < 10; index++) {  
 9      final int NO = index;  
10      Runnable run = new Runnable() {  
11        public void run() {  
12          try {
13            // 获取许可  
14            semp.acquire();  
15            System.out.println(new SimpleDateFormat("YYYY-HH-MM hh:dd:ss").format(new Date()) +" " + "Accessing: " + NO);
16            //模拟实际业务逻辑
17            Thread.sleep((long) (Math.random() * 10000));  
18            // 访问完后,释放  
19            semp.release();  
20          } catch (InterruptedException e) {  
21          }  
22        }  
23      };  
24      exec.execute(run);  
25    }
26 
27    try {
28      Thread.sleep(10);
29    } catch (InterruptedException e) {
30      e.printStackTrace();
31    }
32 
33    //System.out.println(semp.getQueueLength());
34    // 退出线程池
35    exec.shutdown();  
36  }
37 }  

执行结果:

2018-11-06 11:30:51 pool-1-thread-1接受服务: 0
2018-11-06 11:30:51 pool-1-thread-2接受服务: 1
2018-11-06 11:30:53 pool-1-thread-6接受服务: 5
2018-11-06 11:30:56 pool-1-thread-3接受服务: 2
2018-11-06 11:30:57 pool-1-thread-5接受服务: 4
2018-11-06 11:30:58 pool-1-thread-10接受服务: 9
2018-11-06 11:30:02 pool-1-thread-7接受服务: 6
2018-11-06 11:30:04 pool-1-thread-8接受服务: 7
2018-11-06 11:30:08 pool-1-thread-9接受服务: 8
2018-11-06 11:30:10 pool-1-thread-4接受服务: 3

4. Future简介 

  • 在Java中一般通过继承Thread类或者实现Runnable接口这两种方式来创建多线程,但是这两种方式都有个缺陷,就是不能在执行完成后获取执行的结果,因此Java 1.5之后提供了Callable和Future接口,通过它们就可以在任务执行完毕之后得到任务的执行结果。
  • Future接口代表异步计算的结果,通过Future接口提供的方法可以查看异步计算是否执行完成,或者等待执行结果并获取执行结果,同时还可以取消执行。
  • FutureTask一个可取消的异步计算,FutureTask 实现了Future的基本方法,提空 start cancel 操作,可以查询计算是否已经完成,并且可以获取计算的结果。结果只可以在计算完成之后获取,get方法会阻塞当计算没有完成的时候,一旦计算已经完成,那么计算就不能再次启动或是取消。
  • FutureTask 可以用来包装一个 Callable 或是一个runnable对象。因为FurtureTask实现了Runnable方法,所以 FutureTask可以提交(submit)给Excutor执行(excution).。

 FutureTask使用场景

FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等.

 1 public class MyFuture implements Callable<String>{
 2     private String para;
 3 
 4     public MyFuture(String para){
 5       this.para = para;
 6     }
 7 
 8     /**
 9      * 这里是真实的业务逻辑,其执行可能很慢
10      */
11     @Override
12     public String call() throws Exception {
13       //模拟执行耗时
14       Thread.sleep(5000); //模拟耗时业务
15       String result = this.para + "处理完成";
16       return result;
17     }
18 
19     //主控制函数
20     public static void main(String[] args) throws Exception {
21       String queryStr = "query";
22       //构造FutureTask,并且传入需要真正进行业务逻辑处理的类,该类一定是实现了Callable接口的类
23       FutureTask<String> future = new FutureTask<String>(new MyFuture(queryStr));
24 
25       FutureTask<String> future2 = new FutureTask<String>(new MyFuture(queryStr));
26       //创建一个固定线程的线程池且线程数为1,
27       ExecutorService executor = Executors.newFixedThreadPool(2);
28       //这里提交任务future,则开启线程执行RealData的call()方法执行
29       //submit和execute的区别: 第一点是submit可以传入实现Callable接口的实例对象, 第二点是submit方法有返回值
30 
31       Future f1 = executor.submit(future);        //单独启动一个线程去执行的
32       Future f2 = executor.submit(future2);
33       System.out.println(new SimpleDateFormat("YYYY-HH-MM hh:dd:ss").format(new Date()) +" " + "请求完毕");
34 
35       try {
36         //这里可以做额外的数据操作,也就是主程序执行其他业务逻辑
37         System.out.println(new SimpleDateFormat("YYYY-HH-MM hh:dd:ss").format(new Date()) +" " + "处理实际的业务逻辑...");
38         Thread.sleep(1000);
39       } catch (Exception e) {
40         e.printStackTrace();
41       }
42       // 可以取消异步任务
43       //future2.cancel(true);
44 
45       try {
46         // 阻塞,等待异步任务执行完毕-获取异步任务的返回值
47         //调用获取数据方法,如果call()方法没有执行完成,则依然会进行等待
48         System.out.println(new SimpleDateFormat("YYYY-HH-MM hh:dd:ss").format(new Date()) +" future1 " + "数据:" + future.get());
49         System.out.println(new SimpleDateFormat("YYYY-HH-MM hh:dd:ss").format(new Date()) +" future2" + "数据:" + future2.get());
50 
51       } catch (InterruptedException e) {
52         e.printStackTrace();
53       } catch (ExecutionException e) {
54         e.printStackTrace();
55       }
56       executor.shutdown();
57     }
58 }

运行结构:

2018-11-06 11:30:38 请求完毕
2018-11-06 11:30:38 处理实际的业务逻辑...
2018-11-06 11:30:39 future1 数据:query处理完成
2018-11-06 11:30:43 future2数据:query处理完成

  

原文地址:https://www.cnblogs.com/Mao-admin/p/9989336.html