J.U.C体系进阶(一):juc-executors 执行器框架

Java - J.U.C体系进阶

作者:Kerwin

邮箱:806857264@qq.com

说到做到,就是我的忍道!

主要内容:

  • juc-executors 执行器框架
  • juc-locks 锁框架
  • juc-atomic 原子类框架
  • juc-sync 同步器框架
  • juc-collections 集合框架

总览:
在这里插入图片描述

juc-executors 执行器框架

Executors框架基本概述

ScheduledExecutorService

// ScheduledExecutorService其实属于上层代码,和下面三种的实现不一致

public static void main(String[] args) {
		ScheduledExecutorService executorService =                 Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("run "+ System.currentTimeMillis());
            }
        }, 0, 1000, TimeUnit.MILLISECONDS);
}

newSingleThreadScheduledExecutor  一次只有一个线程,每隔一秒钟执行一次
newScheduledThreadPool(5);        延时线程池,允许池内五个线程运行

schedule                - 出问题后,依然按顺序执行
scheduleAtFixedRate     - 出问题后,立即加速执行,按原定计划执行

// Testin线程池做法
1.随着项目启动,创建周期性执行线程池ScheduledExecutorService --- 确定其核心线程工作数量
2.随着项目启动,初始化查询线程-内含任务队列及再初始化工作线程池
3.查询线程中run方法复杂查询需要处理的任务,然后放到队列中,初始化工作线程(传递任务及队列),由当前工作线程池去管理执行
4.工作线程处理,处理完毕后,移除当前队列中对应的一条信息
注:这里说的队列用的是比较low比的map...

newFixedThreadPool

/**
 * Executors默认方法 - 创建一个具有固定线程数的Executor,控制最大线程并发数
 */
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
}

// 阿里推荐的创建方式:
ThreadFactory nameFactory = new ThreadFactoryBuilder().setNameFormat("fixThread").build();
ExecutorService eService = new ThreadPoolExecutor(
    5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
    nameFactory, new ThreadPoolExecutor.AbortPolicy());

// 参数说明:
corePoolSize: 线程池维护线程的最少数量

maximumPoolSize:线程池维护线程的最大数量

keepAliveTime: 线程池维护线程所允许的空闲时间

unit: 线程池维护线程所允许的空闲时间的单位

workQueue: 线程池所使用的缓冲队列

nameFactory: 线程命名,让名字更有意义

handler: 线程池对拒绝任务的处理策略

newSingleThreadExecutor

最大运行线程数为 1
简单创建方式:
ExecutorService service = Executors.newSingleThreadExecutor();

// 阿里推荐的创建方式:
ThreadFactory nameFactory = new ThreadFactoryBuilder().setNameFormat("single").build();
ExecutorService eService = new ThreadPoolExecutor(
    1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
    nameFactory, new ThreadPoolExecutor.AbortPolicy());

newCachedThreadPool

// 可缓存的线程池
ExecutorService service = Executors.newCachedThreadPool();

// 阿里推荐的创建方式:
ThreadFactory nameFactory = new ThreadFactoryBuilder().setNameFormat("single").build();
ExecutorService eService = new ThreadPoolExecutor(
    0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
    nameFactory, new ThreadPoolExecutor.AbortPolicy());

普通线程池 - ThreadPoolExecutor

/**
 * 使用给定的参数创建ThreadPoolExecutor.
 *
 * @param corePoolSize    核心线程池中的最大线程数
 * @param maximumPoolSize 总线程池中的最大线程数
 * @param keepAliveTime   空闲线程的存活时间
 * @param unit            keepAliveTime的单位
 * @param workQueue       任务队列, 保存已经提交但尚未被执行的线程
 * @param threadFactory   线程工厂(用于指定如果创建一个线程)
 * @param handler         拒绝策略 (当任务太多导致工作队列满时的处理策略)
 */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);   // 使用纳秒保存存活时间
    this.threadFactory = threadFactory;
    this.handler = handler;
}

拒绝策略:

1.AbortPolicy(默认)

AbortPolicy策略其实就是抛出一个RejectedExecutionException异常:

2.DiscardPolicy

DiscardPolicy策略其实就是无为而治,什么都不做,等任务自己被回收:

3.DiscardOldestPolicy

DiscardOldestPolicy策略是丢弃任务队列中的最近一个任务,并执行当前任务:

