并发编程01-Executor框架分析

Executor框架

     在实际开发工作中,或多或少我们都用到过多线程。一般很少使用 new Thread(){run(){}}.start()这种创建线程。一方面影响代码规范性,另外一方面,也是不方便多线程管理。当大量任务时,线程的创建销毁都会占用很多计算资源。这也是为什么使用线程池的原因。

  • 在日常工作中,我们一般使用线程池进行管理线程。

  • 后面我们接触到的Fork/Join的线程管理ForkJoinPool也是基于Executor进行实现的。

综合以上,在学习更多多线程相关之前,有必要了解Executor框架的相关知识;

框架的使用示意图

  • 主线程通过创建线程任务,在Executor框架容器执行,根据需要返回执行结果。

Executor框架构成

  • 任务:要执行的任务。线程任务需要实现:Runnable或Callable接口。(这一块的内容已经了解过了,不需要深入分析)

  • 任务执行:主要是Executor的继承几口ExecutorService的实现类完成(主要需要分析的内容)

  • 异步实现:FutureTask类(前面了解过了FutureTask类的相关知识,不需要再深入)

源码分析:

     任务执行相关已经在之前了解过,不做更多深入。主要源码分析的是任务执行的抽象类,和异步实现类。

ExecutorService的实现类(AbstractExecutorService)

submit方法

  • 方法提交一个线程任务,并返回一个任务的执行结果Future;
/**
 * 提交任务
 */  
  public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);//生成一个新的任务
        execute(ftask);//执行新的任务,这个由子类实现
        return ftask;//返回执行结果
    }

 //创建一个新的任务
 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

子类需要实现的方法

  • shutdown 方法:在此之前的任务都会执行,但是不会接收新的任务;

  • shutdownNow 方法:暂停所有激活的方法,并返回任务列表;

  • awaitTermination方法:出现终端,暂停,或者限时。阻塞等待现有任务完成

  • execute方法,任务的执行过程;

ThreadPoolExecutor

线程池执行过程:

  • 接收到新的线程,如果小于核心线程数,则创建核心线程

  • 当超过核心线程数,多的线程任务存放到队列中

  • 只有当队列中的存满后,再判断是否小于最大线程数,如果小于最大线程数才创建新的线程

  • 当线程任务超过最大线程数,则根据策略进行抛出异常,或者任务舍弃等操作

注意点:

  • 核心线程,初始是在有新任务的时候创建;如果初始化有非空队列,核心线程需要提前启动,调用prestartAllCoreThreads方法;

  • ThreadFactory用于创建新的线程,也可以自定义设置线程名称等;

  • 超过keep-alive的非核心线程会被回收;

源码分析

任务管理

1.状态管理

  • 1.状态间的转换
     RUNNING -> SHUTDOWN //调用ShutDown方法后

     (RUNNING or SHUTDOWN) -> STOP//调用ShuntDownNow方法

     SHUTDOWN -> TIDYING //没有线程任务并且队列为空的时候

     STOP -> TIDYING //线程池空了之后

     TIDYING -> TERMINATED //当钩子方法(execute)结束后更改状态
  • 2.状态对应的数据
/**
 * 初始化用到的一些静态变量
 */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//线程池状态
    private static final int COUNT_BITS = Integer.SIZE - 3;//29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//00001111 11111111 111111111 11111111
    private static final int RUNNING    = -1 << COUNT_BITS;//11100000 00000000 00000000 00000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;//00000000 00000000 00000000 00000000
    private static final int STOP       =  1 << COUNT_BITS;//00100000 00000000 00000000 00000000
    private static final int TIDYING    =  2 << COUNT_BITS;//01000000 00000000 00000000 00000000
    private static final int TERMINATED =  3 << COUNT_BITS;//01100000 00000000 00000000 00000000
    private static int runStateOf(int c)     { return c & ~CAPACITY; }//获取高位三位,获取线程状态
    private static int workerCountOf(int c)  { return c & CAPACITY; }//获取后29位,获取线程个数
    private static int ctlOf(int rs, int wc) { return rs | wc; }//进行或运算
      
  • 3.状态含义
RUNNING: 接受新任务并运行队列中的任务

SHUTDOWN: 不接受新的任务,但是能运行队列中的任务

STOP:  不接受新的任务,不运行队列中的任务 ,并且中断正在运行中的任务 

TIDYING: 所有任务都完成了,存活的线程为0, 当前线程池活动线程为0,将要调用terminated方法

TERMINATED: 终止状态。terminated方法调用完成以后的状态

2.存储线程任务结构

  • 重点看到的参数是workers,线程池存储线程是通过Set集合存储所有线程节点。所以下面我们要重点了解Worker类
