Java并发包线程池之Executors、ExecutorCompletionService工具类

前言

前面介绍了Java并发包提供的三种线程池,它们用处各不相同,接下来介绍一些工具类,对这三种线程池的使用。

Executors

Executors是JDK1.5就开始存在是一个线程池工具类,它定义了用于Executor、ExecutorService、ScheduledExecutorService、ThreadFactory和Callable的工厂和工具方法。在开始之前,先了解一下它提供的一些内部类:

DelegatedExecutorService、DelegatedScheduledExecutorService、FinalizableDelegatedExecutorService

 1 //只暴露实现ExecutorService接口的方法的包装器类。Delegated 是代理,委托的意思
 2 static class DelegatedExecutorService extends AbstractExecutorService {
 3     private final ExecutorService e;
 4     
 5     //构造器传入一个ExecutorService实例
 6     DelegatedExecutorService(ExecutorService executor) { e = executor; }
 7     
 8     .....
 9 }
10 
11 //可自动终结的包装线程池,FinalizableDelegatedExecutorService的实例即使不手动调用shutdown方法关闭现称池,虚拟机也会帮你完成此任务
12 static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
13 
14     FinalizableDelegatedExecutorService(ExecutorService executor) {
15         super(executor);
16     }
17     
18     //finalize方法会在虚拟机gc清理对象时被调用
19     protected void finalize() {
20         super.shutdown();
21     }
22 }
23 
24 
25 //只暴露实现ScheduledExecutorService的接口方法的一个包装器类。
26 static class DelegatedScheduledExecutorService extends DelegatedExecutorService implements ScheduledExecutorService {
27     private final ScheduledExecutorService e;
28     
29     //构造器传入一个ScheduledExecutorService实例
30     DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
31         super(executor);
32         e = executor;
33     }
34     //.....
35 }
View Code

 这三个类都是包装类,DelegatedExecutorService是对ExecutorService的一种包装,仅仅只给使用者暴露 ExecutorService的接口方法,屏蔽掉具体实现类的独有方法。DelegatedScheduledExecutorService是对ScheduledExecutorService的包装,仅仅只给使用者暴露 ScheduledExecutorService的接口方法,而FinalizableDelegatedExecutorService是在对ExecutorService的包装基础上,增加了自动线程池回收的功能,其finalize方法会在虚拟机gc清理对象时被调用,从而将用户忘记关闭的无用线程池关闭并回收。

PrivilegedCallableUsingCurrentClassLoader、PrivilegedCallable则是对Callable任务 的运行上下文和类加载的控制,RunnableAdapter则是用于将Runnable包装成Callable的包装类,DefaultThreadFactory是默认的线程工厂,创建的线程名称都具有:pool-池编号-thread-线程编号,这样的前缀。PrivilegedThreadFactory继承了DefaultThreadFactory,在默认的线程工厂上,扩展了捕获访问控制的上下文和类加载器。

工具方法

一、下面是一些创建线程池的工具方法:

public static ExecutorService newFixedThreadPool(int nThreads)

返回可以执行Runnable、Callable任务的一个固定线程池的ThreadPoolExecutor实例,其corePoolSize和maximumPoolSize都是指定的大小,keepAliveTime为0,任务队列是无界的LinkedBlockingQueue队列。首先根据 ThreadPoolExecutor的特性,corePoolSize和maximumPoolSize都相等时,意味着不会创建非核心线程,在keepAliveTime默认没有应用于核心线程时,其keepAliveTime无论是什么值都无意义,因此这里的keepAliveTime没有实际意义。然后由于是无界队列,maximumPoolSize参数其实也是无意义的,是所有来不及处理的任务都会无条件的丢进该无界队列中,直到系统资源耗尽,因此使用此线程池要注意任务的提交频率,不然会有内存耗尽,服务器宕机的风险。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

该方法只是比第一个方法newFixedThreadPool(int nThreads) 多传递了一个线程工厂参数,其他都一样。

public static ExecutorService newSingleThreadExecutor()

