线程池 一 ScheduledThreadPoolExecutor

java.util.concurrent
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService

简介

ScheduledThreadPoolExecutor可以在指定的时延后,调度一个任务,或间歇性地执行任务。
当需要多线程执行任务或需要的ThreadPoolExecutor的灵活性和功能性时, ScheduledThreadPoolExecutor是一个比java.util.Timer中更优的选择。

只有最先到期的任务会出队,如果没有任务或者队首任务未到期,则工作线程会阻塞;

一旦核心线程池满了,ScheduledThreadPoolExecutor不会像ThreadPoolExecutor那样再去创建归属于非核心线程池的工作线程,而是直接返回。

优点

  1. 使用多线程执行任务,不用担心任务执行时间过长而导致任务相互阻塞的情况(Timer是单线程执行的,因而会出现这个问题)
  2. 不用担心任务执行过程中,如果线程失活,其会新建线程执行任务(Timer类的单线程挂掉之后是不会重新创建线程执行后续任务的)

源码

ScheduledThreadPoolExecutor:

属性:

/**
* False if should cancel/suppress periodic tasks on shutdown.
*/
private volatile boolean continueExistingPeriodicTasksAfterShutdown;

/**
* False if should cancel non-periodic not-yet-expired tasks on shutdown.
*/
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

方法:

public<V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit);

    public<V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit),
                                       sequencer.getAndIncrement()));
        // 延时执行任务
        delayedExecute(t);
        return t;
    }

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            // 将任务添加到任务队列中,会根据任务的延时时间进行排序
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(task) && remove(task))
                task.cancel(false);
            else
                //预先启动工作线程,确保线程池中有工作线程。
                ensurePrestart();
        }
    }

    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        //如果小于核心池数量,就创建新的工作线程
        if (wc < corePoolSize)
            addWorker(null, true);
        //说明corePoolSize数量是0,必须创建一个工作线程来执行任务
        else if (wc == 0)
            addWorker(null, false);    //ThreadPoolExecutor的方法,用来执行线程任务
    }

public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit);    //delay:延时时间

//initialDelay:第一次延时时间;delay:周期间隔
//执行任务的间隔是固定的,无论上一个任务是否执行完成,
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
        long initialDelay,long delay,TimeUnit unit);

//执行时间间隔是不固定的,其会在周期任务的上一个任务执行完成之后才开始计时,并在指定时间间隔之后才开始执行任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
        long initialDelay,long period,TimeUnit unit);



ScheduledThreadPoolExecutor 内部类:

    private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

        private final long sequenceNumber;    //为相同延时任务提供的顺序编号

        private volatile long time;    //任务可以执行的时间,纳秒级

        private final long period;    //重复任务的执行周期时间,纳秒级。

        RunnableScheduledFuture<V> outerTask = this;    //重新入队的任务

        int heapIndex;    //延迟队列的索引,以支持更快的取消操作


        ScheduledFutureTask(Runnable r, V result, long triggerTime,
                            long sequenceNumber)

        ScheduledFutureTask(Runnable r, V result, long triggerTime,
                            long period, long sequenceNumber)

        ScheduledFutureTask(Callable<V> callable, long triggerTime,
                            long sequenceNumber)

        public void run() {
            boolean periodic = isPeriodic();    //是否为周期任务
            if (!canRunInCurrentRunState(periodic))    //当前状态是否可以执行
                cancel(false);
            else if (!periodic)
                //不是周期任务,直接执行
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();    //设置下一次运行时间
                reExecutePeriodic(outerTask);    //重排序一个周期任务
            }
        }
    }



DelayedWorkQueue 内部类:

    static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {

        private static final int INITIAL_CAPACITY = 16;
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        private final ReentrantLock lock = new ReentrantLock();
        private int size;

        private Thread leader;

        /**
         * Condition signalled when a newer task becomes available at the
         * head of the queue or a new thread may need to become leader.
         */
        private final Condition available = lock.newCondition();
    }

特性

  1. 使用专门的任务类型—ScheduledFutureTask(内部类)来执行周期任务
  2. 使用专门的存储队列—DelayedWorkQueue(内部类)来存储任务,DelayedWorkQueue是无界延迟队列DelayQueue的一种。
  3. 支持可选的run-after-shutdown参数,在池被关闭(shutdown)之后支持可选的逻辑来决定是否继续运行周期或延迟任务。

ScheduledThreadPoolExecutor的关闭策略由两个run-after-shutdown参数实现,用来控制在池关闭之后是否继续执行任务:

  1. continueExistingPeriodicTasksAfterShutdown(boolean类型):表示在池关闭之后是否继续执行已经存在的周期任务
  2. executeExistingDelayedTasksAfterShutdown(boolean类型,默认true,表示继续):表示在池关闭之后是否继续执行已经存在的延迟任务

ScheduledThreadPoolExecutor 重写了 execute(Runnable) 和 submit(Runnable) 方法

    public void execute(Runnable command) {
        schedule(command, 0, NANOSECONDS);
    }


    public Future<?> submit(Runnable task) {
        return schedule(task, 0, NANOSECONDS);
    }

示例

public class Test {

    private ScheduledThreadPoolExecutor executor;
    private Runnable task;

    public Test() {
        this.executor = new ScheduledThreadPoolExecutor(2);
        this.task = initTask();
    }

    //延迟15s后每隔30s执行一次指定的任务,而该任务执行时长为10s。
    private Runnable initTask() {
        return task = () -> {
            sleep(SECONDS, 10);
        };
    }

    public void testFixedTask() {
        executor.scheduleAtFixedRate(task, 15, 30, SECONDS);
        sleep(SECONDS, 120);
    }

    public void testDelayedTask() {
        executor.scheduleWithFixedDelay(task, 15, 30, SECONDS);
    }
}
原文地址:https://www.cnblogs.com/loveer/p/11414730.html