[改善Java代码]适时选择不同的线程池来实现

Java的线程池实现从最根本上来说只有两个:ThreadPoolExecutor类和ScheduledThreadPoolExecutor类,这两个类还是父子关系,但是Java为了简化并行计算,还提供了一个Executors的静态类,它可以直接生成多种不同的线程池执行器,比如单线程执行器,带缓冲功能的执行器等.但归根结底还是使ThreadPoolExecutor类或ScheduledThreadPoolExecutor类的封装类.

 为了了解这些个执行器,看ThreadPoolExecutor类,其中它复杂的构造函数可以很好的解释该线程池的作用:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

上面第二个是ThreadPoolExecutor最完整的构造函数,其他的构造函数都是引用该构造函数实现的.

1.corePoolSize:最小线程数

线程池启动后,池中保持线程的最小数量,需要说明的是线程数量是逐步到达corePoolSize值的,例如corePoolSize被设置为10,而任务数量只有5,则线程池中最多启动5个线程,而不是一次性启动10个线程.

2.maximumPoolSize:最大线程数量

这是池中能够荣达的最大线程数,如果超出,则使用RejectedExecutionHandler拒绝策略处理.

3.keepAliveTime:线程最大生命期

这个生命周期有两个约束条件:一是该参数针对的是超过corePoolSize数量的线程,二是处于非运行状态的线程.

如果corePoolSize为10,maximumPoolSize为20,此时线程池中有15个线程在运行,一段时间后,其中有3个线程处于等待状态的时间超过了keepAliveTime指定的时间,则结束这3个线程,

此时线程中则还有12个线程正在运行.

4.unit:时间单位

这是keepAliveTime的时间单位,可以是纳秒,毫秒,秒,分钟等选项

5.workQueue:任务队列

当线程池中的线程都处于运行状态,而此时任务数量继续增加,则需要有一个容器来容纳这些任务,这就是任务队列.

6.threadFactory:线程工厂

定义如何启动一个线程,可以设置线程名称,并且可以确认是否是后台线程等.

7.handler:拒绝任务处理器

由于超出线程数量和任务队列容量而对继续增加的任务进行处理的程序.

Executors提供的几个创建线程池的便捷方法:

1.newSingleThreadExecutors:单线程池

顾名思义就是一个池中就只有一个线程,该线程用不超时,而且由于是一个线程,当有多个任务需要处理时,会将它们放置到一个无界阻塞队列中逐个处理.

 1     /**
 2      * Creates an Executor that uses a single worker thread operating
 3      * off an unbounded queue. (Note however that if this single
 4      * thread terminates due to a failure during execution prior to
 5      * shutdown, a new one will take its place if needed to execute
 6      * subsequent tasks.)  Tasks are guaranteed to execute
 7      * sequentially, and no more than one task will be active at any
 8      * given time. Unlike the otherwise equivalent
 9      * <tt>newFixedThreadPool(1)</tt> the returned executor is
10      * guaranteed not to be reconfigurable to use additional threads.
11      *
12      * @return the newly created single-threaded Executor
13      */
14     public static ExecutorService newSingleThreadExecutor() {
15         return new FinalizableDelegatedExecutorService
16             (new ThreadPoolExecutor(1, 1,
17                                     0L, TimeUnit.MILLISECONDS,
18                                     new LinkedBlockingQueue<Runnable>()));
19     }

改方法的使用代码如下:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Client {
    public static void main(String[] args) throws Exception {
        //创建单线程执行器
        ExecutorService es = Executors.newSingleThreadExecutor();
        //执行一个任务
        Future<String> future = es.submit(new Callable<String>() {
            public String call() throws Exception {
                return "";
            }
        });
        //获得任务执行后的返回值
        System.out.println("返回值:" + future.get());
        //关闭执行器
        es.shutdown();
    }
}

2.newCachedThreadPool:缓冲功能的线程池

建立了一个线程池,该线程池的数量是没有限制的(不能超过Integer的最大值),新增一个任务就有一个线程处理,或者复用之前的空闲线程,或者启动一个新的线程.但是一旦一个线程在60秒内一直是处于等待状态时,也就是1分钟没有事情可做,就会被终止,源代码:

    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to <tt>execute</tt> will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

这里需要说明的是,任务队列使用了同步阻塞队列,这意味着向队列中加入一个元素,即可唤醒一个线程(新创建的线程或复用池中空闲线程)来处理.这种队列已经没有队列深度的概念了.

3.newFixedThreadPool:固定线程数量的线程池

在初始化时已经决定了线程的最大数量,若任务添加的能力超出线程处理能力,则建立阻塞队列容纳多余的任务,源代码:

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * <tt>nThreads</tt> threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

上面返回是一个ThreadPoolExector,它的corePoolSize和maximumPoolSize是相等的,也就是说最大线程数是nThreads.

如果任务增长非常快,超过了LinkedBlockingQueue的最大容量(Integer最大值),那此时会如何处理呢?

会按照ThreadPoolExecutor默认的拒绝策略(默认是DiscardPolicy,直接丢弃)来处理.

以上三种线程池执行器都是ThreadPoolExecutor的简化版,目的是帮助开发人员屏蔽过多的线程细节,简化多线程开发.

可以这样比喻:newSingleTheadExecutor,newCachedThreadPool,newFixedThreadPool是线程池的简化版,而ThreadPoolExecutor是旗舰版.

原文地址:https://www.cnblogs.com/DreamDrive/p/5624024.html