利用FinalizableDelegatedExecutorService返回包装过得可以执行Runnable、Callable任务的只有单个线程的ThreadPoolExecutor实例。其corePoolSize和maximumPoolSize都是1,任务队列是无界的LinkedBlockingQueue。首先,这是一个单线程按顺序执行任务的线程池,但如果在线程池关闭之前某个任务执行失败异常结束,那么如果需要执行后续任务,将可能会创建一个新的线程替代这个唯一的线程。其次该方法看似与执行newFixedThreadPool(1)的效果一样,但由于该方法放回的线程池经过FinalizableDelegatedExecutorService包装过,屏蔽了更改线程池配置的方法,因此该线程池无法被重新配置。最后经过FinalizableDelegatedExecutorService包装之后,该线程池具有了自动被JVM垃圾回收时终结回收的特性,即使用户使用完之后没有调用shutdown。

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

该方法就是比newSingleThreadExecutor()方法多传了一个线程工厂,其他都一样。

public static ExecutorService newCachedThreadPool()

返回可以立即执行Runnable、Callable任务的ThreadPoolExecutor线程池,其corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,但是任务队列是SynchronousQueue的0容量队列。因此,一旦任务被提交将立即被执行,如果线程不够将立即创建线程,该线程池理论上为了满足任务需要可以创建Integer.MAX_VALUE多个线程(当然该方法设置了keepAliveTime为60秒,在线程空闲下来之后所有线程都可能会被销毁),当任务的提交频率超过了任务的平均处理速率,将导致创建越来越多的线程以处理到达的任务,因此也有资源耗尽的潜在风险,必须要有效的控制任务的提交频率。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

指定了线程工厂的newCachedThreadPool(),其他都一样。

public static ScheduledExecutorService newSingleThreadScheduledExecutor()

返回一个可以执行Runnable、Callable类型的周期/延迟任务的的ScheduledThreadPoolExecutor线程池实例,其corePoolSize为1。由于其内部也是一个无界队列,因此maximumPoolSize、keepAliveTime是无效的。所以它也是一个单线程执行延迟/周期任务的线程池,所有来不及处理的任务都会无条件的丢进该无界队列中,直到系统资源耗尽,因此使用此线程池要注意任务的提交频率,不然会有内存耗尽,服务器宕机的风险。其返回经过了DelegatedScheduledExecutorService包装,不可再被转换成ScheduledThreadPoolExecutor实例。

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

指定了线程工厂的newSingleThreadScheduledExecutor(),其他都一样。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

创建一个可以执行Runnable、Callable类型的周期/延迟任务的具有指定线程数量的ScheduledThreadPoolExecutor线程池实例,由于其内部是无界队列,所有来不及处理的任务都会无条件的丢进该无界队列中,直到系统资源耗尽,因此使用此线程池要注意任务的提交频率,不然会有内存耗尽,服务器宕机的风险。其返回结果没有经过包装,可以转换成ScheduledThreadPoolExecutor实例。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

指定了线程工厂的newScheduledThreadPool(int corePoolSize),其他都一样。

public static ExecutorService newWorkStealingPool(int parallelism)

返回一个可执行ForkJoinTask任务的具有给定并发度,工作线程按FIFO先进先出的异步工作模式的ForkJoinPool,由于ForkJoinPool是工作窃取机制实现的,所以该方法名有WorkStealing的字样。注意ForkJoinPool用于任务可递归分解执行的ForkJoinTask。由于该方法创建ForkJoinTask指定的asyncMode是ture,不能保证所提交任务的执行顺序,因此一般只适用于不需要合并递归执行结果(即不需要返回结果)的场景,一般用于事件消息传递的场景。

public static ExecutorService newWorkStealingPool()

无参的创建一个可执行ForkJoinTask任务的,工作线程按FIFO先进先出的异步工作模式的ForkJoinPool。该方法与newWorkStealingPool(int parallelism)唯一的区别就是,该方法的并行度是根据当前计算机的CPU核心数确定的,其他都一样,asyncMode也是true,一般来说这个方法比newWorkStealingPool(int parallelism)更好用。

