ThreadPoolExecutor类

ThreadPoolExecutor类

/**
 * 线程池,执行每个提交的任务
 *
 * 解决了两个问题
 *  1.减少每个任务调度的开销,执行大量异步任务时提供更好的性能
 *  2.提供了资源绑定和管理的方法(并且会维护一些基本的统计信息)
 * 使用广泛,提供了许多可调参数和可扩展钩子(hook)
 * 可快捷使用工厂方法:Executors
 *  1.newCachedThreadPool(无界线程池,自动线程回收)
 *  2.newFixedThreadPool(固定大小线程池)
 *  3.newSigleThreadPool(单个后台线程)
 * 参数设置指导:
 *  1.core and maxinum pool size (ThreadPoolExecutor 将根据核心和最大参数自动调整线程池大小)
 *   -当nThread < corePoolSize 时,提交任务会会创建新的线程来执行任务(即使有线程空闲)
 *   -当nThread < maxiNumPoolSize 时,只有当队列已满时 创建新的线程处理请求
 *   -如果 corePoolSize == maxinumPoolSize 时为fixed-size(固定大小)线程成
 *   -通常构建时设置,也可以使用setCorePoolSize和setManinumPoolSize动态设置
 *  2.On-demand construction 按需构造:通常core threads 只有在新任务到达时才会初始化和开始,也可以通过prestartCoreThread、prestartAllCoreThreads动态重写
 *  3.dread new thread 线程创建:
 *   -默认使用Executors.defaultThreadFactory创建的线程具有相同的Group、priority(NORM_PRIORITY)、non-deamon 状态,提供不同的ThreadFactory可以自定主意线程名、线程组、优先级、守护进程状态等
 *   -如果ThreadFactory.newThread返回null,线程池将继续,但是不能保证任何任务都会被执行
 *  4.keep-alive time 存活时间:
 *   -如果nThread > corePoolThread 且 idleTime > keepAliveTIme 那么 多于线程将被销毁,减少资源占用,如果后面活跃增加,那么将新建线程
 *   -可以通过setKeepAliveTime动态设置存活时间
 *   -默认情况keep-alive策略仅适用于nThread > corePoolSize 的情况,也可以通过 allowCoreThreadTimeOut 使策略适用于核心线程(keepAliveTime 非0)
 *  5.queuing 队列:任何BlockingQueue都可能用于传输和保存被提交的任务,被用来和线程池大小交互
 *   -nThrad < corePoolSize 执行器创建新的线程而不是队列
 *   -nThread  >= corePoolSize 执行器优先请求队列 而不是创建线程
 *   -如果请求不能被队列,那么将创建一个新的线程,如果 nThread > maxinumPoolSize 那么任务将被拒绝
 *  6.三种常用的排队策略
 *   -direct handoffs 直接传递:一般为:SynchronousQueue,直接递交任务给线程而不保存,如果没有立即可用的线程来运行任务,对任务的队列会失败,尝试创建新的线程,处理具有内部状态依赖项的请求集可以避免锁定,通常使用无界的线程池避免任务被拒绝,同时有可能出现无限制的增涨
 *   -unbounded queues 无界队列:使用没有界限的队列,当所有的coreThread都处于忙碌状态时,将导致新任务都被在队列中等待,因此maxinumPoolSize对执行器没有影响,当每个任务独立于其他任务时适用。有可能造成队列无限增长
 *   -bounded queues 有界队列:使用有界队列,有限的maxinumPoolSize有助于防止资源耗尽,调优和控制比较困难。
 *    队列大小和线程大小可以相互交换:1.大队列小池子:最小化cpu使用、os资源和上下文切换,可能人为导致低吞吐量,如果任务经常阻塞(如任务I/O绑定)
* 2.小队列大池子:使Cpu使用更忙,但可能出现不可接受的任务调度,这也会降低吞量
* 7.rejected task 拒绝策略:执行器已经关闭,或者执行器使用的maxinumPoolSize和queue都有界限且已经饱和时,将调用RejectedExecutionHandler的进行处理,默认提供的四种策略:
* -默认:AbortPolicy拒绝:throws RejectedExecutionException
* -CallerRunsPolicy 调用者执行:调用线程自身运行这个任务,简单的反馈控制,降低提交新任务的速度
* -DiscardPolicy 丢弃:提交的任务将被简单的删除
* -DiscardOldPolicy 丢弃最老:线程池没有关闭时,删除队列最前端的任务,然后重试执行(可能再次失败,将导致重复执行)
* -也可以自定义处理器,需要注意策略仅被设计为特定容量或者队列策略下工作
* 8.hook methods 钩子方法:
* -beforeExecute、afterExecutor 每个任务执行前后可以用来初始化或者记录日志、收集统计数据等ended可以在执行器完全终止前执行特殊的处理
* -如果hook、callback、blockingqueue方法抛出异常,内部工作线程可能失败、终止或者被替换
* 9.queue maintenance 队列维护:方法:getQueue允许对工作队列监视和调试,强烈反对用于其他目的。提供的两个方法:remove和purge当大量排队任务被取消时可以帮助回收存储
* 10.reclamation 回收:线程池不再被引用且没有剩余线程时可以在不显示关闭的情况下被回收(GC),可以配置适当的keep-alive(0 = corePoolThread)/allowCoreThreadTimeout 允许所有线程最终死亡
*/
public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * 线程池主要的状态控制:ctl(control state)使用 AtomicInteger 标示两个打包状态:
     *   workerCount:指示线程的有效数量
     *   runState:指示是否运行、关闭等
     *
     * 为了将他们打包进一个int workerCount 限制:2^29-1 而不是 2^31-1 后续若有需要可以修改为long(shift/mask 也需要修改),现在考虑到效率使用int
     *
     * workerCount:表示worker 已经start并且没有permitted的数量,该值可能与实际活动的线程有暂时的不同:比如:ThreadFactory创建tread失败,正在退出的thread仍在执行bookkeeping在销毁前,用户可见的线程池大小时当前woekers集合
     * runState 提供了主要的lifecycle control,有以下值:
     *   RUNNING 运行中:  接受新的task并处理排队的task
     *   SHUTDOWN 关闭: 不接受新的task,但处理排队的task
     *   STOP 停止:     不接受新的task,也不会处理排队的task,并且interrupt处理中的task
     *   TIDYING 整理:  所有的任务已经终止,woekerCount为0,线程转换到tidying将调用hook method:terminate()
     *   TERMINATED 终止: terminated() 已经运行完成
     *
     * runState之间的顺序很重要,允许有序的比较,随着时间单调增加,但不需要到达每个状态:
     * RUNNING -> SHUTDOWN :      运行-》关闭 运行 shotDown()方法
     * (RUNNING or SHUTDOWN) -> STOP :运行/关闭-》停止 运行shotDownNow()方法
     * SHUTDOWN -> TIDYING :      关闭——》整理 当queue和pool都为空时
     * STOP -> TIDYING:         停止-》整理 当pool为空时
     * TIDYING -> TERMINATED:     当hook method terminated()方法执行完成
     *
     * 当 state 达到 terminated时。awaitTermination()将从方法返回
     *
     * 检测到state转换:shutdown -》 tidyung 不如想象中的直接,因为queue可能从non-empty到empty反之亦然在shutdown期间我们只能终止当queue和woekerCount为空时(workerCount与时候需要re-check)
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    // runState 以高阶位存储
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl :ctl打包和拆包
    private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    private static int workerCountOf(int c)  { return c & COUNT_MASK; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    /*
     * 不需要unpacking的字段访问器,取决于布局和workerCount从不为负
     */

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * Attempts to CAS-increment the workerCount field of ctl.:试图cas增加ctl的woekerCount字段
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Attempts to CAS-decrement the workerCount field of ctl.:试图cas减少ctl的woekerCount字段
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
     * 递减workerCount字段,仅在线程突然终止时候调用(see processWorkerExit),其他递减在getTask中执行
     */
    private void decrementWorkerCount() {
        ctl.addAndGet(-1);
    }

    /**
     * 用于保存task并将task传递给worker thread 的队列,
     * 不要求workQueue.poll()返回空意味着workQueue.isEmpty(),所以只能依赖isEmpty()来查看队列是否为空(例如:决定是否从shutdown->tidying我们必须这样做)
     * 这也适用于特殊队列,如:poll()返回null的DelayQueues,也可能在特定延迟后返回non-null
     */
    private final BlockingQueue<Runnable> workQueue;

    /**
     * 锁定workers set 和 bookkeeping相关的访问,我们可以使用某种并发集合,但事实证明lock更好用
     * 其中一个原因时:lock序列化了interruptldleWorkers,避免了不必要的中断风暴,尤其是关机期间。否则退出的线程将同时那些尚未中断的
     * 同时也简化了相关统计数据bookkeeping如largestPoolSize等,在shutDown和shutDownNow时持有mainLock确保人工操作时稳定,同时检查中断和实际中断权限
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * 所有的worker thread 集合,必须持有mainLock访问
     */
    private final HashSet<Worker> workers = new HashSet<>();

    /**
     * 用于支持awaitTermination的等待conditin
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * 跟踪最大的线程池大小,需要持有mainLock访问
     */
    private int largestPoolSize;

    /**
     * 计数器用于统计已经完成的任务,只在worker thread 终止时更新,需要持有mainLock访问
     */
    private long completedTaskCount;

    /*
     * 所有的参数声明为:volatiles 所以正在进行的操作都是基于最新值
     * 但是不需要locking,因为没有内部不变量依赖他们相对于其他操作的同步修改
     */

    /**
     * 创建新线程的工厂,所有threads 使用这个工厂创建(通过 addWorker)
     * 所有调用者必须做好addWorker失败的准备,可能反映了系统或者用户限制线程数的策略
     * 即使没有将其视为错误,但是创建新的线程失败可能导致任务被拒绝或者仍然在队列中等待
     *
     * 进一步保持pool不变,但是仍有可能面对错误:如抛出OutOfMemory
     * 由于需要在线程中分配本机堆栈,这种错误很常见,用户将希望执行清理线程池以关闭,有可能有足够的空间完成清理而不遇到OutofMermory
     * the need to allocate a native stack in Thread.start, and users
     */
    private volatile ThreadFactory threadFactory;

    /**
     * 在 饱和或者关闭时调用处理器
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * 等待工作的工作线程超时(nanoseconds 为单位)
     * 当存在超过corePoolSize或者allowCorethreadTimeout时,线程使用此超时,否则将永远等待新的同作
     */
    private volatile long keepAliveTime;

    /**
     * 如果 false (default 默认),core thread 保持存活即使空闲
     * 如果 true, core threads 使用 keepAliveTime 作为超时时间等待工作
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * 除非allowCoreThreadTimeoput被设置(这种情况最小值为0),否则线程池带下保持最小的活动线程数量*/
    private volatile int corePoolSize;

    /**
     * 最大线程池数量*/
    private volatile int maximumPoolSize;

    /**
     * 默认拒绝处理器
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    /**
     * 调用 shutdown 和 shutdownNow 的调用者许可 以及(checkShutdownAccess)调用者的中断线程权限(Thread.interrupt 控制).
     * 
     * 所有实际调用 Thread.interrupt (参见 interruptIdleWorkers and interruptWorkers) 都忽略了SecurityExceptions, 意味着尝试中断时会静默失败
     */
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

    /**
     * Worker 主要维护运行任务的中断状态控制和其他次要的bookkeeping功能
     * 继承 AbstractQueueedSynchronize,简化获取和释放任务执行时的锁
     * 为了防止一些中断,这些中断的目的是唤醒线程而不是中断运行中的任务,实现了一个简单的不可重入锁,不希望像调用setCorePoolSize这样的线程池控制方法时可重新获取锁
     * 此外为了在线程时间运行任务前抑制中断,将 lock state设置为负值,并在启动时清除它(在runWorker)
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * 这个类不会被序列化,消除java警告
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** worker当前的运行线程,如果threadFactory失败则为null. */
        final Thread thread;
        /** 要运行的初始任务,可能为null */
        Runnable firstTask;
        /** 线程任务计数器 */
        volatile long completedTasks;

/** * 创建新的worker使用给定的task以及threadFactory * firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // 禁止中断,直到运行worker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 将主运行循环委托给 runWorker */ public void run() { runWorker(this); } // Lock methods // The value 0 表示 unlocked state. // The value 1 表示 locked state. protected boolean isHeldExclusively() { return getState() != 0; }
    /**
     * 尝试锁定
     */
protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) {//cas设置state状态为锁定 setExclusiveOwnerThread(Thread.currentThread());//设置独占所有者线程为当前线程 return true; } return false; }
    /**
     * 尝试解锁
     */
protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null);//设置独占线程所有者为null setState(0);//设置state为解锁 return true; } public void lock() { acquire(1); }//锁定 public boolean tryLock() { return tryAcquire(1); }//尝试锁定 public void unlock() { release(1); }//解锁 public boolean isLocked() { return isHeldExclusively(); }//是否解锁
    /**
     * 如果状态已经就绪:0/1,进行中断
     */
