java线程(6)——线程池(下)

上篇博客java线程(5)——线程池(上)介绍了线程池的基本知识,这篇博客我们介绍一下常用的ThreadPoolExecutor。


定义

类图关系:

这里写图片描述

ThreadPoolExecutor继承了AbstractExecutorService抽象类,而AbstractExecutorService实现了ExecutorService接口。

下面来看下ThreadPoolExecutor的代码:

public class ThreadPoolExecutor extends AbstractExecutorService {

   //核心线程池的大小。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
   private volatile int corePoolSize;

   //线程池中线程的最大数量
   private volatile int maximumPoolSize;

   //Timeout时间,没有任务执行时最多保持多久时间之后终止。
   private volatile long keepAliveTime;

    //线程工厂,所有的线程都是通过他创建的。
    private volatile ThreadFactory threadFactory;
......

    //给定初始化参数
     public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    ......
}

Tips:
Volatile关键字,意为“不稳定的,易变的”。它经常用在多线程中的类型修饰符,他的作用是确保某条指令或代码不会被省略,而且每次都是直接读取值,而不是使用备份数据。

线程池状态

线程池的几种状态基本上是级别层层递进的,区分的因素有:

是否接受新任务,
是否允许运行队列中的任务,
是否继续执行正在处理的任务等等。

   //线程池能接受任务新任务,并且可以运行队列中的任务
    private static final int RUNNING    = -1 << COUNT_BITS;

    //不再接受新任务,但可继续运行队列中的任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;

    //任务都不再执行,正在处理的任务要中断
    private static final int STOP       =  1 << COUNT_BITS;
    //终止
    private static final int TIDYING    =  2 << COUNT_BITS;
    //terminated()方法执行结束
    private static final int TERMINATED =  3 << COUNT_BITS;

状态之间的转化关系图如下:

这里写图片描述

Worker类

说到线程池,我们经常会把线程池比作是一个工厂,而把线程比作工人。

在ThreadPoolExecutor类中,还真就有一个Worker类,先看一下他的定义。

 private final class Worker  extends AbstractQueuedSynchronizer implements Runnable{
        //正在运行
        final Thread thread;
        //初始运行的任务,可能为null
        Runnable firstTask;
        //前一个线程任务
        volatile long completedTasks;


         Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        //检查是否可以添加新的线程到当前的线程池中
         private boolean addWorker(Runnable firstTask, boolean core) {

        }
        //创建线程时失败回滚
        private void addWorkerFailed(Worker w) {
        }

        //移除任务
         public boolean remove(Runnable task) {
         }
     ......

}

分析:

Worker继承了AbstractQueuedSynchronizer,他是一个基于FIFO队列,也叫同步器。
Worker类实现了Runnable接口,说明他是一个线程类。此外,内部还提供了新增、移除任务等方法。那么什么时候可以新增什么时候又需要移除呢,就涉及到核心方法execute()了。

execute()

核心代码如下:

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        //如果当前活动的线程数<核心池大小
        if (workerCountOf(c) < corePoolSize) {
            //创建线程处理任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果当前线程池状态为running,且成功加入指定队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //二次检查是否加入队列
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

分析:
上面的代码由三个大的if块组成,前两个比较容易理解,第三个有点绕。

1、当前活动线程<核心池大小,继续调用addWorker创建线程处理任务。
2、前面说到Running状态可以接受新任务,运行队列中的任务。判断当前线程的状态是否为Running,如果是,并且任务成功加入队列,还需要进行二次检查,防止出现shut down现象。
3、如果当前线程状态不是Running或任务加入队列失败,则跳转到else if中,执行reject()方法。

原文地址:https://www.cnblogs.com/saixing/p/6730218.html