
1 前言

之前的文章异步任务执行器简介对ExecutorService有过简单的介绍,AbstractExecutorService是一个实现ExecutorService接口的非常重要的抽象类,它提供了ExecutorService接口的默认实现。一般情况下,我们可直接继承AbstractExecutorService来实现自己的任务执行器,AbstractExecutorService已经实现ExecutorService接口中的大部分抽象方法,我们自己主要去实现execute方法,此抽象类大降低了编写自定义的任务执行器的难度。我们常用的线程池执行器ThreadPoolExecutor和ForkJoin框架的执行器ForkJoinPool都是直接继承抽象类AbstractExecutorService。 (本文基于JDK1.8)

AbstractExecutorService使用newTaskFor返回的RunnableFuture来实现submit,invokeAny和invokeAll方法,该方法默认为此程序包中提供的FutureTask类(FutureTask在FutureTask源码完整解读做过详细说明,这里不再赘述)。 submit系列方法将创建一个关联的RunnableFuture,该关联的RunnableFuture将被执行并返回, 子类可以重写newTaskFor方法以返回RunTask以外的RunnableFuture实现。

使用自定义Future的示例: 这是自定义ThreadPoolExecutor以使用CustomTask类而不是JDK默认的FutureTask类

   public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

   static class CustomTask<V> implements RunnableFuture<V> {...}

   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
       return new CustomTask<V>(c);
   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
       return new CustomTask<V>(r, v);
   // ... add constructors, etc.

2 与之相关的ExecutorCompletionService


ExecutorCompletionService是 一个支持性的工具类,其内部委托Executor去执行任务。 ExecutorCompletionService会将已完成任务的结果Future放入到队列中,可调用take或poll方法获取任务的结果Future. 这个类是轻量级的,适合在处理任务组(多任务)时临时性使用。


1) CompletionService接口