void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {//状态就绪且thread存在而且未被中断 try { t.interrupt();//中断 } catch (SecurityException ignore) {//忽略异常 } } } } /* * 设置ctl(control state )的方法 */ /** * 将线程池runState设置为指定状态,如果已经至少时给定目标则将其保留 * * @param targetState 指定所需的状态, 是 SHUTDOWN 或 STOP * (但不会是 TIDYING 或 TERMINATED --为此使用tryTerminate) */ private void advanceRunState(int targetState) { // 断言 targetState == SHUTDOWN || targetState == STOP; for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } /** * 如果线程池状态是shutdown(且poll和queue为空)或者stop(poll为空)则将runState转换到termination * 如果有资质终止但woekerCount非0,则中断空闲线程取保shutdown信号传播 * 必须在所有可termination的操作后调用此方法,以减少worker数量或者移除queued task 在shutdown期间 * 方法nono-private以允许从 ScheduledThreadPoolExecutor 访问 */ final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateLessThan(c, STOP) && ! workQueue.isEmpty())) return; //如果执行器运行中或者已经至少在tidying状态或者(stop状态,但是queue非空)则不进行处理 if (workerCountOf(c) != 0) { // 资格终止 interruptIdleWorkers(ONLY_ONE);//interrupt 一个线程 return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock();//持有mainLock try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//cas ctl为runState=tidying,workerCount=0 try { terminated();//终止 } finally { ctl.set(ctlOf(TERMINATED, 0));//cas ctl为runState=termination,workerCount=0 termination.signalAll();//唤醒等待在终止condition线程 } return; } } finally { mainLock.unlock();//mainLock解锁 } // 其他情况重试失败的cas } } /* * 控制中断woeker的方法 */ /** * 如果存在安全管理器,确保取保调用者有权限shutdown线程 * 如果通过,进一步去人调用者允许中断每个worker thread */ private void checkShutdownAccess() { // assert mainLock.isHeldByCurrentThread(); SecurityManager security = System.getSecurityManager(); if (security != null) { security.checkPermission(shutdownPerm); for (Worker w : workers) security.checkAccess(w.thread); } } /** * 中断所有线程,即使是在活动的线程,忽略securityexception(这种情况下一些线程可能保持没有被中断) */ private void interruptWorkers() { // assert mainLock.isHeldByCurrentThread(); for (Worker w : workers) w.interruptIfStarted(); } /** * 中断可能这在等待任务的线程(如:未锁定的线程)以便检查termination或配置更新。忽略securtyesception(这种情况可能导致一些线程保持未中断) * * @param onlyOne 如果是 true, 最多中断一个woeker,只有在以其他方式终止是才会从tryTermination(并且人有其他woeker)调用这个函数 * 这种情况下最多只会有一个等待的woeker来传播shutdown信号 * 中断任意线程可以确保shutdown以来新到达的worker最终也会退出 * 为保证最终终止,总是只要终止一个空闲woeker就可以了,但是但是shutDown所有的空闲worker,这样冗余的worker可以立即退出,而不是等待一个掉队的任务了 */ private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock();//获取mainLock锁定 try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) {//worker thread没有被中断且处于等待中 try { t.interrupt();//中断 } catch (SecurityException ignore) {//忽略安全检查异常 } finally { w.unlock();//woeker解除锁定 } } if (onlyOne) break;//是否只需要中断一个空闲线程 } } finally { mainLock.unlock();//mainLock解锁 } } /** * 常见的interruptIdleWorkers形式,避免了解onlyOne的含义 */ private void interruptIdleWorkers() { interruptIdleWorkers(false); } private static final boolean ONLY_ONE = true; /* * misc实用程序,大多数还导入到ScheduledTjreadPoolExecutor */ /** * 为给定命令调用拒绝执行处理程序 */ final void reject(Runnable command) { handler.rejectedExecution(command, this); } /** * 调用shutdown是执行状态转换后的进一步清理,这里是空实现,ScheduledThreadPoolExecutor实用来取消延迟任务 */ void onShutdown() { } /** * 将任务排队到一个新的队列中, 通常使用drainTo * 如果对列是一个延迟队列或者其他poll/drainTo 可能无法删除某些元素的队列,那么它将逐个删除他们 */ private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<>(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; } /* * worker 的创建、运行、和清理方法 */ /** * 检查当前pool的state和给定的界限(corePoolSize或者maxinumPoolSize)是否支持添加新的woeker * 如果可以:workerCounter 计数器将进行相应的调整,同时如果可能一个新的worker将创建并启动,运行firstTask 作为 first task * 如果线程池已经停止或者正在关闭,当thread factory 创建线程失败时也将返回false(创建失败可能是返回null,也坑是由于异常,异常时将进行回滚) * * @param firstTask 新的线程应该首先运行的任务(可能为null) * 创建一个新的woeker当pool数量小于corePoolSize时(一直绕过队列),或者queue已满(必须绕过队列) * 初始化空闲线程通常通过prestartCoreThread创建或者替换其他即将死亡的worker * * @param true:core?maxinum 来作为线程池界限(不实用具体引值为保证引用最新数值) * * @return true 如果成功 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { // 仅在必要时检查队列是否为空 if (runStateAtLeast(c, SHUTDOWN)  //线程池已经shutdown && (runStateAtLeast(c, STOP)  //同时:线程池已经stop/firstTask不为空/queue为空 || firstTask != null || workQueue.isEmpty())) return false; //失败 for (;;) { if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) //当前线程池大小超过给定界限core/maxinum return false; if (compareAndIncrementWorkerCount(c)) //CAS尝试增加workerCounter break retry;  //自增成功,继续后续处理 c = ctl.get(); // Re-read ctl 重新读取 if (runStateAtLeast(c, SHUTDOWN)) //检查状态是否过期 continue retry; //重新开始retry循环 // 否则CAS因workerCount失败,重试内循环 } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); //初始worker final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; //获取mainLock mainLock.lock(); try { // 持有锁时重新检查 // 退出:当threadFactory失败或者shutdown 在获取锁定之前 // shut down before lock acquired. int c = ctl.get(); if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.isAlive()) // precheck 预检查thread是否可以启动 throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //工作集合添加成功 t.start(); workerStarted = true; } } } finally { if (! workerStarted) //工作集合添加失败 addWorkerFailed(w); } return workerStarted; } /** * 回滚工作线程创建 * - 从workers set移除此worker * - 递减woekerCount * - 重新检查终止,防止worker拒绝终止 */ private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } } /** * 为死亡中的线程进行清理和记账,仅从worker thread 调用 * 只有当completedAbruptly被设置,确认workerCount已经调整当退出时 * 该方法从worker set 中移除线程,同时可能终止线程池或者替换worker如果:因为用户异常而退出或者池子小于core或者queue non-empty而没有workers    * * @param w the worker * @param worker因为用户异常而死亡 */ private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // 用户异常,调整workerCount decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // 不需要更换 } addWorker(null, false);//添加新的worker } } /** * 执行阻塞或者定时等待取决于配置 * 返回null 如果因为任何原因这个worker必须退出: * 1. worker数量多于maxinum (因为调用setMaximumPoolSize). * 2. 线程池已经停止 * 3. 线程池已经shutdown同时queue为空 * 4. worker等待task超时, 这个worker有可能被终止,(同也就是说: allowCoreThreadTimeOut || workerCount > corePoolSize) * 定时等待前后,如果队列非空,那么这个worker不是池子中的最后一个线程 * * @return task, 或者 null 如果这个worker必须退出,这种情况下woekerCount将递减 */ private Runnable getTask() { boolean timedOut = false; // 任务获取超时 for (;;) { int c = ctl.get(); // 仅在必要时检查对列是否为空 if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { //线程池已经shutdown同时queue为空/已经stop decrementWorkerCount(); return null; } int wc = workerCountOf(c); //worker 是否会被淘汰? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) //worker允许超时且已经超时 && (wc > 1 || workQueue.isEmpty())) { //同时wc>1或者queue为空 if (compareAndDecrementWorkerCount(c)) //设置wc递减 return null; continue; //继续cas } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //超时等待或者阻塞等待取决于此worker是否有存活限制 if (r != null) return r; //获取task成功直接返回 timedOut = true; //设置任务获取超时 } catch (InterruptedException retry) { timedOut = false; } } } /** * 主要的程序运行循环,重复从queue中获取任务并执行。同时几处需要主要的问题: * * 1. 可能从一个初始任务开始,不需要得到first task,否则只要pool在运行,从getTask方法获取task * 如果getTask返回null worker将退出由于pool state或者config parameter change,其他的退出出口则是由于执行外部代码导致 * 这种情况由completedAbruptly处理,通常使用processWorkerExit替代这个线程 * * 2. 执行任何task之前,获取锁防止任务执行时其他pool中断,确保除非pool停止,否则线程不会有他自己的中断集合 * * 3. 每个任务执行前都由一个对beforExecute的调用,该调用引发的异常回导致线程死亡(立即终止循环),而不是处理任务 * * 4. 开设beforExecute正常执行结束,将运行task,然后收集任何运行抛出的异常发送给afterExecute * 分开处理:RuntimeException和Error(两种都会捕捉)和任何可抛出对象 * 因为我们不能在Runnable.run中重新抛出throwable,所以我们重新包装他们到线程的UncaughtException * 任何异常都会导致线程死亡 * * 5. task.run完成后,将调用afterExecute方法,也可能抛出异常,这也可能导致线程死亡,这个异常也是有效的 * * 异常机制的效果是afterExecute和线程的uncaughtException拥有关于用户代码遇到问题的尽可能的准确信息 * * @param w the worker */ final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 获取worker携带得到firstTask,并设置worker状态为可中断 boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // 任务:firstTask||getTask 存在 w.lock(); //worker lock // 如果线程池停止,确保线程是interrupted, // 如果线程池没有停止,确保线程没有中断,这需要recheck为了处理当进行interrupt清理时shutDownNow if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); //中断任务线程 try { beforeExecute(wt, task); //运行beforeExecute hook方法 try { task.run(); //执行任务 afterExecute(task, null); //任务正常执行结束后调用afterExecute hook方法 } catch (Throwable ex) { afterExecute(task, ex); //异常后调用afterExecute hook 方法 throw ex; //抛出异常 } } finally { //清理task,worker 计数 unlock task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; //非异常结束 } finally { processWorkerExit(w, completedAbruptly); //异常结束处理 } } // 构造器和其他public方法 /** * 根据给定参数构造*/ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } /** * 根据给定参数构造*/ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } /** * 根据给定参数构造*/ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } /** * 根据构定参数构造ThreadPolExecutor * * @param corePoolSize 线程池中要保留的线程数,即使空闲(除非设置allowCoreThreadTimeout) * @param maximumPoolSize 线程池中允许的最多线程数量 * @param keepAliveTime 当线程数量大于core时,这是多于线程等待任务的最大时间 * @param unit 时间单位,用于keepAliveTime * @param workQueue 用于在任务执行前保存他们的队列,这个队列只包含execute提交的任务 * @param threadFactory 创建新线程使用的工厂 * @param handler 当执行被阻塞时使用的处理程序,因为pool的边界和queue的容量 * @throws IllegalArgumentException 参数异常<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } /** * 执行给定任务在未来某个时间,具体文档见父类型接口定义 * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 分3步进行: * * 1. 如果nThread<core 则尝试以给定command作为firstTask启动一个新的worker,addWoeker的调用原子性检查runStae和workerCount,通过返回flase防止错误警告, * * 2. 如果任务成功队列,应该再次double-check 是否需要添加新的线程(因为自上次检查以来有的线程可能已经死亡),或者pool 已经shutDown在进入这个方法后 * 所以recheck并且如果已经停止则回滚,或者启动一个新的线程如果没有线程 * * 3. 如果不能队列任务,则尝试添加一个新线程,如果失败,则可能是关闭或者饱和,所以拒绝任务 */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } /** * 执行有序关闭,在此过程中执行先前提交的任务,但是拒绝新的任务,如果已经关闭重复调用不会有额外影响 * 此方法不等待任务完成 */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); //设置中断状态 interruptIdleWorkers(); //中断空闲任务 onShutdown(); // 为ScheduledThreadPoolExecutor准备的钩子方法 } finally { mainLock.unlock(); } tryTerminate(); //尝试终止 } /** * 尝试停止所有活动的任务,停止等待处理的任务,并且返回等待执行的任务列表 * 方法不会等待任务终止 * 不给出保证,任何不能相应中断的任务可能永远只想下去*/ public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); //设置pool runState interruptWorkers(); //中断等待线程 tasks = drainQueue(); //获取未执行任务列表 } finally { mainLock.unlock(); } tryTerminate(); //尝试终止 return tasks; } public boolean isShutdown() { return runStateAtLeast(ctl.get(), SHUTDOWN); } /** 被ScheduledThreadPoolExecutor 使用. */ boolean isStopped() { return runStateAtLeast(ctl.get(), STOP); } /** * 返回pool在shutDown或shutDownNow之后正在终止,尚未完全终止,则是true,可用于调试 * * @return {@code true} if terminating but not yet terminated */ public boolean isTerminating() { int c = ctl.get(); return runStateAtLeast(c, SHUTDOWN) && runStateLessThan(c, TERMINATED); } public boolean isTerminated() { return runStateAtLeast(ctl.get(), TERMINATED); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { while (runStateLessThan(ctl.get(), TERMINATED)) { //判断pool尚未terminated if (nanos <= 0L) return false; //超时返回false nanos = termination.awaitNanos(nanos); //terminated 的condition上等待 } return true; //已经结束 } finally { mainLock.unlock(); } } // 覆盖 不抛出异常,于子类兼容 /** * 之前的finalize方法会关闭执行器,现在什么也不做 */ @Deprecated(since="9") protected void finalize() {} /** * 设置用于创建新线程的工厂*/ public void setThreadFactory(ThreadFactory threadFactory) { if (threadFactory == null) throw new NullPointerException(); this.threadFactory = threadFactory; } /** * 返回线程工厂*/ public ThreadFactory getThreadFactory() { return threadFactory; } /** * Sets a new handler for unexecutable tasks.*/ public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { if (handler == null) throw new NullPointerException(); this.handler = handler; } /** * Returns the current handler for unexecutable tasks.*/ public RejectedExecutionHandler getRejectedExecutionHandler() { return handler; } /** * 设置corePoolSize ,如果小于current value,多于线程下次空闲时将被清除,如果大于,将在需要的时候执行任务使用新的线程*/ public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); int delta = corePoolSize - this.corePoolSize; this.corePoolSize = corePoolSize; if (workerCountOf(ctl.get()) > corePoolSize) interruptIdleWorkers(); else if (delta > 0) { // 不直到真的需要多少新线程,作为一种启发,预先启动足够多的新线程(直到新的core来处理队列中的任务)// 如果队列为空则停止 int k = Math.min(delta, workQueue.size()); while (k-- > 0 && addWorker(null, true)) { if (workQueue.isEmpty()) break; } } } /** * Returns the core number of threads.*/ public int getCorePoolSize() { return corePoolSize; } /** * 启动核心线程,导致他们空闲等待工作 * 这个覆盖【仅在执行新任务时启动核心线程】的默认策略,如果所有线程都已经启动,该方法返回false * * @return {@code true} if a thread was started */ public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); } /** * 于 prestartCoreThread 一样,只在corePoolSize为0时,至少启动一个线程 */ void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); } /** * 启动所有核心线程,导致他们空闲等待任务,覆盖【执行新任务时创建核心线程】策略 * * @return the number of threads started */ public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; } /** * Returns true if this pool allows core threads to time out and * terminate if no tasks arrive within the keepAlive time, being * replaced if needed when new tasks arrive. When true, the same * keep-alive policy applying to non-core threads applies also to * core threads. When false (the default), core threads are never * terminated due to lack of incoming tasks. * * @return {@code true} if core threads are allowed to time out, * else {@code false} * * @since 1.6 */ public boolean allowsCoreThreadTimeOut() { return allowCoreThreadTimeOut; } /** * Sets the policy governing whether core threads may time out and * terminate if no tasks arrive within the keep-alive time, being * replaced if needed when new tasks arrive. When false, core * threads are never terminated due to lack of incoming * tasks. When true, the same keep-alive policy applying to * non-core threads applies also to core threads. To avoid * continual thread replacement, the keep-alive time must be * greater than zero when setting {@code true}. This method * should in general be called before the pool is actively used. * * @param value {@code true} if should time out, else {@code false} * @throws IllegalArgumentException if value is {@code true} * and the current keep-alive time is not greater than zero * * @since 1.6 */ public void allowCoreThreadTimeOut(boolean value) { if (value && keepAliveTime <= 0) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); if (value != allowCoreThreadTimeOut) { allowCoreThreadTimeOut = value; if (value) interruptIdleWorkers(); } } /** * 设置允许的最大线程数,将覆盖构造器中设置的数值,如果新值小于当前值,多于线程会在下次空闲时终止 * * @param maximumPoolSize the new maximum * @throws IllegalArgumentException if the new maximum is * less than or equal to zero, or * less than the {@linkplain #getCorePoolSize core pool size} * @see #getMaximumPoolSize */ public void setMaximumPoolSize(int maximumPoolSize) { if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); this.maximumPoolSize = maximumPoolSize; if (workerCountOf(ctl.get()) > maximumPoolSize) interruptIdleWorkers(); } /** * Returns the maximum allowed number of threads. * * @return the maximum allowed number of threads * @see #setMaximumPoolSize */ public int getMaximumPoolSize() { return maximumPoolSize; } /** * 设置线程的keep-alive时间,这是线程在终止前可能保持空闲的时间 * * * @param time the time to wait. A time value of zero will cause * excess threads to terminate immediately after executing tasks. * @param unit the time unit of the {@code time} argument * @throws IllegalArgumentException if {@code time} less than zero or * if {@code time} is zero and {@code allowsCoreThreadTimeOut} * @see #getKeepAliveTime(TimeUnit) */ public void setKeepAliveTime(long time, TimeUnit unit) { if (time < 0) throw new IllegalArgumentException(); if (time == 0 && allowsCoreThreadTimeOut()) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); long keepAliveTime = unit.toNanos(time); long delta = keepAliveTime - this.keepAliveTime; this.keepAliveTime = keepAliveTime; if (delta < 0) interruptIdleWorkers(); } /** * 返回线程保持活动时间,这是线程在终止前可能保持空闲的时间。如果线程池中当前线程数超过core这这线程池allowCoreThreadTimeout那么等待这个时间而不处理任务的线程将被终止 * @param unit the desired time unit of the result * @return the time limit * @see #setKeepAliveTime(long, TimeUnit) */ public long getKeepAliveTime(TimeUnit unit) { return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); } /* 用户层面队列公用 */ /** * 返回此执行器使用的queue,访问队列主要用户调试和监控 * * @return the task queue */ public BlockingQueue<Runnable> getQueue() { return workQueue; } /** * 如果任务在queue中,则移除任务,任务将不会运行 * present, thus causing it not to be run if it has not already * started. * * 这种方法作为取消方案的一部分,可能无法删除已经转换为其他形式的的任务 * 但是可以通过cleanup删除那些已经取消的future * * @param task the task to remove * @return {@code true} if the task was removed */ public boolean remove(Runnable task) { boolean removed = workQueue.remove(task);//从队列中移除任务 tryTerminate(); // 防止shutdown,现在为空 return removed; } /** * 尝试从队列中移除所有已经取消的任务,这个方法可以作为存储回收操作使用,对其他功能没有影响 * 被取消的任务永远不会执行,但可能会在queue中积累直到工作线程能够主动删除他们 * 调用这个方法将尝试主动删除他们,但是也可能在其他线程的影响下无法工作 */ public void purge() { final BlockingQueue<Runnable> q = workQueue; try { Iterator<Runnable> it = q.iterator(); while (it.hasNext()) { Runnable r = it.next(); if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) //移除被取消的future it.remove(); } } catch (ConcurrentModificationException fallThrough) { // 遍历过程中受到烦扰则选择慢速路径 // 复制遍历,慢路径肯能为O(N*N) for (Object r : q.toArray()) if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) q.remove(r); } tryTerminate(); // 以防shutdown以及现在为空 } /* 统计数据 */ /** * 返回线程池中当前线程数 * * @return the number of threads */ public int getPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 避免罕见的异常情况 // isTerminated() && getPoolSize() > 0 return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size(); } finally { mainLock.unlock(); } } /** * 返回正在执行任务的线程数量 * * @return the number of threads */ public int getActiveCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int n = 0; for (Worker w : workers) if (w.isLocked()) ++n; return n; } finally { mainLock.unlock(); } } /** * 返回线程池中存在的最大线程数 * * @return the number of threads */ public int getLargestPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { return largestPoolSize; } finally { mainLock.unlock(); } } /** * 返回曾经计划执行的任务大致总数。由于任务和线程在计算期间动态变化,返回的是近似值 * scheduled for execution. Because the states of tasks and * threads may change dynamically during computation, the returned * value is only an approximation. * * @return the number of tasks */ public long getTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) { n += w.completedTasks; if (w.isLocked()) ++n; } return n + workQueue.size(); } finally { mainLock.unlock(); } } /** * 返回已经完成的任务大致总数,因为任务和线程的状态在计算过程中动态变化,所以返回的值是一个近似值,但是连续调用不会减少 * @return the number of tasks */ public long getCompletedTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) n += w.completedTasks; return n; } finally { mainLock.unlock(); } } /** * 返回线程池的字符串表现:reunstate,woekerCount、任务技术 * * @return a string identifying this pool, as well as its state */ public String toString() { long ncompleted; int nworkers, nactive; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { ncompleted = completedTaskCount; nactive = 0; nworkers = workers.size(); for (Worker w : workers) { ncompleted += w.completedTasks; if (w.isLocked()) ++nactive; } } finally { mainLock.unlock(); } int c = ctl.get(); String runState = isRunning(c) ? "Running" : runStateAtLeast(c, TERMINATED) ? "Terminated" : "Shutting down"; return super.toString() + "[" + runState + ", pool size = " + nworkers + ", active threads = " + nactive + ", queued tasks = " + workQueue.size() + ", completed tasks = " + ncompleted + "]"; } /* 拓展钩子方法 */ /** * 方法在给定的线程(t)执行任务(r)前调用,可以用来重新初始化threadLocal或者日志记录 * * 现在是空实现,但是可以在子类中定制。注意:为了正确的嵌套多个覆盖,子类通常应该调用super.beforExecute
* @param t the thread that will run task {@code r} * @param r the task that will be executed */ protected void beforeExecute(Thread t, Runnable r) { } /** * 方法在完成给定的可运行任务执行后调用,吃方法由执行任务的线程调用,如果non-null,抛出为捕捉的RuntimeException或者Error可导致执行突然停止 * * 默认空实现,可以在子类中定制,为了正确嵌套多个覆盖,子类同行应该调用super.afterExecte * * 注意:当动作封装在任务(如:Future)或者通过方法(如:submit)时,这些任务对象捕获并且维护计算异常而不会导致突然终止,而且这个任务异常是不会传递给这个方法的 * 如果想要在这个方法中捕获这两类异常,可以进一步探测这种情况,例如: * * <pre> {@code * class ExtendedExecutor extends ThreadPoolExecutor { * // ... * protected void afterExecute(Runnable r, Throwable t) { * super.afterExecute(r, t); * if (t == null * && r instanceof Future<?> * && ((Future<?>)r).isDone()) { * try { * Object result = ((Future<?>) r).get(); * } catch (CancellationException ce) { * t = ce; * } catch (ExecutionException ee) { * t = ee.getCause(); * } catch (InterruptedException ie) { * // ignore/reset * Thread.currentThread().interrupt(); * } * } * if (t != null) * System.out.println(t); * } * }}</pre> * * @param r the runnable that has completed * @param t the exception that caused termination, or null if * execution completed normally */ protected void afterExecute(Runnable r, Throwable t) { } /** * 这方法在执行器终止时调用,默认空实现,子类可以定制,注意:为了确保正确的嵌套多个覆盖,应该调用super.termination * {@code super.terminated} within this method. */ protected void terminated() { } /* 预定义的拒绝处理器 RejectedExecutionHandlers */ /** * 直接在execute方法的调用线程中运行任务,除非处理器已经shutdown,这种情况任务被丢弃*/ public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * 在调用者线程中执行r,除非线程已经关闭,这种情况丢弃任务 * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } /** * 决绝任务并且throws RejectedExecutionException * * 是 ThreadPoolExecutor} 和 ScheduledThreadPoolExecutor 的默认处理器 */ public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * 总是 throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } /** * 静默的丢弃任务 */ public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * 什么也不做,丢弃任务 * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } /** * 丢弃最老的未执行任务,然后尝试执行r,如果线程池已经关闭则将r丢弃*/ public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * 获取并忽略执行器的下一个任务,然后重试执行任务r,如果执行器关闭,丢弃任务 * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } } }
原文地址:https://www.cnblogs.com/gsanye/p/11180771.html