多线程(3) — 线程池

  线程的创建和销毁是消耗时间和资源的,创建大量的线程会抢占宝贵的内存资源,大量的线程回收也给GC带来很大的压力。为了避免这种情况的出现,可以让创建的线程复用,那么线程池就应用而生了。需要线程时,从线程池中拿一个线程,不需要时再归还给线程池,从而达到了复用线程的效果。

  JDK提供了一套Executor框架,用于进行线程控制,本质是一个线程池。ThreadPoolExecutor表示一个线程池,Executors扮演着线程池工厂的角色,通过Executors可以取得一个拥有特定功能的线程池。主要有以下工厂方法:

    (1)public static ExecutorService newFixedThreadPool(int nThreads)

返回一个固定线程数量的线程池,该池中的线程数量不变。当有新的任务提交时,线程池中要是有 空线程,任务会立即执行。若没有,则新的任务会被暂存在一个队列里,直到有空闲的线程再处理。

    (2)public static ExecutorService newSingleThreadExecutor()

该方法返回一个只有一个线程的线程池。如果多于一个任务被提交到此线程池,任务会被保存在一个任务队列中,待线程空闲,按照先入先出的顺序执行队列中任务。

    (3)public static ExecutorService newCachedThreadPool()

返回一个可根据实际情况调整线程数量的线程池。也就是线程池中线程的数量不是确定的,单有空闲线程可以复用,会优先使用可复用的线程。如果所有的线程都在工作,这时又有新的线程提交任务,则会创建新的线程来处理任务,所有线程在当前任务执行完毕后返回线程池,再进行复用。

    (4)public static ScheduledExecutorService newSingleThreadScheduledExecutor()

返回一个ScheduledExecutorService 对象,线程池大小为1。ScheduledExecutorService 接口在ExecutorService 接口之上扩展了在给定时间执行某任务的功能。例如在某个固定时间延时执行或者周期性执行某个任务。

    (5)public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

返回ScheduledExecutorService 对象,但该线程可以指定线程数量大小

  以上核心线程池的内部实现,其实都是使用ThreadPoolExecutor类进行的封装。这个类很强大,能实现多种线程池,现在先看看这个类的构造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

  下面来说说这几个参数的含义:

    (1)corePoolSize:指定了线程池中的线程数量

    (2)maximumPoolSize:指定了线程池中最大线程的数量

    (3)keepAliveTime:当线程池中线程的数量超过corePoolSize时,多语的空闲线程的存活时间,即超过corePoolSize的空闲线程,在多长时间被销毁。

    (4)unit:keepAliveTime的单位

    (5)workQueue:任务队列,被提交的但还没有执行的任务

    (6)threadFactory:线程工厂,用于创建线程,一般用默认的就可以了

    (7)handler:拒绝策略,当任务太多,来不及处理时,如何拒绝任务。

  以上几个参数重点说说workQueue和handler

  workQueue:指被提交但未执行的任务队列,是BlockingQueue接口对象,仅用于存放Runnable对象,在ThreadPoolExecutor构造函数中可以是以下几种BlockingQueue接口

(1)直接提交的队列:SynchronizeQueue。这个队列没有容量,每个插入都要等待一个删除,提交的任务不会被真实保存,将任务直接交给新线程执行,如果没有空闲线程,会尝试创建新的进程,如果进程数量达到最大值,会执行拒绝策略,一般需要很大的maximumPoolSize。

(2)有界的任务队列:ArrayBlockingQueue。构造函数必须带一个容量参数,表示该队列的最大容量。当新任务需要执行时,如果线程池中实际线程数小于corePoolSize,则会优先创建新线程,大于corePoolSize的话,会把任务加入等待队列。如果队列已满,无法加入,则在总线程数不大于maximumPoolSize前提下,创建新的进程执行任务,若大于maximumPoolSize,则执行拒绝策略。执行任务的顺序是先进先出。

(3)无界的任务队列:LinkedBlockingQueue。除非系统资源耗尽,否则该队列不存在任务入队失败的情况。有新任务到来时,系统的线程数小于corePoolSize时,线程池会生成新的线程执行任务,达到corePoolSize后线程就不再增加了。若后续有任务提交,又没有空闲线程,那么直接加入队列等待,如果任务的创建和处理的速度差异很大的话,该队列会迅速增长,直到耗尽系统内存。执行任务的顺序是先进先出。

