java线程池技术(二): 核心ThreadPoolExecutor介绍

版权声明:本文出自汪磊的博客,转载请务必注明出处。

Java线程池技术属于比较“古老”而又比较基础的技术了,本篇博客主要作用是个人技术梳理,没什么新玩意。

一、Java线程池技术的由来

我们平时使用线程来进行异步操作时,线程的创建,销毁等相对来说都是比较消耗资源的,试想这样一个业务情景:高并发请求,但是每次请求的时间非常短。如果我们为每一个请求都单独创建一个线程来执行,就会消耗大量设备资源,使设备处于高负荷状态,显然这样的处理就有很大问题了。这时候我们就可以用线程池技术来解决了,我们在线程池中创建若干条线程,当有任务需要执行时就从该线程池中获取一个线程来执行任务,如果一时间任务过多,超出线程池的线程数量,那么后面的线程任务就进入一个等待队列进行等待,直到线程池有线程处于空闲时才从等待队列获取要执行的任务进行处理,这样就减少了线程创建和销毁的开销,实现了线程的复用。

二、Executor框架介绍

首先对整体框架有个大概了解,如图:

Executor是个接口,就定义了一个void execute(Runnable command)方法。

ExecutorService同样是一个接口并且继承自Executor接口,对其方法进行扩展,其中最重要的是<T> Future<T> submit(Callable<T> task)方法,关于CallableFutureFutureTask不是本篇重点,有时间会单独写一篇博客介绍。也可以自行搜索了解,比较简单。

AbstractExecutorService抽象类,实现了ExecutorService接口,主要实现了submit,ivokeAny方法。

ScheduledExecutorService同样是一个接口,继承自ExecutorService接口,对其进行扩展,主要就是schedule等方法。

ThreadPoolExecutor具体线程池实现类,继承自AbstractExecutorService抽象类,我们使用的时候大部分就是使用这个类,后面会具体讲到。

ScheduledThreadPoolExecutor 具有调度能力的线程池实现类,继承自ThreadPoolExecutor类,且实现ScheduledExecutorService接口,其主要功能就是调用schedule方法,可以延时或者周期的执行某一任务,而ThreadPoolExecutor是没有这一共能的。

三、ThreadPoolExecutor构造函数参数介绍

在我们使用ThreadPoolExecutor的时候会发现构造函数中有很多参数,如下:

 1 public ThreadPoolExecutor(int corePoolSize,
 2                               int maximumPoolSize,
 3                               long keepAliveTime,
 4                               TimeUnit unit,
 5                               BlockingQueue<Runnable> workQueue) {
 6         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
 7              Executors.defaultThreadFactory(), defaultHandler);
 8     }
 9 
10 public ThreadPoolExecutor(int corePoolSize,
11                               int maximumPoolSize,
12                               long keepAliveTime,
13                               TimeUnit unit,
14                               BlockingQueue<Runnable> workQueue,
15                               ThreadFactory threadFactory) {
16         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
17              threadFactory, defaultHandler);
18     }
19 
20 public ThreadPoolExecutor(int corePoolSize,
21                               int maximumPoolSize,
22                               long keepAliveTime,
23                               TimeUnit unit,
24                               BlockingQueue<Runnable> workQueue,
25                               RejectedExecutionHandler handler) {
26         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
27              Executors.defaultThreadFactory(), handler);
28     }
29 
30 
31 public ThreadPoolExecutor(int corePoolSize,
32                               int maximumPoolSize,
33                               long keepAliveTime,
34                               TimeUnit unit,
35                               BlockingQueue<Runnable> workQueue,
36                               ThreadFactory threadFactory,
37                               RejectedExecutionHandler handler) {
38         if (corePoolSize < 0 ||
39             maximumPoolSize <= 0 ||
40             maximumPoolSize < corePoolSize ||
41             keepAliveTime < 0)
42             throw new IllegalArgumentException();
43         if (workQueue == null || threadFactory == null || handler == null)
44             throw new NullPointerException();
45         this.corePoolSize = corePoolSize;
46         this.maximumPoolSize = maximumPoolSize;
47         this.workQueue = workQueue;
48         this.keepAliveTime = unit.toNanos(keepAliveTime);
49         this.threadFactory = threadFactory;
50         this.handler = handler;
51     }

前3个构造函数都是调用第4个构造函数,只不过有些参数使用默认的罢了。

接下来我们看下第4个构造函数每个参数意义:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,RejectedExecutionHandler handler)

参数 说明
corePoolSize 线程池中核心线程数量
maximumPoolSize 线程池中最大线程数量
keepAliveTime 非核心线程存活时间
unit keepAliveTime的时间单位
workQueue 存放任务的队列
threadFactory 用来生产线程的工厂
handler 当线程池中不能再放入任务时执行的handler

