Java线程池浅析

为什么使用线程池

线程使应用能够更加充分合理的协调利用CPU、内存、网络、IO等系统资源。

  • 线程的创建需要开辟虚拟机栈、本地方法栈、程序计数器等线程私有的内存空间。
  • 在线程销毁的时候需要回收这些系统资源。
  • 频繁的创建和销毁线程会浪费大量的系统资源,增加并发编程风险。
  • 另外,在服务器负载过大的时候,如何让新的线程等待或者友好的拒绝服务?这些都是线程自身无法解决的。

所以需要线程池协调多个线程,并实现类似主次线程隔离、定时执行、周期执行等任务。
线程池的作用包括:

  • 利用线程池管理并复用线程、控制最大并发数等。
  • 实现线程任务队列缓存策略和拒绝机制。
  • 实现某些和时间相关的功能,如定时执行和周期执行等。
  • 隔离线程环境。比如:交易服务和搜索服务在同一台服务器上,分别开启两个线程池,交易线程的资源消耗明显要大;因此,通过配置独立的线程池,将较慢的交易服务和搜素服务隔离开,避免各服务线程相互影响。

线程池构造函数

线程池是怎样创建线程的:
ThreadPoolExecutor构造函数

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

参数解析:

  • corePoolSize表示常驻的核心线程数。如果等于0,则任务执行完之后,没有任何请求进入时销毁线程池的线程;如果大于0,即使本地任务执行完毕,核心线程也不会销毁。这个设置非常关键,设置过大会浪费资源,设置过小会导致频繁的创建或销毁。
  • maximumPoolSize表示线程池能够同时容纳同时执行的最大线程数。必须大于或者等于1。如果待执行的线程数大于此值,则会缓存到队列中。如果corePoolSize和maximumPoolSize相等,即为固定大小线程池。
  • keepAliveTime表示线程中的线程空闲时间,当空闲时间达到keepAliveTime值时,线程会被销毁,直到只剩下corePoolSize个线程为止,避免浪费内存和句柄资源。在默认情况下,当线程池的线程数大于corePoolSize时,keepAliveTime才会起作用。但是当ThreadPoolExecutor的allowCoreThreadTimeOut变量被设置为true时,核心线程超时后也会被回收。
  • TimeUnit表示时间单位。keepAliveTime的时间单位通常是TimeUnit.SECONDS。
  • workQueue表示缓存队列。当请求的线程大于corePoolSize时,线程进入BlockingQueue阻塞队列。
  • threadFactory表示线程工厂。它用来生产一组相同的任务的线程。线程池的命名是通过个这个factory增加组名前缀来实现的。在虚拟机分析时就可以知道线程任务时那个线程工厂生产的。
  • handler表示执行拒绝策略的对象。当workQueue的任务缓存区到达上限之后,并且活动线程数大于maxPoolSize时,线程池通过拒绝策略处理请求,是一种简单的限流保护。友好的拒绝策略可以如下三种:
    1. 保存到数据库进行削峰填谷。在空闲时再提取出来执行。
    2. 转向某个提示页面
    3. 打印日志

Executors解析

从ThreadPoolExecutor构造方法来看,队列、线程池、拒绝处理服务都必须有实例对象。在平时我们经常会通过Executors这个线程池静态工厂创建线程池,它提供了线程池这三个的默认实现。
下面是线程池的相关类图:
在这里插入图片描述
ExecutorService接口继承了Executor接口,定义了管理线程任务的方法。ExecutorService的抽象类AbstractExecutorService提供了submit()、invokeAll()等部分方法的实现,但是核心方法Executor.execute()并未在此实现。因为所有的任务都在这个方法里执行,不同的实现会带来不同的执行策略。通过Executors的静态工厂方法可以创建三个线程池的包装对象:ThreadPoolExecutor、ForkJoinPool、ScheduledThreadPoolExecutor。Executors的和新方法有五个:

  • Executors.newWorkStealingPool:创建有足够线程的线程池支持给定的并行度,斌通过使用多个队列减少竞争,此构造方法中把CPU的数量设置为默认的并行度。
  public static ExecutorService newWorkStealingPool(int parallelism) {
      //返回ForkJoinPool对象
        return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);
    }
  • Executors.newCachedThreadPool:maximumPoolSize最大可至Integer.MAX_VALUE,是高度可伸缩的线程池,如果达到这个上限,可定会跑出OOM异常。KeepAliveTime默认为60秒,工作线程处于空闲状态则回收工作线程。如果任务数增加,在此创建出新线程处理任务。
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
 }
  • Executors.newScheduledThreadPool:线程数最大可至Integer.MAX_VALUE,与上述相同,存在OOM风险。它是ScheduledExecutorService接口家族的实现类,支持定时任务和周期性任务执行。相比Timer,ScheduledExecutorService更安全,功能更强大,与newCachedThreadPool相比区别是不回收工作线程。
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory);
    }
  • Executors.newSingleThreadScheduledExecutor:创建一个单线程的线程池,相当于单线程串行执行所有任务,保证按照任务的提交顺序依次执行。
  • Executors.newFixedThreadPool:输入的参数即固定的线程数,既是核心线程也是最大线程数,
    不存在空闲线程,所以keepAliveTime等于0:
 public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    }