(4)优先任务队列:PriorityBlockingQueue,带有优先级的任务队列,可以控制执行任务的先后顺序,是一个特殊的无界队列。根据任务自身携带的优先级顺序先后执行任务,确保系统性能同时可以保证质量。

  ThreadPoolExecutor 类核心调度代码如下,这段代码体现了线程池的工作逻辑。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

  代码逻辑:workerCountOf()函数取得了当前线程池的线程总数,小于corePoolSize时,会将任务通过addWorker()方法直接调度执行,否则在workQueue.offer()进入等待队列。如果进入等待队列失败(比如有界队列达到上限了,或SynchronousQueue),会执行addWorker(command, false),如果当前线程数达到了maximumPoolSize,则提交失败,执行拒绝策略。

  拒绝策略handler:线程池中线程已全部在执行任务,任务队列也满了放不下新任务,这个时候就需要一套机制来处理这个问题,就是拒绝策略。JDK提供了四种拒绝策略:

    • AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。
    • CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是任务提交线程的性能会急剧下降。
    • DiscardOldsetPolicy策略:该策略将丢弃最大的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
    • DiscardPolicy策略:该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,我觉得这应该是最好的方案了吧。

  以上策略均实现了RejectedExecutionHandler接口,若以上策略仍然无法满足实际应用的需要,可以自己扩展这个接口:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

  除了以上的任务队列和拒绝策略两个参数以外,还可以自定义线程创建,甚至可以扩展线程池。

  自定义线程创建:ThreadFactory,只有一个工厂方法。

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

  当线程池需要线程时,就会调用这个方法。自定义线程可以跟踪线程池究竟何时创建了多少线程,也可以自定义线程的名称、组以及优先级等,甚至可以把线程设置为守护线程。

   扩展线程池:ThreadPoolExecutor是一个可以扩展的线程池,提供了beforeExecute()、afterExecute()、terminated()三个接口用来对线程池进行控制。

 下面来个自定义创建线程和扩展线程池的例子:

/**
 * 扩展线程池
 *
 */
public class ExtThreadPool {
    public static class MyTask implements Runnable{
        public String name;
        public MyTask(String name){
            this.name = name;
        }
        @Override
        public void run() {
            System.out.println("正在执行 Thread ID:"+Thread.currentThread().getId()+",Task name = "+this.name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5,5,0L,TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadFactory(){
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setDaemon(true);
                        System.out.println("Create "+t);
                        return t;
                    }
        }){

            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备好了,开始执行:"+((MyTask) r).name);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完成了:"+((MyTask) r).name);
            }

            @Override
            protected void terminated() {
                System.out.println("线程退出");
            }
            
        };
        
        for (int i = 0; i < 5; i++) {
            MyTask task = new MyTask("MyTask-"+i);
            es.execute(task);
            Thread.sleep(10);
        }
        es.shutdown();
    }
}

执行结果如下:

Create Thread[Thread-0,5,main]
准备好了,开始执行:MyTask-0
正在执行 Thread ID:11,Task name = MyTask-0
Create Thread[Thread-1,5,main]
准备好了,开始执行:MyTask-1
正在执行 Thread ID:12,Task name = MyTask-1
Create Thread[Thread-2,5,main]
准备好了,开始执行:MyTask-2
正在执行 Thread ID:13,Task name = MyTask-2
Create Thread[Thread-3,5,main]
准备好了,开始执行:MyTask-3
正在执行 Thread ID:14,Task name = MyTask-3
Create Thread[Thread-4,5,main]
准备好了,开始执行:MyTask-4
正在执行 Thread ID:15,Task name = MyTask-4

  在自定义的线程里ThreadFactory.newThread()方法把新增加的线程都设置为守护线程,这样主线程退出后,所有的线程就退出了,强制性的没有让线程执行完毕。

  beforeExecute()、afterExecute()、terminate()方法被重写,分别记录了一个任务开始、结束和整个线程池的退出。execute()方法提交执行任务。提交完成后,调用shutdown()方法来关闭线程池,这是个很安全的方法,如果当前还有线程在执行,shutdown()方法不会立即暴力终止所有任务,会等待所有任务执行完毕后关闭线程池,并且这个线程池不能再接受其他新的任务了。这三个重写的方法特别有助于应用程序的调试及问题的诊断。

原文地址:https://www.cnblogs.com/wangyongwen/p/11213002.html