如果有一个corePoolSize为5,maximumPoolSize为10的线程池,可用下图形象展示:

这里要说明一下:所谓核心线程非核心线程只是一个数量的说明,并不是说核心线程非核心线程有本质上的不同,它们都是普通的线程而已,并且线程特性都一样,不是说核心线程有特殊标记,线程池能“认”出来这是核心线程,对其有特殊操作。

四、线程池处理任务的策略

我们调用线程池的submit()或execute()方法,向线程池中放入一个任务执行的时候线程池到底是怎么按照其策略来执行的呢?接下来,我们对其执行策略进行详细介绍,介绍完会对构造函数中每个参数有更深刻印象的。

1,调用线程池的submit()或execute()方法向线程池中放入一个任务,线程池内部会检查运行的线程数量是否达到corePoolSize数量,如果没有达到,则创建一个线程执行放入的任务,不管已经创建的线程是否处于空闲状态,创建线程的任务由threadFactory来完成,关于ThreadFactory可以参考我的另一篇博客来学习:java线程池技术(一):ThreadFactory与BlockingQueue。

2,我们继续向线程池中放入任务,此时线程池中运行的线程数量已经达到corePoolSize数量,则新加入的任务将会被放入workQueue中,直到有线程处于空闲状态,则从workQueue中取出任务执行。

3,继续向线程池中放入任务,此时线程池中运行的线程数量已经达到corePoolSize数量,并且workQueue中已经放满任务不能再放入新的任务,那么这时候就继续创建新的线程,注意此时线程池中线程数量已经多余corePoolSize数量,多出来的线程就叫做非核心线程。用非核心线程来执行新放入的任务。当线程池中的线程超过你设置的corePoolSize参数,说明当前线程池中有所谓的“非核心线程”。当某个线程处理完任务后,如果等待keepAliveTime时间后仍然没有新的任务分配给它,那么这个线程将会被回收。线程池回收线程时,对所谓的“核心线程”和“非核心线程”是一视同仁的,直到线程池中线程的数量等于你设置的corePoolSize参数时,回收过程才会停止。

4继续向线程池中放入任务,此时线程池中运行的线程数量已经达到maximumPoolSize数量,并且workQueue中已经放满任务不能再放入新的任务,由于线程池中运行的线程

已经达到maximumPoolSize数量,所以无法再创建线程执行新放入的任务,此时handler参数就起作用了,在使用的时候相信大部分开发者都没用过这个参数,我们看下系统默认怎么处理的,

系统默认闯入传入的是defaultHandler,如下:

1 public ThreadPoolExecutor(int corePoolSize,
2                               int maximumPoolSize,
3                               long keepAliveTime,
4                               TimeUnit unit,
5                               BlockingQueue<Runnable> workQueue,
6                               ThreadFactory threadFactory) {
7         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
8              threadFactory, defaultHandler);
9     }

初始化如下:

1     private static final RejectedExecutionHandler defaultHandler =
2         new AbortPolicy();

接下来看下AbortPolicy这个类吧:

 1     public static class AbortPolicy implements RejectedExecutionHandler {
 2         /**
 3          * Creates an {@code AbortPolicy}.
 4          */
 5         public AbortPolicy() { }
 6 
 7         /**
 8          * Always throws RejectedExecutionException.
 9          *
10          * @param r the runnable task requested to be executed
11          * @param e the executor attempting to execute this task
12          * @throws RejectedExecutionException always
13          */
14         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
15             throw new RejectedExecutionException("Task " + r.toString() +
16                                                  " rejected from " +
17                                                  e.toString());
18         }
19     }

AbortPolicy实现了RejectedExecutionHandler接口,在rejectedExecution方法中抛出RejectedExecutionException异常。

所以如果线程池中运行的线程数量已经达到maximumPoolSize数量,并且workQueueworkQueue中已经放满任务不能再放入新的任务,系统默认情况下就会抛出

RejectedExecutionException异常,我们也可以自己实现RejectedExecutionHandler接口,在rejectedExecution方法中实现自己策略。比如我自己写的网络请求框架就自己定义了

RejectedExecutionHandler,如下:

 1 public class RejectedPolicy implements RejectedExecutionHandler{
 2 
 3     @Override
 4     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
 5          try {
 6              taskQuene.put(r);
 7         } catch (InterruptedException e) {
 8              e.printStackTrace();
 9         }
10     }
11 }

好了,以上就是线程池具体执行一个新任务的大体策略,是不是有了更深的认识???

以上分析中涉及的ThreadFactory与BlockingQueue如果你不是太了解,可以参考我的另一篇博客了解一下:java线程池技术(一):ThreadFactory与BlockingQueue。

好了,关于线程池大体介绍就到此为止,希望对你有用。

原文地址:https://www.cnblogs.com/leipDao/p/8508093.html