4.CallerRunsPolicy

CallerRunsPolicy策略相当于以自身线程来执行任务,这样可以减缓新任务提交的速度。

线程池工作详解:

关于ThreadPoolExecutor这个线程池,最重要的是根据系统实际情况,合理进行线程池参数的设置以及阻塞队列的选择。现实情况下,一般会自己通过ThreadPoolExecutor的构造器去构建线程池,而非直接使用Executors工厂创建,因为这样更利于对参数的控制和调优。

另外,根据任务的特点,要有选择的配置核心线程池的大小:

  • 如果任务是 CPU 密集型(需要进行大量计算、处理),则应该配置尽量少的线程,比如 CPU 个数 + 1,这样可以避免出现每个线程都需要使用很长时间但是有太多线程争抢资源的情况;
  • 如果任务是 IO密集型(主要时间都在 I/O,CPU 空闲时间比较多),则应该配置多一些线程,比如 CPU 数的两倍,这样可以更高地压榨 CPU

Future模式

// J.U.C -> 提供异步执行结果支持
// call接口提供执行返回结果,需要注意的是:
// isDone 表示是否执行完毕
// get    如果线程未执行完毕,则会阻塞当前线程

public static void main(String[] args) throws Exception {
    FutureTask<String> future = new FutureTask<String>(new Callable<String>() {
        @Override
        public String call() throws Exception {
            System.out.println("我是线程 i am working");
            Thread.sleep(3000);
            return "hello futureTask";
        }
    });
    new Thread(future).start();
    System.out.println("是否完成: " + future.isDone());
    System.out.println("阻塞等待结果: " + future.get());
}

Fork/Join - 工作窃取算法(work-stealing)

// Fork/Join框架是Java7提供的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。使用工作窃取(work-stealing)算法,主要用于实现“分而治之”。
// 应用场景非常明确,可以在大任务线程下再进行分割,当然这是对于Executtor的补充,所以总是有别的线程池通过代码的方式去替代它,但这个思想应该理解和掌握

/***
 * Fork/Join 工作窃取算法算法
 * 
 * @author 柯贤铭
 * @date 2019年4月19日
 * @email 806857264@qq.com
 */
public class TestFork {

	public static void main(String[] args) {
		// 模拟数组
		int[] intArr = new int[10000];
		for (int i = 0; i < 10000;i++) {
			intArr[i] = i;
		}
		
		// ForkJoinPool
		ForkJoinPool executor = new ForkJoinPool();
		ArraySumTask task = new ArraySumTask(intArr, 0, 9999);
		ForkJoinTask<Long> future = executor.submit(task);
		
		// ForkJoinTask在执行的时候可能会抛出异常,但是没办法在主线程里直接捕获异常,所以主动捕获
		if (future.isCompletedAbnormally()) {
			System.out.println(future.getException());
		}

		try {
			Long start = System.currentTimeMillis();
			System.out.println("result: " + future.get());
			System.out.println("耗时: " + (System.currentTimeMillis() - start));
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
	}
}

class ArraySumTask extends RecursiveTask<Long> {

	private int[] array;
	private int begin;
	private int end;

	private static final int THRESHOLD = 100;

	public ArraySumTask(int[] array, int begin, int end) {
		this.array = array;
		this.begin = begin;
		this.end = end;
	}

	@Override
	protected Long compute() {

		Long total = 0L;
		
		// 分治算法
		if (this.end - this.begin + 1 < ArraySumTask.THRESHOLD) {
			for (int i = begin; i <= end; i++) {
				total += array[i];
			}
		} else {
			int middle = (this.begin + this.end) / 2;

			ArraySumTask subtask1 = new ArraySumTask(this.array, begin, middle);
			ArraySumTask subtask2 = new ArraySumTask(this.array, middle + 1, end);
			
			// 执行子线程
			invokeAll(subtask1, subtask2);
			
			long sum1 = subtask1.join();
			long sum2 = subtask2.join();

			total = sum1 + sum2;
		}
		return total;
	}
}

调度逻辑:
在这里插入图片描述

Fork/Join的内容博大精深,这只是非常的片面介绍了Fork框架的基础用法,如果刚好有实际需求,需要再仔细研读

原文地址:https://www.cnblogs.com/kkzhilu/p/12859509.html