线程池ThreadPoolExecutor

 

一 概述

1.线程池产生背景

在多线程环境下,频繁地创建与销毁线程会耗费大量的系统资源,降低运行性能,因此产生了一种设计思想:将创建好的线程放到一个容器中,需要时从容器取得线程,使用完毕将线程归还容器,这样就可以重复利用线程,避免了重复创建与销毁造成的资源消耗,提高了性能。

2.什么是线程池?

元素:线程。
本质:容器。
设计目的:使得线程能够被重复使用,降低系统消耗。
一个以多个线程为元素、通过重复使用降低系统消耗的容器。

二 ThreadPoolExecutor

该类是线程池思想的核心类,继承关系如图。

一个代表性的构造方法;

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

1.coolPoolSize

核心池大小。这个数字是根据一般情况下并发访问的线程数目设定的。默认情况下,线程池创建完毕后,池中并没有线程,有任务到达以后才开始创建线程。

2.maximumPoolSize

线程池中允许出现的最大线程数。维护线程需要消耗系统资源,最大线程数目的设定既考虑高峰时期最大并发访问数量,也兼顾了系统性能。

3.keepAliveTime

这个参数针对的是超出核心池大小的多余线程。设定一个时间,如果多余线程空闲时间超过该值,线程会被终止。默认情况下,只有当线程数目大于核心池大小时才有效,如果allowCoreThreadTimeOut设定为true,当线程数目小于核心池大小时也有效。

4.unit

keepAliveTime的单位。

5.workQueue

一个阻塞队列,用来存储等待执行的任务。BlockingQueue有几个常用的实现类:

  • ArrayBlockingQueue:不常用。
  • LinkedBlockingQueue:常用。
  • SynchronousQueue:常用。

6.threadFactory

线程工厂,用来创建线程。

7.handler

当线程数目到达最大值时,拒绝处理任务时采取的策略。

三 执行过程

1.当线程数目小于核心数目时,每来一个任务就创建一个线程去执行该任务。
2.当线程数目大于等于核心数目时,首先尝试将新到达的任务添加到任务缓存队列中,若添加成功,等待空闲线程执行该任务,添加失败尝试创建新的线程处理该任务。
3.如果线程达到最大数目,新任务达到时会采取拒绝处理策略。

execute()是ThreadPoolExecutor的核心方法,下面对该源码进行分析:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
       //workerCountOf(c)用于获取线程池中当前线程数目
        if (workerCountOf(c) < corePoolSize) {
            //addWorker(command,true)创建新的线程执行该任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //当线程数目大于等于corePoolSize时,首先尝试将任务添加到任务缓存队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果任务缓存队列已满,尝试创建新的线程
        else if (!addWorker(command, false))
            reject(command);
    }

addWorker(Runnable firstTask, boolean core)源码:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
           ........................................................................

            for (;;) {
                int wc = workerCountOf(c);
               //当core=true时,如果线程数目小于corePoolSize,程序继续执行,创建新线程;当core=false时,如果线程数目大于等于
               //maximumPoolSize,程序终止执行,返回false,不会创建新的线程。
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //为任务创建一个新的线程
            w = new Worker(firstTask);
            ..............................................
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

四 使用

通常不直接使用ThreadPoolExecutor的构造方法来创建对象,而是使用Executors的静态方法来创建对象。

newCachedThreadPool();

corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,使用SynchronousQueue作为任务缓存队列,如果线程池中没有可用的线程,每来一个任务创建一个线程。

newFixedThreadPool(int nThreads);

corePoolSize=nThreads,maximumPoolSize=nThreads,keepAliveTime=0s,使用LinkedBlockingQueue作为任务缓存队列。
线程池执行对象创创建完毕以后,将任务通过execute(Runnable command)方法提交给线程池执行对象即可。

参考:

http://www.cnblogs.com/dolphin0520/p/3932921.html

原文地址:https://www.cnblogs.com/tonghun/p/7086342.html