缺陷:

  1. 首先来看一下LinkedBlockingQueue的构造方法:
 public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
 public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

这里capacity等于Integer.MAX_VALUE,使用这样的额无界队列,如果瞬间请求非常大,会有OOM风险。除了newWorkStealingPool,其他四个创建方式都存在资源耗尽的风险。

  1. 默认的线程工厂过于简单,线程工厂需要做创建前的准备工作,对线程池的创建必须明确标识,为线程本身指定有意义的名称和简单的序列号。
public class UserThreadFactory implements ThreadFactory {
    private final String namePrefix;
    private final AtomicInteger nextID = new AtomicInteger(1);
    public UserThreadFactory(String whatFeatureOfGroup) {namePrefix = "UserThreadFactory's  " + whatFeatureOfGroup + "-Worker-";}
    @Override
    public Thread newThread(Runnable r) {
        String  name=namePrefix+nextID.getAndIncrement();
        Thread thread = new Thread(null, r, name, 0);
        System.out.println(thread.getName());
        return thread;
    }
}
  1. 拒绝策略应该考虑到业务场景,返回相应的提示或者友好的跳转。
    在ThreadPoolExecutor中提供了四个公开的内部静态类:
    • AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常。
    • DiscardPolicy:丢弃任务,但是不抛出异常,不推荐此种做法。
    • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中。
    • CallerRunsPolicy:调用任务的run()方法绕过线程池直接执行。

线程池源码解析

ThreadPoolExecutor的属性定义

在ThreadPoolExecutor的属性定义中频繁的使用位移运算来表示线程池状态,位移运算是改变当前值的一种高效手段。先来看以下ThreadPoolExecutor的属性定义:

    //Integer共有32位,最右边的29位表示工作线程数,最左边3位表示线程池状态
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //001-00000000000000000000000000000  
    //111-11111111111111111111111111111 (-1补码)
    //000-11111111111111111111111111111,类似于子网掩码,用于位的与运算
   private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 用左边3位,实现五种线程池状态
    
    //10000000000000000000000000000001原码(-1)
    //11111111111111111111111111111111补码,左移29位
    //111-00000000000000000000000000000   十进制-536,870,912
    //此状态表示线程池能够接受新任务
    private static final int RUNNING    = -1 << COUNT_BITS;
    //000-00000000000000000000000000000   十进制0
    //此状态不再接受新任务,但可以继续执行队列中的任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //001-00000000000000000000000000000   十进制536,870,912
    //此状态全面拒绝,并中断正在处理的任务
    private static final int STOP       =  1 << COUNT_BITS;
    //010-00000000000000000000000000000   十进制1,073,741,824
    //此状态表示所有任务已被终止
    private static final int TIDYING    =  2 << COUNT_BITS;
    //011-00000000000000000000000000000   十进制1,610,612,736
    //此状态表示已清理完现场
    private static final int TERMINATED =  3 << COUNT_BITS;


    //比如       001-00000000000000000000000000011,表示3个工作线程
    //掩码取反   111-00000000000000000000000000000,
    //与运算     001-00000000000000000000000000000 ,即得到左边3位001,表示线程池当前处于STOP状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //同理掩码000-11111111111111111111111111111与运算得到右边29位,即工作线程数
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //把左边3位与右边29位按或运算,合并成一个值
    private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池的状态用高3位来表示,其中包括符号位。五种状态按照十进制从小到大依次排序为:
RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
这样设计的好处是可以通过比较值的大小来确定线程池的状态。例如:

 private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

Executor接口的execute方法

线程池的主要处理流程,总结一下如下图:
在这里插入图片描述

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //返回包含线程数及线程池状态的Integer类型数值
        int c = ctl.get();
        //如果工作线程数小于核心线程数,则创建任务并执行
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            //如果创建失败,防止外部已经在线程池中加入新任务,重新获取一下
            c = ctl.get();
        }
        //只有线程池处于RUNNING状态,才执行后半句置入队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //如果线程池不是RUNNING状态,则将刚加入队列的任务移除
            if (! isRunning(recheck) && remove(command))
                reject(command);
             //如果之前的线程都被消费完,新建一个线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //核心池和队列都已满,则尝试创建一个新线程
        else if (!addWorker(command, false))
             //如果创建失败,则唤醒拒绝策略
            reject(command);
    }
  • execute方法在不同的阶段有三次addWorker的尝试动作
  • 发生拒绝的理由有两个:1.线程池状态为非RUNNING。2.等待队列已满

addWorker源码分析

/**
 *根据当前线程池的状态,检查是否可以添加新的任务线程,如果可以则创建并启动任务
 *如果一切正常返回true。返回失败的可能性如下:
 *1.线程池没有处于RUNNING状态
 *2.线程工厂创建新的任务线程失败
 *
 *firstTask:外部启动线程池需要构造的第一个线程,它是线程的母体
 *core:新增工作线程时的判断指标:
 *   true表示新增工作线程时,需要判断当前RUNNING状态的线程是否少于corePoolSize
 *   false表示新增作线程时,需要判断当前RUNNING状态的线程是否少于maximumPoolSize
 */
 private boolean addWorker(Runnable firstTask, boolean core) {
       //不需要任务预定义的标签,相应下文的continue retry,快速推出多层嵌套循环
        retry:
        for (;;) {
           //返回包含线程数及线程池状态的数值
            int c = ctl.get();
            //获取当前线程工作状态
            int rs = runStateOf(c);

            //判断线程池状态是否不为RUNNING,如果为RUNNING状态则不执行后面的判断
            //如果为RUNNING则判断线程池状态不为SHUTDOWN,firstTask 不为null,workQueue为空,则返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
            //获取当前工作线程数
                int wc = workerCountOf(c);
                //如果工作线程数大于等于最大容量(2^29)
                //或者根据core值判断工作线程数是否大于等于corePoolSize(maximumPoolSize)返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //将当前活动线程数+1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //线程池状态和线程数是可变化的,需要经常获取这个最新值
                c = ctl.get();  // Re-read ctl
                //如果线程池状态发生变化,需要再次从retry标签处进入,再做判断
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //开始创建工作线程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
        //通过Worker构造函数中的线程工厂this.thread = getThreadFactory().newThread(this);创建线程,并封装成Worker对象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
            //在进行ThreadExecutorPool的敏感操作时都需要持有主锁,避免在添加和启动线程时受到干扰
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //获取当前线程池的运行状态
                    int rs = runStateOf(ctl.get());
                    //当前线程池为RUNNING
                    //或SHUTDOWN且firstTask为null时
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        //整个线程池在运行期间的最大并发任务数
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                //start Worker中的属性对象thread
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
            //线程启动失败,则将当前worker移除、工作计数线程再减回去
                addWorkerFailed(w);
        }
        return workerStarted;
    }
Worker部分源码
 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
           //AQS方法,在runWorker方法执行之前禁止线程被中断
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        //当thread被start之后,执行runWorker方法
        public void run() {
            runWorker(this);
        }
    }

使用线程池需要注意以下几点:

  • 合理设置各类参数,应根据实际业务场景来设置合理的工作线程数。
  • 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
  • 创建线程或线程池时指定有意义的线程名称,方便出错时回溯。
  • 不允许使用Executors创建线程池,通过ThreadPoolExecutor方式创建,能明确线程池的运行规则,避免资源耗尽的风险。
原文地址:https://www.cnblogs.com/demo-alen/p/13547219.html