线程池源码解析

ThreadPoolExecutor的几个重要属性

  • BlockingQueue workQueue

    阻塞队列。存放将要执行的任务

  • HashSet workers

    线程集合。下文会重点介绍Worker这个内部类

  • corePoolSize

    核心线程数

  • maximumPoolSize

    最大线程数

  • keepAliveTime

    非核心线程保持空闲的最长时间

  • allowCoreThreadTimeOut

    核心线程是否被回收。默认是不回收核心线程的

  • RejectedExecutionHandler defaultHandler = new AbortPolicy()

    默认拒绝策略。可以看到默认是抛异常

      public static class AbortPolicy implements RejectedExecutionHandler {
    
          public AbortPolicy() { }
    
          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
              throw new RejectedExecutionException("Task " + r.toString() +
                                                   " rejected from " +
                                                   e.toString());
          }
      }
    

源码分析

1.execute

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
        
    int c = ctl.get();
    //当前线程数 < 核心线程数        
    if (workerCountOf(c) < corePoolSize) {
        //新建线程并执行
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //(上一步addWorker失败了 || 当前线程数 >= 核心线程数) && 阻塞队列未满 && 线程池运行中
    if (isRunning(c) && workQueue.offer(command)) {
        //为了再次校验线程状态
        int recheck = ctl.get();
        //线程池不是运行中 && 将任务移除阻塞队列成功
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //所有线程都被回收了 但是之前workQueue已经接收了任务
        else if (workerCountOf(recheck) == 0)
            //这里为什么传null?
            addWorker(null, false);
    }
    
    // 阻塞队列满了
    // 当前线程数 < 最大线程数 新建线程会成功
    else if (!addWorker(command, false))
        //当前线程数 >= 最大线程数 执行拒绝策略
        reject(command);
}

addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        // 很恶心的判断。就当线程池被搞了吧。正常情况下不会进来
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 根据core参数来判断能不能新建线程
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //改线程数+1。后续失败会对这个操作回滚
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    //这之前的操作其实就是一些校验,相当于预创建线程
    //现在才开始真正的创建线程并执行
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //将任务封装为Worker。new出来的时候内部就新建了一个线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //获取全局锁 一个个来执行
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                //再进行一系列的校验
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //已经被执行了抛异常
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    //记录下这个线程池整个生命周期里的最大的线程数(这东西没什么卵用)
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 这里其实调的是Worker#runWorker
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            //擦屁股。因为之前的预创建在还没正真执行的时候就将工作线程数+1了,所以这里回滚。再从workers中移除
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker.runWorker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); 
    // 这个work执行过程中是否顺利完成。
    boolean completedAbruptly = true;
    try {
        //之前的伏笔 task = null的时候 走 task = getTask()
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //如果线程池状态大于等于STOP,那么意味着该线程也要中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //钩子方法。由开发者重写扩展
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //钩子方法。由开发者重写扩展
                    afterExecute(task, thrown);
                }
            } finally {
                //置空task,准备getTask获取下一个任务
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        //task.run()抛异常了
        completedAbruptly = false;
    } finally {
        //将这个worker移除
        //运行状态小于STOP的情况下
        //allowCoreThreadTimeOut为false && 当前线程数小于核心线程数 新建一个worker
        processWorkerExit(w, completedAbruptly);
    }
}

getTask

private Runnable getTask() {
        boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池被搞了
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 是否要回收线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //keepAliveTime的作用
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

2.shutdown && shutdownNow

shutdown

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //必要的校验 没啥意思
        checkShutdownAccess();
        //里面不断重试,直到将线程池状态改成SHUTDOWN成功为止
        advanceRunState(SHUTDOWN);
        //最重要的方法,看下面解析
        interruptIdleWorkers();
        //原文注释是ScheduledThreadPoolExecutor会重写这个方法
        onShutdown(); 
    } finally {
        mainLock.unlock();
    }
    //看下面解析
    tryTerminate();
}

--

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            //已经被中断了自然就不中断。
            //w.tryLock()什么时候成功?什么时候失败?
            //Worker.runWorker方法while条件成立的时候就会上锁。
            //所以当这个work正在执行任务时获取锁失败;
            //当这个work执行任务成功后解锁,在getTask的时候获取锁成功
            //即只会中断此刻空闲的线程
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

--

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 只有当线程池状态是STOP || (SHUTDOWN && 阻塞队列没有任务)的时候才继续往下走
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 正在工作的线程数不为0 尝试中断works中的第一个work    
        if (workerCountOf(c) != 0) { 
            
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //历经千辛万苦 先把线程池状态置为TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    //再把线程池状态置为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    //线程池的终止条件(Lock.Condition)
                    //如果之前调用了awaitTermination方法,此刻唤醒
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
    }
}

shutdownNow

shutdown只是中断空闲线程。并尝试终止线程池(如果没有正在工作的线程)

而shutdownNow是强制中断所有线程,并把阻塞队列里的任务全都抛弃,但是会返回任务list。然后尝试终止(这时候基本就真的终止了)

总结

--

  • 当前线程数 < 核心线程数:直接新建一个线程并执行任务

  • 当前线程数 >= 核心线程数 && 阻塞队列未满:将任务放入到阻塞队列

  • 核心线程数 <= 当前线程数 < 最大线程数 && 阻塞队列已满:新建线程并执行任务

  • 当前线程数 >= 最大线程数 && 阻塞队列已满:执行拒绝策略

--

当阻塞队列已经接收了任务,但此时所有线程被回收了,此时的任务将如何处理?

else if (workerCountOf(recheck) == 0)
    //这里为什么传null?
    addWorker(null, false);

新建一个线程去阻塞队列里获取任务并执行。

--

任务执行时抛异常了怎么处理?

执行processWorkerExit方法。

  • 将这个work从works中移除。统计加上这个work完成的任务数。

  • 尝试终止线程池(一般不会终止)

  • 新建一个work

--

何时终止线程池?

  • 调用shutdown且所有线程都空闲且阻塞队列为空

  • 调用shutdownNow

--
shutdown和shutdownNow的区别

shutdown只是中断空闲线程。并尝试终止线程池(如果没有正在工作的线程)

而shutdownNow是强制中断所有线程,并把阻塞队列里的任务全都抛弃,但是会返回任务list。然后尝试终止(这时候基本就真的终止了)

扩展

fixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

核心线程数=最大线程数。

阻塞队列默认的用LinkedBlockingQueue且容量是Integer.MAX_VALUE。

那么问题来了。只要阻塞队列不满,这个线程池就一直会接收任务。到达一定数量,还未到Integer.MAX_VALUE的时候机器肯定爆了。显然我们实际项目中不应该直接用fixedThreadPool

cachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

最大线程数是Integer.MAX_VALUE。只要有任务过来,不断的新建线程。

--

这两种默认的线程池说白了就是无限接受任务。所以我们实际项目中应该自己构造线程池来解决实际需求。

原文地址:https://www.cnblogs.com/chenshengyue/p/11558648.html