java.util.concurrent

一、java.util.concurrent介绍

java.util.concurrent包含了许多线程安全,测试良好,高性能的并发模块。创建java.util.concurrent的目的就是要实现Collection框架对数据结构所执行的并发操作。

二、核心组件

  • Executor
  • ExecutorService
  • ScheduledExecutorService
  • Future
  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • Locks
  • Phaser

2.1 Executor

一个接口,其定义了一个接收Runnable对象的方法executor。

1 public class Invoker implements Executor {
2     @Override
3     public void execute(Runnable r) {
4         r.run();
5     }
6 }
1 public void execute() {
2     Executor executor = new Invoker();
3     executor.execute( () -> {
4         // task to be performed
5     });
6 }

2.2 ExecutorService

是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法。

在使用ExecutorService之前,我们需要定义个Runnable类

1 public class Task implements Runnable {
2     @Override
3     public void run() {
4         // task details
5     }
6 }

然后需要创建ExecutorService实例,需要制定线程池的大小

1 ExecutorService executor = Executors.newFixedThreadPool(10);

一旦executor创建完成,我们可以使用executor提交任务

1 public void execute() { 
2     executor.submit(new Task()); 
3 }

它还提供了两种开箱即用的执行终止方法。第一个是shutdown();它等待所有提交的任务完成执行。另一个方法是shutdownnow(),它立即终止所有挂起/正在执行的任务。

还有另一种方法等待终止(long timeout,timeunit unit),它强制阻塞,直到在触发关闭事件或发生执行超时后,或执行线程本身中断后,所有任务都已完成执行为止。

1 try {
2     executor.awaitTermination( 20l, TimeUnit.NANOSECONDS );
3 } catch (InterruptedException e) {
4     e.printStackTrace();
5 }

2.3 ScheduledExecutorService

ScheduledExecutorService和ExecutorService十分相似,但是它能周期性的执行任务。

 1 public void execute() {
 2     ScheduledExecutorService executorService
 3       = Executors.newSingleThreadScheduledExecutor();
 4  
 5     Future<String> future = executorService.schedule(() -> {
 6         // ...
 7         return "Hello world";
 8     }, 1, TimeUnit.SECONDS);
 9  
10     ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
11         // ...
12     }, 1, TimeUnit.SECONDS);
13  
14     executorService.shutdown();
15 }

ScheduledExecutorService还可以在固定的延迟后执行任务

1 executorService.scheduleAtFixedRate(() -> {
2     // ...
3 }, 1, 10, TimeUnit.SECONDS);
4  
5 executorService.scheduleWithFixedDelay(() -> {
6     // ...
7 }, 1, 10, TimeUnit.SECONDS);

2.4 Future

Future用于表示异步操作的结果。

1 public void invoke() {
2     ExecutorService executorService = Executors.newFixedThreadPool(10);
3  
4     Future<String> future = executorService.submit(() -> {
5         // ...
6         Thread.sleep(10000l);
7         return "Hello world";
8     });
9 }

我们可以使用以下代码段检查未来的结果是否准备好,并在计算完成后获取数据。

1 if (future.isDone() && !future.isCancelled()) {
2     try {
3         str = future.get();
4     } catch (InterruptedException | ExecutionException e) {
5         e.printStackTrace();
6     }
7 }

我们还可以为给定的操作指定超时时间。如果任务花费的时间超过此时间,将引发TimeoutException。

1 try {
2     future.get(10, TimeUnit.SECONDS);
3 } catch (InterruptedException | ExecutionException | TimeoutException e) {
4     e.printStackTrace();
5 }

 2.5 CountDownLatch

CountDownWatch(在JDK5中引入)是一个实用程序类,它阻塞一组线程,直到一些操作完成。

2.6 CyclicBarrier

Cyclicbarrier的工作原理与countdownloach几乎相同,只是我们可以重用它。与CountDownWatch不同,它允许多个线程在调用最终任务之前使用await()方法(称为barrier条件)相互等待。

 1 public class Task implements Runnable {
 2  
 3     private CyclicBarrier barrier;
 4  
 5     public Task(CyclicBarrier barrier) {
 6         this.barrier = barrier;
 7     }
 8  
 9     @Override
10     public void run() {
11         try {
12             LOG.info(Thread.currentThread().getName() + 
13               " is waiting");
14             barrier.await();
15             LOG.info(Thread.currentThread().getName() + 
16               " is released");
17         } catch (InterruptedException | BrokenBarrierException e) {
18             e.printStackTrace();
19         }
20     }
21  
22 }

现在我们可以调用一些线程来竞争屏障条件。

 1 public void start() {
 2  
 3     CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
 4         // ...
 5         LOG.info("All previous tasks are completed");
 6     });
 7  
 8     Thread t1 = new Thread(new Task(cyclicBarrier), "T1"); 
 9     Thread t2 = new Thread(new Task(cyclicBarrier), "T2"); 
10     Thread t3 = new Thread(new Task(cyclicBarrier), "T3"); 
11  
12     if (!cyclicBarrier.isBroken()) { 
13         t1.start(); 
14         t2.start(); 
15         t3.start(); 
16     }
17 }

在这里,isbroken()方法检查执行期间是否有任何线程被中断。在执行实际流程之前,我们应该始终执行此检查。

2.7 Semaphonre

信号量用于阻止线程级对物理或逻辑资源的某些部分的访问。信号量包含一组许可证;每当线程试图进入关键部分时,它需要检查信号量是否有许可证可用。

2.8 ThreadFactory

ThreadFactory充当线程(不存在)池,根据需要创建新线程。它消除了为实现高效的线程创建机制而需要大量样板代码的需要。

 1 public class BaeldungThreadFactory implements ThreadFactory {
 2     private int threadId;
 3     private String name;
 4  
 5     public BaeldungThreadFactory(String name) {
 6         threadId = 1;
 7         this.name = name;
 8     }
 9  
10     @Override
11     public Thread newThread(Runnable r) {
12         Thread t = new Thread(r, name + "-Thread_" + threadId);
13         LOG.info("created new thread with id : " + threadId +
14             " and name : " + t.getName());
15         threadId++;
16         return t;
17     }
18 }
1 BaeldungThreadFactory factory = new BaeldungThreadFactory( 
2     "BaeldungThreadFactory");
3 for (int i = 0; i < 10; i++) { 
4     Thread t = factory.newThread(new Task());
5     t.start(); 
6 }

2.9 BlockingQueue

在异步编程中,最常见的集成模式之一是生产者-消费者模式。

2.10 DelayQueue

DelayQueue是一个无限大的元素阻塞队列,其中只有在元素的过期时间(即用户定义的延迟)完成时才能提取元素。因此,最上面的元素(head)将具有最大的延迟量,并将在最后进行轮询。

2.11 Locks

Lock是一个实用程序,用于阻止其他线程访问某段代码,除了当前正在执行该段代码的线程之外。

 

2.12 Phaser

相较于Cyclicbarrier和Countdownloatch,相较于它是一个更灵活的解决方案——用于充当一个可重用的屏障,在这个屏障上,动态线程数需要等待才能继续执行。我们可以协调多个执行阶段,为每个程序阶段重用一个阶段器实例。

参考网址: 

http://www.falkhausen.de/Java-8/java.util/concurrent/Future.html

http://tutorials.jenkov.com/java-util-concurrent/index.html

原文地址:https://www.cnblogs.com/wylwyl/p/10495444.html