//存储任务的队列
private final BlockingQueue<Runnable> workQueue;

//用于shutdown或者shuntdownNow关闭线程池的锁
 private final ReentrantLock mainLock = new ReentrantLock();

//存放线程池中所有的线程节点
private final HashSet<Worker> workers = new HashSet<Worker>();

//支持条件队列
private final Condition termination = mainLock.newCondition();

//线程池最大容量
private int largestPoolSize;

//统计执行的任务数,当线程回收时才进行统计 
private long completedTaskCount;

//线程创建工厂
private volatile ThreadFactory threadFactory;

//是否允许核心线程过时
private volatile boolean allowCoreThreadTimeOut;

//默认的拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

Worker内部子类

Worker类继承AbstractQueuedSynchroner并实现Runnable接口

  • 内部变量Thread保存,根据ThreadFactory创建的当前线程;

  • firstTask保存的是要执行的线程任务

  • run方法提供了节点任务执行的方法

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        final Thread thread;
        Runnable firstTask; 
        volatile long completedTasks;
        //构造方法    
        Worker(Runnable firstTask) {
            setState(-1); 
            this.firstTask = firstTask;//保存任务
            this.thread = getThreadFactory().newThread(this);//当前线程
        }

     /**
      * 获取锁,将状态0改成1
      */  
     protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

      protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }     
       /**
        * 执行当前节点的任务,后面会进行分析
        */ 
      public void run() {
            runWorker(this);
        }

}

execute方法

主要是三个步骤:

  • 1.少于核心线程数时,增加新的线程,如果增加新的线程节点,并执行线程任务(注意:初始,小于核心线程是没新增一个线程任务,才新增一个线程节点,并不是线程启动的时候一次性创建完毕

    • runWorker方法两种情况,首先执行当前线程的任务task,执行完成后,循环遍历queue获取头节点任务;队列任务执行完后跳出队列;

    • 判断线程池状态是否为STOP状态,跳出循环;

    • 如果不为STOP状态,则执行任务,记录当前线程节点执行任务数量。执行完毕,遍历queue反复

    • 跳出循环后,线程池workers线程集合,移除当前线程节点,统计线程池执行的总任务数,判断现有线程数量是否小于核心线程数,再增加新的空任务线程节点。(注意:所有线程节点执行完后,先移除,在判断是否需要生成新的节点

  • 2.如果大于核心线程数,再尝试增加到队列,如果增加到队列中成功。

    • 2.1检查线程池是否正常运行,如果异常则移除队列中的线程任务,并执行拒绝策略

    • 2.2检查线程池线程数量,线程池现有线程数量为0,则新建线程任务

  • 3.判断线程数是否超过了最大的线程数,如果超过了,则直接执行拒绝策略

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();//获取线程池状态
        if (workerCountOf(c) < corePoolSize) {//判断运行中的线程数量是否少于核心数量,创建新的线程数
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//增加到队列中
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))//如果线程池不是运行状态,则移除当前线程任务
                reject(command);//执行拒绝策略
            else if (workerCountOf(recheck) == 0)//现有线程数量为0时,增加新的核心线程
                addWorker(null, false);//增加一个空闲线程
        }
        else if (!addWorker(command, false))//判断增加的线程数是否超过了最大的线程数量,如果超过了,则执行拒绝策略
            reject(command);
    }

/**
 * 作用:判断现有线程worker超过了核心线程数(或最大线程数),超过了返回false,否则增加新的线程worker
 */ 
 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();//获取当前状态
            int rs = runStateOf(c);//获取状态高位3位

            /**
             * 1. 当前线程池状态为STOP,TIDYING,TERMINATED
             * 2. 当前线程池状态为SHUTDOWN并且已经有了第一个任务     
             * 3. 当前线程池状态为SHUTDOWN并且任务队列为空
             * 也就是说,在加入任务之前调用了SHUTDOWN方法,或者SHUTDOWNNOW方法后就不能再加入节点。
             */ 
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            /**
             * 判断现有线程数是否超过最大线程数,如果超过最大线程数返回false
             */ 
            for (;;) {
                int wc = workerCountOf(c);//获取当前线程数
                //现有线程数超过最大的数量返回false  
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))//线程个数增加1
                    break retry;
                c = ctl.get();  // 重新获取执行数量

                // 查看线程池状态是否变更,如果改变则重试  
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
       /**
        * 1.获取独占锁;
        * 2.给线程池增加新的线程任务;
        */    
        try {
            w = new Worker(firstTask);//新键任务节点
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); // 增加线程任务,创建新的线程
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//这里会调用Work的run方法,下面我会具体分析
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker类的run方法(后续抽时间补充执行流程图)

