线程之ExecutorService

总览

Executor

package java.util.concurrent;

public interface Executor {

    void execute(Runnable command);
}
  • 命令模式
  • 分离任务提交和任务执行

ExecutorService

package java.util.concurrent;
import java.util.List;
import java.util.Collection;

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

  • 提供具有返回值的Callable对象
  • 异步任务提交结果
  • 批量任务提交
  • 生命周期管理

Runnable转Callable

  • 适配器模式
  • java.util.concurrent.Executors.RunnableAdapter
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

Future

package java.util.concurrent;

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

  • 获取任务异步执行的结果

异步原理

示例

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        Future<String> submit = executorService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return Thread.currentThread().getName();
            }
        });
        String result = submit.get();
        System.out.println(result);
    }
  • 提交一个Callable对象或者Runnable对象到ExecutorService里,能够返回一个Future对象,并且可以通过Future对象获取Callable对象或者Runnable对象 的运行结果

  • 接口关系

public interface RunnableFuture<V> extends Runnable, Future<V>
public class FutureTask<V> implements RunnableFuture<V>
  • AbstractExecutorService
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
  • FutureTask
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
  1. 在ExecutorService中,Runnable被适配为Callable
  2. RunnableFuture继承自Runnable和Future
  3. 实现类FutureTask封装了任务运行和结果之间的状态转换
  4. 提交给ExecutorService的Callable对象被封装成RunnableFuture
  5. execute方法传入的是RunnableFuture,返回的对象也是RunnableFuture,同一个对象
  6. 调用Future的get方法时,会被阻塞,知道任务执行完成后才会唤醒get阻塞,返回结果
  7. FutureTask阻塞使用LockSupport,阻塞队列使用Treiber stack

AbstractExecutorService

  • 封装任务为FutureTask,实现通用submit
  • 实现invokeAny,代理给ExecutorCompletionService实现invokeAny语义
  • 实现invokeAll,遍历Future列表,调用get直至所有结果返回

ExecutorCompletionService

  • 依赖Executor实现任务真正执行
  • 内部维护一个阻塞队列
  • 使用QueueingFuture继承FutureTask,当任务执行完成后,把Future对象放进队列
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
  • 提供take方法阻塞等待获取结果

总结:维护一个阻塞队列,当一批任务执行时,每个任务执行完成后都会把Future对象放进队列,因此队列take到的都是执行完成的任务。该线程池的优点是不用自己阻塞每个Future,而是提交一批任务后,优先处理先完成的任务就好了。依赖这个特性实现invokeAny语义,只需要调用take方法,有返回就结束执行即可。

  • 使用示例见java.util.concurrent.AbstractExecutorService.doInvokeAny

ThreadPoolExecutor

  • 主要实现execute方法,submit方法代理到这个方法的实际参数类型已经是RunnableFuture类型了

构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • 核心线程数
  • 最大线程数
  • 存活时间
  • 存活时间单位
  • 阻塞队列
  • 线程工厂
  • 拒绝策略

execute流程

  1. 如果当前线程数量小于corePoolSize,直接创建Worker工作线程,执行任务

  2. 如果当前线程数量大于corePoolSize,任务进阻塞队列

  3. 如果阻塞队列满,线程数量小于maximumPoolSize,创建Worker执行任务

  4. 如果线程数达到maximumPoolSize,执行拒绝策略

  5. Worker工作线程启动后,从阻塞队列阻塞获取任务。

  6. 如果线程数大于corePoolSize,那么阻塞获取任务时会调用poll(keepAliveTime, TimeUnit.NANOSECONDS);如果阻塞超时没获取到任务,此时线程数大于corePoolSize会销毁当前线程

  7. 如果线程数不大于corePoolSize并且没有设置核心线程超时,一直take。可以通过allowCoreThreadTimeOut方法让核心线程超时

  8. 可以通过prestartCoreThread预启动核心线程

  9. 可以通过继承当前类重写beforeExecute,afterExecute方法进行任务执行拦截

  10. 拒绝策略:AbortPolicy抛异常;DiscardPolicy丢弃当前任务;DiscardOldestPolicy丢弃最老的任务;CallerRunsPolicy任务提交者执行

  11. CallerRunsPolicy比较有用,可以通过拉低提交速率,减缓线程池压力,达到背压的效果

ScheduledExecutorService

  • 调度线程池,提供简单的调度规则
  • scheduleAtFixedRate固定速率
  • scheduleWithFixedDelay固定延时
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
  • 使用延时队列作为内部阻塞队列,最小堆算法
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V>{
//省略其他方法
        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }
}
  • 扩展FutureTask,周期调度任务会调用java.util.concurrent.FutureTask.runAndReset(),使周期任务可以重复被调度
  • 固定频率和固定延时的差异为下次任务执行时间计算,固定频率时执行完成时间点加上间隔;固定延时时任务开始时间加上间隔
  • 任务执行完成后,计算下次执行时间,重新进入延时队列调度
原文地址:https://www.cnblogs.com/zby9527/p/13151379.html