javade多任务处理之Executors框架(线程池)实现的内置几种方式与两种基本自定义方式

一 Executors框架(线程池)

主要是解决开发人员进行线程的有效控制,原理可以看jdk源码,主要是由java.uitl.concurrent.ThreadPoolExecutor类实现的,这里只列出简单用法

根据Executors可以创建不同功能的线程池,主要有四种:

1 newFixedThreadPool : 返回一个固定数量的线程池,并且池中数量一致保持不变,有任务时如果有空闲线程则立即执行,没有就暂时存放到队列等待空闲线程

//创建一个有10个线程的线程池,任务多于10个会一直等待,直到10个线程中有空闲线程为止
//ExecutorService pool = Executors.newFixedThreadPool(10);
//开启线程
pool.execute(new Thread(()->{System.out.println(Thread.currentThread().getName()+"执行中.....");}));
pool.execute(new Thread(()->{System.out.println(Thread.currentThread().getName()+"执行中.....");}));
pool.execute(new Thread(()->{System.out.println(Thread.currentThread().getName()+"执行中.....");}));

 在ThreadPoolExecutor类中有几个非常重要的方法:

    execute()   执行线程

  submit()  执行线程,并返回执行的结果

  shutdown()  关闭线程,如果有线程正在执行,则会等线程执行完,即所有线程空闲之后再关闭

  shutdownNow()  关闭线程,立刻关闭

看源码可知,底层任务使用了阻塞队列,任务大于线程数量后会进入阻塞队列等待执行

2 newSingleThreadExecutor

		ExecutorService poolSingle = Executors.newSingleThreadExecutor();
		poolSingle.execute(new Thread(()->{
			System.out.println("run1");
			while(true){
			}
		}));				
		//下面永不会执行
		poolSingle.execute(new Thread(()->{System.out.println("run2");}));

  创建只有一个线程的线程池,属于上面固定数量的一个特例,个人感觉这个单例线程用处不是很大

3 newCachedThreadPool

		不指定数量.如果任务多,且没有空闲线程则会一直创建线程,并且空闲线程会在60秒后回收
		ExecutorService cachePool = Executors.newCachedThreadPool();
		cachePool.execute(new Thread(()->{
			System.out.println("run1");
			while(true){
			}
		}));
		cachePool.execute(new Thread(()->{
			System.out.println("run2");
			while(true){
			}
		}));

  底层实现:

由于使用了无缓冲队列,任务队列使用无缓冲队列,任务不会被存储,直接去交给线程执行.且空闲线程60秒之后会被回收,除非60秒之内去执行新的任务

4 newScheduledThreadPool

class RunTest implements Runnable{
	@Override
	public void run() {
		System.out.println("run");
	}
}
public class ScheculedTest {
	public static void main(String[] args) {
		ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
		//可实现任务定时,但实际业务中一般都用spring的定时器,这个技术比较老了
		//pool.schedule(new RunTest(), 2, TimeUnit.SECONDS);//2秒后执行此任务
		//可实现任务的轮询,指定时间间隔循环执行此任务     任务    延迟时间(即2秒后开始执行)   间隔多少秒之后再执行(一直循环)
		pool.scheduleAtFixedRate(new RunTest(), 2, 2, TimeUnit.SECONDS);
		pool.scheduleWithFixedDelay(new RunTest(), 2, 2, TimeUnit.SECONDS);
	}
}

  不用想也知道,底层采用延迟队列装任务

二 自定义线程池实现的

再来看自定义的,实际业务中可能用到更多的就是自定义了,

自定义需要实现ThreadPoolExecutor这个类,主要有两种,分别是使用有界队列装任务和无界队列:

1 使用有界队列:

public class MyCustom01 {

	public static void main(String[] args) {
		//有界队列自定义线程池
		/**在使用有界队列时,如果有任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
		 * 若大于corePoolSize,则会将任务加入队列,
		 * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
		 * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
		 */
		ThreadPoolExecutor pool = new ThreadPoolExecutor(
				1,   //核心线程数
				2,   //最大线程数
				60,   //空闲时回收时间
				TimeUnit.SECONDS, 
				new ArrayBlockingQueue<Runnable>(3),//指定队列//指定队列
				//DiscardOldestPolicy :丢弃最老的请求,即当6进来时,将队列中最早的2丢弃,执行6
				//new ThreadPoolExecutor.DiscardOldestPolicy()//指定拒绝策略.不指定默认抛异常
				//自定义拒绝策略
				new RejectedExecutionHandler(){
					@Override        //r:被拒绝任务     executor 当前线程池
					public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
						//实例工作中默认策略都不可取,一般是把拒绝的任务记录日志,然后空闲时间解析再重新执行
						//或者直接返回客户端,让他稍后再提交此任务
						System.out.println("自定义处理..");
						System.out.println("当前被拒绝任务为:" + r.toString());
						System.out.println(executor.getPoolSize());
					}}
		); 
		MyTask mt1 = new MyTask(1, "任务1");
		MyTask mt2 = new MyTask(2, "任务2");
		MyTask mt3 = new MyTask(3, "任务3");
		MyTask mt4 = new MyTask(4, "任务4");
		MyTask mt5 = new MyTask(5, "任务5");
		MyTask mt6 = new MyTask(6, "任务6");
		pool.execute(mt1);
		pool.execute(mt2);//如果有两个任务,则会将第二个任务入队,等待核心线程线程空闲
		pool.execute(mt3);//同上
		pool.execute(mt4);//同上,知道队列满 为止
		pool.execute(mt5);//当有5个任务时,队列已满,则会创建第二个线程
		pool.execute(mt6); //有6个时,此时队列已满,且当前线程达到最大线程数,所以无法执行,执行拒绝策略
		//关闭,等到所有线程空闲时才会关闭
		pool.shutdown();
	}
}

  

2 使用无界队列:

public class MyCustom02 implements Runnable{
	//并发数字运算
	private static AtomicInteger count = new AtomicInteger(0);
	@Override
	public void run() {
		try {
			int temp = count.incrementAndGet();//count累加,区分任务
			System.out.println("任务" + temp);
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}	
	}
	public static void main(String[] args) throws Exception{
		//无界队列
		BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
		ExecutorService executor  = new ThreadPoolExecutor(
					5, 		
					10,//无界时,任务多会一直添加到队列中,不会创建新的线程,线程数会一直是核心数量
					120L, 	
					TimeUnit.SECONDS,
					queue //指定无界队列
		);
		for(int i = 0 ; i < 20; i++){
			executor.execute(new MyCustom02()); //会5个一起执行
		}
		Thread.sleep(1000);
		System.out.println("queue size:" + queue.size());
		Thread.sleep(2000);
	}
}

  然后拒绝策略个人认为也是比较常用的,实际业务中,因为默认的策略都不太友好

原文地址:https://www.cnblogs.com/houzheng/p/9193031.html