主要作用是运行线程任务,具体步骤如下:

  • 线程节点有任务,或队列中有任务(获取头节点任务)

    • 返回空的几种情况:

      • 1.现有线程数超过最大线程数时;
      • 2.线程池时暂停状态
      • 3.线程池时关闭状态并且队列为空
      • 4.线程等待任务超时时
  • 如果当前线程节点有线程任务,或者循环返回的队列头节点有任务继续执行任务,如果没有可执行任务,则调用processWorkerExit方法,统计线程执行的任务,并清除线程池中空闲的线程;

  • 获取线程锁,并判断是否STOP状态,如果是STOP状态,说明调用了ShutdownNow方法,中断当前线程;调用processWorkerExit方法,进行任务统计及线程回收。

  • 执行线程中task的线程任务。并记录当前线程任务执行数量增加1,释放锁。

  • 最后清除线程中空闲的线程,并汇总线程任务执行总数

/**
 * 作用:运行节点的任务;
 * 1.获取到任务后,释放锁,成为可中断的状态;
 * 2.获取到任务,或者获取队列中第一个任务不为空时;
 * 3.判断线程池状态,如果正常则执行运行节点任务;
 */
 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;//
        w.firstTask = null;
        w.unlock(); //先释放锁,允许任务在执行之前被中断(如果调用ShutDown方法,会有锁竞争) 
        boolean completedAbruptly = true;
        try {
            /**
             * 条件:当前节点的任务不为空,或者队列中的任务不为空;
             * 如果任务为空,则获取头部节点的任务 
             */     
            while (task != null || (task = getTask()) != null) {
                w.lock();//获取锁,AQS中同步队列获取执行权
                /**
                 *  如果线程池停止了,查看当前线程是否中断,
                 *  如果中断了,则清除当前线程中端状态。并中断当前线程中任务执行
                 *  (这里时因为ShuntDownNow方法要立即停止所有线程任务,但是只用ShutDown的话,当前线程的任务还是会继续执行。)
                 */     
                
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//这里是抽象方法,留出扩展的空间,如果抛出异常直接进行捕获
                    Throwable thrown = null;
                    try {
                        task.run();//执行线程的任务
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        /**
                         * 如果线程任务在执行过程中出现错误,在这里进行处理。
                         * 这里提供的是抽象方法,继承类可以进行修改,用于处理异常情况,线程任务处理
                         * 这里出现异常也可能会造成线程中断     
                         */
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;//执行完成,进行释放
                    w.completedTasks++;//完成任务++
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            /**
             * 作用:1.统计任务执行数量;2.多余线程节点回收
             */
            processWorkerExit(w, completedAbruptly);
        }
    }

   /** 有以下几种情况会返回null;
     * 1.现有线程数超过最大线程数时;
     * 2.线程池时暂停状态
     * 3.线程池时关闭状态并且队列为空
     * 4.线程等待任务超时时     
     * 主要作用: 除了以上非null的情况之外,从队列的头部获取到线程任务;
     */        
    private Runnable getTask() {
        boolean timedOut = false; 
        for (;;) {//自旋
            int c = ctl.get();
            int rs = runStateOf(c);

            // 检查队列是不是为空,为空则返回null;
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //线程池现有线程数
            int wc = workerCountOf(c);

            // 判断线程池现有线程数是否大于核心线程数,并且核心线程数是可过时的
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            /**
             * 满足以下两个条件
             * 1.现有线程数大于最大线程数
             * 2.或现有线程数大于1且超时时
             * 减少线程数
             */      
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
               // 获取头节点,如果头节点不为空,则返回线程任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
/**
 * 1.记录完成的任务数,并清除完成任务的线程
 * 2.判断是否要终止线程池;
 * 3.节点回收
 */
  private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) //如果是中断状态
            decrementWorkerCount();//线程数量减1

        final ReentrantLock mainLock = this.mainLock;//获取线程池主锁
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;//记录完成的任务数
            /**
             * workers中移除线程执行节点
             * 不管是否小于核心线程数,线程节点都要先移除     
             */
            workers.remove(w);
        } finally {
            mainLock.unlock();//释放线程池主锁
        }

        tryTerminate();//判断是否需要更改线程池状态

        int c = ctl.get();
        /**
         * 满足条件再增加一个空的线程节点; 
         * 1. 如果当前时SHUNTDOWM状态。当任务完成正常执行完毕后
         * 2. 判断核心线程是否需要回收,并且核心线程数量是否达标
         * 3. 如果核心线程数量满足则返回,否则,增加一个空任务的线程节点
         * 这里也是节点循环执行任务的根本所在
         */             
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);//增加一个节点
        }
    }

 /**
  * 作用:清除空闲的线程节点,判断是否将线程池的状态变更成TERMINATED状态
  */
 final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /**
             * 运行状态,或者中断状态但是有任务要执行 
             * 还有任务执行时返回
             */
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // 清除空闲的线程
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();//终止操作由子类进行操作
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));//将终止状态变TERMINATED状态
                        termination.signalAll();//唤醒条件队列
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
        }
    }

