ScheduledThreadPoolExecutor详解

ScheduledThreadPoolExecutor详解

简介

  • 继承自ThreadPooExecutor,为任务提供延迟或周期执行.
  • 使用专门的ScheduledFutureTask来执行周期任务,也可以接收不需要时间调度的任务.
  • 使用DelayedWorkQueue存储任务.(一种无界延迟队列)
  • 支持线程池关闭后可执行,可选择线程池关闭后支持继续执行周期或延迟任务.

解析

内部结构

内部有两个内部类:

  • ScheduledFutureTask:可以延迟执行的异步运算任务.
  • DelayedWorkQueue:存储周期或延迟任务的延迟队列.

ScheduledFutureTask

任务的执行

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

流程:

  1. 若当前状态下不可执行,则取消任务.
  2. 若可以执行,则检查是否为周期任务.若为非周期任务,则直接执行.
  3. 若为周期任务,则执行后设置下一次执行的时间,并将任务加入周期队列中.

任务的取消

public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    if (cancelled && removeOnCancel && heapIndex >= 0)
        remove(this);
    return cancelled;
}

流程:

  1. 取消任务.
  2. 根据参数决定是否从队列中移除此任务.

核心方法

schedule

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

流程:

  1. 若任务为空,则抛出异常.
  2. 检查线程池是否关闭.若关闭,则拒绝任务.
  3. 若没有关闭,则将任务加入到等待队列.
  4. 再次检查线程池是否在运行.若线程池在运行或者允许线程池关闭运行,则启动新线程等待执行任务.
  5. 若线程池已经关闭并且不允许线程池关闭后运行,则从队列中移出指定的任务,再取消任务.

scheduleAtFixedRate

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

特点: 创建一个周期执行的任务,第一次执行延期时间为initialDelay之后每隔period执行一次,不等待第一次执行完就开始计时.

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

特点:创建一个周期执行的任务,第一次执行延期时间为initialDelay,在第一次执行完之后延迟delay后开始下一次执行.

shutdown()

@Override void onShutdown() {
    BlockingQueue<Runnable> q = super.getQueue();
    boolean keepDelayed =
        getExecuteExistingDelayedTasksAfterShutdownPolicy();
    boolean keepPeriodic =
        getContinueExistingPeriodicTasksAfterShutdownPolicy();
    if (!keepDelayed && !keepPeriodic) {
        for (Object e : q.toArray())
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
        q.clear();
    }
    else {
        // Traverse snapshot to avoid iterator exceptions
        for (Object e : q.toArray()) {
            if (e instanceof RunnableScheduledFuture) {
                RunnableScheduledFuture<?> t =
                    (RunnableScheduledFuture<?>)e;
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) { // also remove if already cancelled
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    tryTerminate();
}

解释:
判断是否有线程池关闭后保留的任务.

  • 若没有保留的任务,则依次取消任务,并清除队列.
  • 若有保留的任务,则对于非周期性任务,取消该任务并将其清除出队列.
原文地址:https://www.cnblogs.com/truestoriesavici01/p/13235978.html