new Thread和Executors实例
避免线程开启太多宕机
/**
* @author fangliu
* @date 2020-02-16
* @description 模拟多线将Word转换为PDF格式:处理时很长的耗时过程
*/
public class ThreadVs {
@Test
public void oldHandle() throws InterruptedException {
/**
* 使用循环来模拟许多用户请求的场景
*/
for (int request = 1;request <= 100;request++){
new Thread(()-> {
System.out.println("文档处理开始!");
try {
Thread.sleep(1000L*30);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("文档处理结束!");
}).start();
}
Thread.sleep(1000L*1000);
}
@Test
public void newHandle() throws InterruptedException {
// 开启一个线程池:线程池的个数是10个
ExecutorService executorService = Executors.newFixedThreadPool(10);
/**
* 使用循环来模拟许多用户请求的场景
*/
for (int request = 1;request <= 100;request++){
executorService.execute(()->{
System.out.println("文档处理开始!");
try {
// 模拟线程操作时间
Thread.sleep(1000L*30);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("文档处理结束!");
});
}
Thread.sleep(1000L*1000);
}
}
线程池简介
- 什么是线程池
线程池顾名思义就是事先创建若干个可执行的线程放入一个池(容器)中,需要的时候从池中获取线程不用自行创建,使用完毕后不需要销毁线程而是放回池中,从而减少创建和销毁对象的开销。
线程池带来的好处
- 降低资源消耗
- 提高响应速度
- 提高线程的可管理性
简单线程池设计
- 提交任务的接口
- 任务队列
- 队列要多长
- 满了如何处理
- 线程池(开启、初始化、关闭)
- 创建多少?
- 执行器从线程池里面拿线程执行任务队列里面的任务,执行完成后归还线程并通过返回结果给接口
- 异步执行的接口
线程池的核心参数
/**
*
* @param corePoolSize 核心线程数量
* @param maximumPoolSize 最大线程数量
* @param keepAliveTime 当线程数大于corePoolSize时,线程空闲后的存活时间
* @param unit keepAliveTime时间单位
* @param workQueue 存放任务的阻塞队列
* @param threadFactory 执行程序创建新线程时使用的工厂
* @param handler 当队列和最大线程池都满了之后的饱和策略
*/
public void ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize与maximumPoolSize关系
- 池中线程数小于corePoolSize,新任务都不排队而是直接添加新线程
- 池中线程数大于等于corePoolSize,workQueue未满,首选将新任务加入workQueue而不是添加新线程
- 池中线程数大于等于corePoolSize,workQueue已满,但是线程数小于maximumPoolSize,添加新的线程来处理被添加的任务
- 池中线程数大于大于corePoolSize,workQueue已满,并且线程数大于等于maximumPoolSize,新任务被拒绝,使用handler处理被拒绝的任务
线程池处理流程
graph LR
A[提交任务]-->B(核心线程数是否已满)
B-->|否|C(创建新线程执行任务)
B-->|是|D[阻塞队列是否已满]
D-->|否|E[将任务储存在阻塞队列]
D-->|是|F[线程池是否已满]
F-->|否|G[创建新线程执行任务]
F-->|是|H[按饱和策略处理任务]
线程池可选择的阻塞队列
- 无界队列
- 有界队列
- 同步移交队列
实例
/**
* @author fangliu
* @date 2020-02-17
* @description 常用阻塞队列
*/
public class QueueTest {
@Test
public void arrayBlockingQueue() throws InterruptedException {
/**
* 基于数组的有界阻塞队列,队容量为10
*/
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// 循环向队列添加元素
for (int i = 0; i <20 ; i++) {
queue.put(i);
System.out.println("向队列中添加值:"+i);
}
}
@Test
public void linkedBlockingQueue() throws InterruptedException {
/**
* 基于链表的有界/无界阻塞队列,队容量为10 ,去掉容量会有一个默认值20
*/
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(20);
// 循环向队列添加元素
for (int i = 0; i <20 ; i++) {
queue.put(i);
System.out.println("向队列中添加值:"+i);
}
}
@Test
public void synchronousQueue(){
/**
* 同步移交阻塞队列 不储存队列,相当于生产者消费者模式
*/
SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
// 插入值
new Thread(() -> {
try {
queue.put(1);
System.out.println("插入成功");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 删除值
new Thread(() -> {
try {
queue.take();
System.out.println("删除成功");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
线程池饱和策略
- AbortPolicy终止策略(默认)
- 默认策略,新任务提交时直接抛出未检查的异常RejectedExecutionException,该异常可由调用者捕获。
- DiscardPolicy抛弃策略
- 新提交的任务被抛弃。
- DiscardOldestPolicy抛弃旧任务策略
- 队列的是“队头”的任务,然后尝试提交新的任务。对头任务被丢弃(不适合工作队列为优先队列场景)
- CallerRunsPolicy调用者运行策略
- 为调节机制,既不抛弃任务也不抛出异常,而是将某些任务回退到调用者。不会在线程池的线程中执行新的任务,而是在调用exector的线程中运行新的任务。
- 自定义饱和处理策略
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 循环,当队列有空位时,该任务进入队列,等待线程池处理
*/
public class TestRejectedExecutionHandler implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
常用线程池介绍
1. newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 这种类型的线程池特点是:
- 工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
- 如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
- 在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。
实例:
@Test
public void newCachedThreadPool(){
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
TicketSailTask task = new TicketSailTask();
for(int i=0;i<10;i++) {
cachedThreadPool.execute(task);
}
}
//多个售票员同时执行售票任务
public static class TicketSailTask implements Runnable{
public static int number = 100;//总共有100张票
public Object lock = new Object();//创建锁,防止多个线程同时卖同一张票
@Override
public void run() {
while(true) {
//进行同步锁
synchronized(lock) {
try{
Thread.sleep(100);
}catch(InterruptedException e){
e.printStackTrace();
}
}
if(number>0) {
System.out.println(Thread.currentThread().getName()+"正在卖第"+number+"张票");
number--;
}else {
break;
}
}
}
}
2. newFixedThreadPool
创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。
FixedThreadPool是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。
执行下面代码,发现只有三个线程被创建
@Test
public void newFixedThreadPool(){
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for(int i=0;i<6;i++) {
ThreadTask task = new ThreadTask();
fixedThreadPool.execute(task);
}
}
public class ThreadTask implements Runnable {
@Override
public void run() {
while(true) {
System.out.println(Thread.currentThread().getName()+" is running...");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
3. newSingleThreadExecutor
创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
4. newScheduleThreadPool
创建一个定长的线程池,而且支持定时的以及周期性的任务执行,支持定时及周期性任务执行。
@Test
public void newScheduledThreadPool(){
//创建固定长度的线程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
for(int i =0;i<3;i++) {
ThreadTask1 task = new ThreadTask1();
scheduledThreadPool.scheduleAtFixedRate(task, 5, 10, TimeUnit.SECONDS);
//第一个参数为需要执行的任务,第二个参数为任务开始前延迟时间,第三个参数为任务执行周期,第四个参数为时间单位
}
}
public class ThreadTask1 implements Runnable {
@Override
public void run() {
Date now = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");//可以方便地修改日期格式
String hehe = dateFormat.format( now );
System.out.println(Thread.currentThread().getName()+" running at "+hehe);
}
}
向线程池提交任务的两种方式
/**
* @author fangliu
* @date 2020-02-18
* @description 向线程池提交任务的两种方式
*/
public class RunTest {
@Test
public void submitTest() throws ExecutionException, InterruptedException {
// 创建线程池
ExecutorService executor = Executors.newCachedThreadPool();
/**
* 利用submit方法提交任务,接受任务的返回结果
*/
Future<Integer> submit = executor.submit(() -> {
Thread.sleep(1000L * 10);
return 2 * 5;
});
/**
* 阻塞方法,直到任务有返回值后,才向下执行
*/
Integer num = submit.get();
System.out.println("执行结果:"+num);
}
@Test
public void executeTest() throws InterruptedException {
// 创建线程池
ExecutorService executor = Executors.newCachedThreadPool();
/**
* 利用execute方法提交任务,没有返回结果
*/
executor.execute(() -> {
try {
Thread.sleep(1000L * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
Integer num = 2 * 5;
System.out.println("执行结果:"+num);
});
Thread.sleep(1000L * 10);
}
}
线程池的状态
graph LR
A[RUNNING]-->B(SHUTDOWN)
A-->C(STOP)
B-->F(TIDYING)
C-->F
F-->G[TERMINATED]
- 线程池的初始化状态是RUNNING。
- 线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!
- RUNNING状态调用shutdown()方法
- 不再接受新任务
- 队列中的任务执行完毕
- 状态由RUNNING变为SHUTDOWN
- RUNNING状态调用shutdownNow()方法
- 不再接受新任务
- 丢弃队列中的任务
- 中断正在执行的任务
- 状态由RUNNING变为STOP
- 当线程池在STOP状态下,线程池中执行的任务为空时,状态由STOP变为TIDYING
- 当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,状态由SHUTDOWN变为TIDYING。
- 当线程池处在TIDYING状态时,执行完terminated()之后,状态由 TIDYING变为TERMINATED。