public interface CompletionService<V> {
    Future<V> submit(Callable<V> task);
    Future<V> submit(Runnable task, V result);
    Future<V> take() throws InterruptedException;
    Future<V> poll();
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;

2) 使用示例


 void solve(Executor e,   Collection<Callable<Result>> solvers)
     throws InterruptedException, ExecutionException {
     CompletionService<Result> ecs
         = new ExecutorCompletionService<Result>(e);
     for (Callable<Result> s : solvers)
     int n = solvers.size();
     for (int i = 0; i < n; ++i) {
         Result r = ecs.take().get();
         if (r != null)

使用示例2: 反过来,假设您要使用任务组的第一个非空结果,忽略出现异常的任何结果,并在第一个任务就绪(完成)时取消所有其他任务

     void solve(Executor e,
                Collection<Callable<Result>> solvers)
         throws InterruptedException {
         CompletionService<Result> ecs
             = new ExecutorCompletionService<Result>(e);
         int n = solvers.size();
         List<Future<Result>> futures
             = new ArrayList<Future<Result>>(n);
         Result result = null;
         try {
             for (Callable<Result> s : solvers)
             for (int i = 0; i < n; ++i) {
                 try {
                     Result r = ecs.take().get();
                     if (r != null) {
                         result = r;
                 } catch (ExecutionException ignore) {}
         finally {
             for (Future<Result> f : futures)
         if (result != null)

3) 成员变量与方法分析


private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;


public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
public ExecutorCompletionService(Executor executor,
                                 BlockingQueue<Future<V>> completionQueue) {
    if (executor == null || completionQueue == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = completionQueue;


private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    //成员内部类可直接访问外部类的成员变量.  任务完成后,向阻塞队列completionQueue中添加一个元素(当前任务)
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;


private RunnableFuture<V> newTaskFor(Callable<V> task) {
    if (aes == null)
        return new FutureTask<V>(task);
    else //aes不为空,就使用它自己的newTaskFor方法来包装任务
        return aes.newTaskFor(task);
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
    if (aes == null)
        return new FutureTask<V>(task, result);
        return aes.newTaskFor(task, result);


public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
public Future<V> submit(Runnable task, V result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task, result);
    executor.execute(new QueueingFuture(f));
    return f;


public Future<V> take() throws InterruptedException {
    return completionQueue.take();

public Future<V> poll() {
    return completionQueue.poll();

public Future<V> poll(long timeout, TimeUnit unit)
        throws InterruptedException {
    return completionQueue.poll(timeout, unit);

3 API实现原理

1) submit系列方法


不论入参是Runnable还是Callable,最终都会将其封装成RunnableFuture,而RunnableFuture又是Runnable的子接口,而后去调用execute(Runnable)执行任务,最后再返回这个RunnableFuture 。注意:这里的execute(Runnable)还是抽象方法,需要子类去实现这个方法。

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        return ftask;
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        return ftask;
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        return ftask;


protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);

2) invokeAll系列方法


其主要逻辑比较简单:①将所有任务统一包装成RunnableFuture,并依次调用execute准备执行每个任务 ; ②等待所有任务执行完成;③若所有任务都正常完成,就返回Future集合,若在执行任务时某任务被取消或抛出异常,就取消其他所有任务,再返回Future集合

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);//获取每个Future
            if (!f.isDone()) {//任务未完成
                try {
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
        done = true;//正常完成,直接返回Future集合
        return futures;
    } finally {
        if (!done)//执行各任务过程中,任一任务被取消或抛出异常,就取消所有任务
            for (int i = 0, size = futures.size(); i < size; i++)

invokeAll(Collection, long, TimeUnit)方法是invokeAll(Collection)的超时版本, 两者逻辑大致相同,“invokeAll(Collection, long, TimeUnit)”需要指定等待时间的时间,若超时后还有任务还未完成,这些任务就会被取消。

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
        throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks)

        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();

        // Interleave time checks and calls to execute in case
        // executor doesn't have any/much parallelism.
        for (int i = 0; i < size; i++) { //准备依次执行每个任务,并检查是否超时
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L)
                return futures;
        for (int i = 0; i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                if (nanos <= 0L)
                    return futures;
                try {
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                } catch (TimeoutException toe) {
                    return futures;
                nanos = deadline - System.nanoTime();
        done = true;
        return futures;
    } finally { //某任务被取消或抛出异常或超时,将所有任务取消
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
invokeAll(Collection, long, TimeUnit)

3) invokeAny系列方法


invokeAny(Collection, long , TimeUnit )是超时版本的invokeAny(Collection),它对任务的执行耗时做了限制,如果在限定时间内有一任务正常(没抛出异常)完成,就返回此任务的结果 ,其他将任务会被取消;如果没有任务能在限时内成功完成返回,就抛出TimeoutException; 没有任务正常成功返回(可能是因发生某种异常而返回),将抛出ExecutionException.

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    try {
        return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {
        assert false;
        return null;
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    return doInvokeAny(tasks, true, unit.toNanos(timeout));

从上面的代码可以看出这两个方法都委托doInvokeAny(Collection, long , TimeUnit )来实现的,我们可以来看看doInvokeAny是怎么做的。与invokeAll方法最大的不同在于:doInvokeAny中执行任务没有调用本类中的execute方法,它引入了一个新类ExecutorCompletionService,doInvokeAny调用ExecutorCompletionService.sumbmit来提交任务。

doInvokeAny的主要逻辑:①先构建一个ExecutorCompletionService类型实例ecs,然后使用ecs.submit开始提交第一个任务,然后进入for循环自旋;②进入自旋后,开始尝试查看是否有任务已完成, 若没有任务已完成且还有任务未提交,就继续调用ecs.submit提交下一个任务,若没有任务已完成且所有任务都已提交,则(设置过超时时间)超时等待或(未设置超时时间)无限时长等待任务完成;③若有任务已完成,就返回此任务的结果;④若执行任务过程中发生了异常,就抛出异常;⑤取消其他未完成的任务

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
    if (tasks == null)
        throw new NullPointerException();
    int ntasks = tasks.size();
    if (ntasks == 0)
        throw new IllegalArgumentException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); //容纳任务组的容器
    ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);
    // For efficiency, especially in executors with limited
    // parallelism, check to see if previously submitted tasks are
    // done before submitting more of them. This interleaving
    // plus the exception mechanics account for messiness of main
    // loop.
   // 为了提高效率(尤其是在并行性有限的执行器中),在提交更多任务之前会检查是否完成了先前提交的任务。
   // 这种交织加上异常机制使得主循环比较混乱。
    try {
        // Record exceptions so that if we fail to obtain any
        // result, we can throw the last exception we got.
        ExecutionException ee = null;//记录执行任务过程出抛出的异常
        final long deadline = timed ? System.nanoTime() + nanos : 0L;//计算超时的时刻
        Iterator<? extends Callable<T>> it = tasks.iterator();

        // Start one task for sure; the rest incrementally
        int active = 1; //计录在执行的任务数

        for (;;) {
            Future<T> f = ecs.poll();//取出一个已完成任务
            if (f == null) { //没有已完成的任务
                if (ntasks > 0) {
                //当前没有任务已完成、所有任务已提交 且没有任务在执行时,退出循环
                else if (active == 0) //
                //当前没有任务完成、所有任务已提交 且还有任务在执行时、设置了超时,就超时等待
                else if (timed) {
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();
                    nanos = deadline - System.nanoTime();
                    //当前没有任务完成、所有任务已提交 且还有任务在执行时、未设置超时,就不限时长地等待
                    f = ecs.take();
            if (f != null) {//有一个任务已完成
                --active; //还在执行的任务数自减
                try {
                    return f.get();//返回结果
                } catch (ExecutionException eex) {
                    ee = eex;
                } catch (RuntimeException rex) {
                    ee = new ExecutionException(rex);

        if (ee == null)
            ee = new ExecutionException();
        throw ee;

    } finally {//取消其他任务
        for (int i = 0, size = futures.size(); i < size; i++)