shutDown方法

  • 核心点就是,逐个中断节点时,如果节点还没中断,则先让该节点获取独占锁,然后再中断
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();//检测是否有关闭线程池的权限
            advanceRunState(SHUTDOWN);//变更线程池状态
            /**
             * 逐个中断线程节点
             *(注意这里判断,线程节点如果没有中断,先让节点的独占锁,保证线程任务执行完之后才进行中断操作)     
             */
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor(钩子方法,等待实现类实现)
        } finally {
            mainLock.unlock();
        }
        tryTerminate();//尝试变更线程池状态为TERMIATED,这个方法上面已经分析过了
    }

//中断线程节点
 private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {//如果线程没有中断,先获取锁之后才中断,也即是等任务执行完毕后才中断
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

shutDownNow 方法

  • 中断所有线程节点,不考虑节点有任务与否;

  • 返回队列中剩余的任务数;

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();//检查权限
            advanceRunState(STOP);//更改线程任务为STOP状态
            interruptWorkers();//直接中断线程
            tasks = drainQueue();//获取队列中剩余的任务,并清空队列
        } finally {
            mainLock.unlock();
        }
        tryTerminate();//尝试变更线程池状态
        return tasks;
    }

 private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();//这里进行的操作是,只要没中断就进行中断,不会有获取锁的操作
        } finally {
            mainLock.unlock();
        }
    }

几种拒绝策略

  • AbortPolicy 总是抛出异常(这种是默认的)
 public static class AbortPolicy implements RejectedExecutionHandler {

        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
  • CallerRunsPolicy 只要线程池没关闭就执行线程任务
  public static class CallerRunsPolicy implements RejectedExecutionHandler {
        
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
  • DiscardPolicy 静默处理,也就是啥都不干
  public static class DiscardPolicy implements RejectedExecutionHandler {

        public DiscardPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

  • DiscardOldestPolicy 把队列中的第一个任务给抛掉,然后执行当前的这个任务
public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

ThreadPoolExecutor方法扩展

  • 上面源码分析中,我们也看到了ThreadPoolExecutor提供的几个钩子方法,供子类去扩展。具体如下面这个类:

  • 一般我们对线程firstTask任务执行失败后的处理也可以用提供的钩子方法进行相关处理。

 class PausableThreadPoolExecutor extends ThreadPoolExecutor {
    private boolean isPaused;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();
 
    public PausableThreadPoolExecutor(...) { super(...); }
 
    protected void beforeExecute(Thread t, Runnable r) {//在线程任务执行之前进行的操作;
      super.beforeExecute(t, r);
      pauseLock.lock();
      try {
        while (isPaused) unpaused.await();//加入到条件队列
      } catch (InterruptedException ie) {
        t.interrupt();
      } finally {
        pauseLock.unlock();
      }
    }
 
    public void pause() {//
      pauseLock.lock();
      try {
        isPaused = true;
      } finally {
        pauseLock.unlock();
      }
    }
 
    public void resume() {
      pauseLock.lock();
      try {
        isPaused = false;
        unpaused.signalAll();//全部唤醒
      } finally {
        pauseLock.unlock();
      }
    }
  }}

总结归纳

1.ThreadPoolExecutor中线程如何管理的?超过核心线程的线程是如何被调用的?

线程管理的主要是通过两个参数管理的,
一个是ctl的原子变量,标识的是现存线程的个数;
另一个参数是workers,HashSet进行存储的,具体的线程worker实例;

从前面代码分析可以看出:

1.未超过核心线程数,会创建一个新的worker,存储到workers集合中,并且执行worker中firstTask的任务;

2.超过线程池的任务被放入到queue;

3.worker执行完当前任务和队列中任务后,当队列中没有可执行任务,直接会把当前的worker移除,并统计当前worker执行的任务;

4.如果workers中的线程节点数少于核心线程数,那么会调用增加addWork的方法,新增一个worker,firstTask为空去执行queue中的任务;

5.循环3,4步骤,直到线程池调用shutdown方法或shutdownNow方法;

从上面也可以看出,执行完任务的worker节点在队列没有任务的时候都会从workers数组中remove掉,然后新增空任务worker去处理queue中的新任务。

原文地址:https://www.cnblogs.com/perferect/p/13665664.html