对于以上提供的线程池的工具方法可以看出,Executors并没有返回基于有界队列的ThreadPoolExecutor线程池工具类,这大概是因为有界队列有任务被拒绝的潜在可能,这在大多数情况下用户都是不可接受的。在实际使用中,还是应该视情况而定,不要仅仅限于使用Executors提供的这几个工具方法返回的线程池,毕竟ThreadPoolExecutor线程池是一个可在多个参数上调节其工作机制的线程池。

二、下面是包装线程池的工具方法:

public static ExecutorService unconfigurableExecutorService(ExecutorService executor),包装指定的ExecutorService线程池为DelegatedExecutorService类型,即只暴露ExecutorService 的接口方法,其返回结果不能强转成原来的线程池实现类。按字面意思就是无法对该线程池的配置进行更改,因为已经将那些更改配置参数的setter方法都屏蔽了。

public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) ,包装指定的执行延迟/周期任务的线程池为DelegatedScheduledExecutorService类型,即只暴露ScheduledExecutorService的接口方法,其返回结果不能强转成原来的线程池实现类。按字面意思就是无法对该线程池的配置进行更改。

三、返回一个默认的线程工厂的工具方法:

public static ThreadFactory defaultThreadFactory(),返回默认的线程工厂实现DefaultThreadFactory,创建的线程名称都具有:pool-池编号-thread-线程编号,这样的前缀。创建出的线程都是非守护线程,因此使用该线程工厂的线程池在用完之后最好手动shutdown,避免JVM挂起。

public static ThreadFactory privilegedThreadFactory(),返回捕获了访问控制的上下文和类加载器的默认线程工厂DefaultThreadFactory的扩展类PrivilegedThreadFactory。创建的线程名称也都具有:pool-池编号-thread-线程编号,这样的前缀。

四,Callable相关的工具方法:

public static <T> Callable<T> callable(Runnable task, T result),将一个返回指定结果的Runnable 任务转换成Callable任务。

public static Callable<Object> callable(Runnable task) ,将一个返回结果为null的Runnable 任务转换成Callable任务。

public static Callable<Object> callable(final PrivilegedAction<?> action) ,返回一个在调用时执行指定的PrivilegedAction任务并返回其执行结果的Callable对象。

public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) ,返回一个在调用时执行指定的PrivilegedExceptionAction任务并返回其执行结果的Callable对象。

public static <T> Callable<T> privilegedCallable(Callable<T> callable) ,  返回一个在调用它时可在当前的访问控制上下文中执行给定的Callable任务的Callable对象。

public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable), 返回一个在调用它时可在当前的访问控制上下文中,使用当前上下文类加载器作为上下文类加载器来执行给定的Callable任务的Callable对象。

CompletionService

这是一个接口,该接口是为了将任务的提交与获取任务的执行结果解耦而设计的,当然这里的任务一般是指多个。这通常应用于那种不关心执行顺序的多个耗时操作。例如异步I/O,或者类比一个页面的不同模块的异步加载。通常,CompletionService 依赖于一个单独的 Executor 来实际执行任务,在这种情况下,CompletionService 只管理一个内部完成队列。ExecutorCompletionService 类提供了此方法的一个实现,它虽然是一个需要创建实例的类,但其实也可以看作是一种工具类。

内存一致性效果:线程向 CompletionService 提交任务之前的操作 happen-before 该任务实际的执行操作,后者依次 happen-before 紧跟后面从完成队列中成功取走其返回结果的操作。

ExecutorCompletionService

首先看起其内部类QueueingFuture:

 1 public class ExecutorCompletionService<V> implements CompletionService<V> {
 2     private final Executor executor;
 3     private final AbstractExecutorService aes;
 4     private final BlockingQueue<Future<V>> completionQueue; //已完成任务的Future阻塞队列
 5 
 6  
 7     //将以完成的任务的Future排队的实现
 8     private class QueueingFuture extends FutureTask<Void> {
 9 
10         QueueingFuture(RunnableFuture<V> task) {
11             super(task, null);
12             this.task = task;
13         }
14         //实现回调接口,当任务被完成,将Future放进阻塞队列
15         protected void done() { completionQueue.add(task); }
16         private final Future<V> task;
17     }
18 
19     ......
20 }
View Code

QueueingFuture就是对FutureTask的扩展,利用了FutureTask任务在被执行完成之后会回调done()方法的特性,将已经完成的任务立即放到一个阻塞队列。

ExecutorCompletionService有两个构造方法,都需要传入一个Executor 线程池实现类,用来实际执行任务,另一个可选的参数就是内部使用的阻塞队列,阻塞队列的选取可以决定存取已完成任务的Future的顺序。

下面是它实现CompletionService 接口的方法:

 1 //提交Callable任务
 2 public Future<V> submit(Callable<V> task) {
 3     if (task == null) throw new NullPointerException();
 4     RunnableFuture<V> f = newTaskFor(task);
 5     executor.execute(new QueueingFuture(f));
 6     return f;
 7 }
 8 
 9 //提交Runnable任务,返回固定的结果
10 public Future<V> submit(Runnable task, V result) {
11     if (task == null) throw new NullPointerException();
12     RunnableFuture<V> f = newTaskFor(task, result);
13     executor.execute(new QueueingFuture(f));
14     return f;
15 }
16 
17 //获取一个已经完成的任务的Future,直到成功或者被中断。
18 public Future<V> take() throws InterruptedException {
19     return completionQueue.take();
20 }
21 
22 //获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回 null。
23 public Future<V> poll() {
24     return completionQueue.poll();
25 }
26 
27 //超时版本的poll
28 public Future<V> poll(long timeout, TimeUnit unit)
29         throws InterruptedException {
30     return completionQueue.poll(timeout, unit);
31 }
View Code

从这些代码可见,都很简单,提交的任务被封装成QueueingFuture任务,从而可以使那些已经完成的任务被立即丢进准备好的阻塞队列,take/poll则是对阻塞队列中的已经完成的任务的Future的提取了。

以下是Java Doc中的两个示例,示例一:

 1 void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException {
 2      CompletionService ecs = new ExecutorCompletionService(e); //根据指定的线程池实例与任务集合创建ExecutorCompletionService实例
 3      for (Callable s : solvers)
 4          ecs.submit(s); //提交所有任务
 5          
 6      int n = solvers.size();
 7      for (int i = 0; i < n; ++i) {
 8          Result r = ecs.take().get(); //提取已经完成的任务结果
 9          if (r != null)
10              use(r);  //对于非空返回结果执行相应的操作
11      }
12 }

示例一一次性提交一组任务,并且对每一个任务返回的非空结果执行给定的操作。

示例二:

 1 void solve(Executor e, Collection<Callable<Result>> solvers)
 2      throws InterruptedException {
 3      CompletionService ecs = new ExecutorCompletionService(e);
 4      int n = solvers.size();
 5      List<Future<Result>> futures = new ArrayList>(n);
 6      Result result = null;
 7      try {
 8          for (Callable s : solvers)
 9              futures.add(ecs.submit(s)); //提交所有任务,并记录Future
10              
11          for (int i = 0; i < n; ++i) {
12              try {
13                  Result r = ecs.take().get();
14                  if (r != null) { //发现一个任务完成就退出
15                      result = r;
16                      break;
17                  }
18              } catch (ExecutionException ignore) {}
19          }
20      }
21      finally {
22          for (Future f : futures) //取消其余所有任务
23              f.cancel(true);
24      }
25 
26      if (result != null) //执行相应的操作
27          use(result);
28  }
View Code

示例二虽然也提交了一组任务,但只要有一个任务返回非空结果就取消其他所有任务,并那种该结果执行指定的操作。

ExecutorCompletionService利用了FutureTask任务不论是异常结束,还是正常结束还是被取消,都会回调其done方法的特点,扩展FutureTask并实现了该方法将不论以任何方式结束的任务的用于获取异步任务结果的Future放入一个阻塞队列中,因此可以通过读取队列的方法顺序获取那些任务的执行结果,由于任务可能是异常结束,因此在使用的时候,需要对Future.get()拿到的执行结果进行空值判断。

原文地址:https://www.cnblogs.com/txmfz/p/11261152.html