Java线程池相关类

Java线程池相关类

// 最顶层接口,仅提供execute方法
public interface Executor {
  void execute(Runnable command);
}
// ExecutorService,扩展自Executor,提供了对任务的管理能力
public interface ExecutorService extends Executor {
  // 停止接受新任务
  void shutdown();
  
  // 停止在执行的任务
  List<Runnable> showdownNow();
  
  // 等待所有任务结束(完成、中断或超时)
  boolean awaitTermination(long timeout, TimeUnit unit);
  
  // 以及各种类型的submit方法,返回Future对象
  <T> Future<T> submit(Callable<T> task);
  
  // 以及批量处理任务
  <T> T invokeAny(Collection<> tasks);
  <T> List<Future<T>> invokeAll(Collection<> tasks);
}
// 其中涉及的Future接口
public interface Future<V> {
  // 取消执行任务
  boolean cancel(boolean mayInterruptIfRunning);
  //是否完成
  boolean isDone();
  // 等待任务执行结束并返回结果
  V get();
  V get(long timeout, TimeUnit unit);
}
// ExecutorService的默认抽象实现
public abstract class AbstractExecutorService implements ExecutorService {
  // 基础方法,用于生成Future对象
  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
  }
  
	// submit仅把task封装为Future,再调用一次Executor.execute()方法
  public <T> Future<T> submit(Callable<T> task) {
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
  }
  
  // 还包含invokeAny、invokeAll的实现
}
// 然后说真正的线程池类
public class ThreadPoolExecutor extends AbstractExecutorService {}

// Fork/Join框架是JDK1.7开始提供的用于并行执行任务的框架,它的思想是将一个大任务分割成若干小任务,最终汇总
// 线程池和ForkJoin池都是直接继承自AbstractExecutorService
public class ForkJoinPool extends AbstractExecutorService {}

public class ThreadPoolExecutor {
  // ThreadPoolExecutor构造方法
  public ThreadPoolExecutor (
    int corePoolSize, // 线程池中核心的线程数(即使空闲也保持)
    int maximumPoolSize, // 允许最多的线程数
    long keepAliveTime, // 多出的线程空闲多久回收
    TimeUnit unit, // 回收的时间单位
    BlockingQueue<Runnable> workQueue, // 工作队列,用于存放已提交的任务
    ThreadFactory threadFactory, // 线程工厂,工厂方法:Thread newThread(Runnable r);
    RejectedExecutionHandler handler // 拒绝执行时的处理回调
  ) {
    // 
  }
  
  // 提交任务,分三步走:
  // 1. 如果线程数少于核心线程数,则创建新线程
  // 2. 否则尝试加入工作队列(加入之后会再次检查是否需要创建新线程)
  // 3. 如果不能加入工作队列,则创建新线程,否则拒绝
  public void execute(Runnable command) {
    // 线程数少时直接开启新线程
    if (workerCountOf(c) < corePoolSize) {
      if (addWorker(command, true)) return;
    }
    
    // 加入队列
    if (isRunning(c) && workQueue.offer(command)) {
      // double check
    } else if (!addWorker(command, false)) {
      reject(command);
    }
    
  }
  
  // 线程池的Worker继承自AQS,定义了lock、unlock等方法
  private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    //
  }
  
}


public class ForkJoinPool {
  // ForkJoinPool构造方法
  public ForkJoinPool(
    int parallelism, // 并行度,默认取CPU的核心数
    ForkJoinWorkerThreadFactory factory, // worker线程的工厂类
    UncaughtExceptionHandler handler, // 异常处理回调
    boolean asyncMode // 控制工作队列的工作模式,如果为true则使用FIFO(从尾取),否则LIFO(从头取)
  ) {}
  
  public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    // externalPush();
  }
  
  public <T> ForkJoinTask<T> submit(Callable<T> task) {
    // externalPush(new ForkJoinTask.AdaptedCallable<T>(task));
  }
  
  public <T> ForkJoinTask<T> submit(Runnable<T> task) {
    // externalPush(new ForkJoinTask.AdaptedRunnableAction<T>(task));
  }
  
  // ForkJoinTask是一个抽象类
  // 递归的RecursiveTask(有返回值)
  // 递归的RecursiveAction(无返回值)
  public static class XXXTask extends RecursiveTask<Integer> {
    
    // 递归任务需实现compute方法
    // 该方法主要实现任务量的拆分(分治思想)
    @Override
    public Integer compute() {
      // this => XXXTask left, right;
      return left.join() + right.join();
    }
    
  }
  
}

// 最后看看Executors工具类
public class Executors {
  // 创建拥有固定线程数的线程池
  public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(
      nThreads, // corePoolSize = nThreads
      nThreads, // maximumPoolSize = nThreads
      0L, // 不超时
      MILLISECONDS, 
      new LinkedBlockingQueue<Runnable>() // 无界队列
    );
  }
  
  // 创建ForkJoin线程池
  public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool(
    	parallelism, // 并行度
      ForkJoinPool.defaultForkJoinWorkerThreadFactory, // 默认的线程工厂
      null, // 不设置异常处理
      true // 异步模式
    );
  }
  
  
  // 创建只有一个线程的线程池
	public static ExecutorService newSingleThreadExecutor() {
    // 
  }
  
  // 创建一个可缓存的线程池(需要时创建,不需要时回收)
  public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
      0, // 初始线程数
      Integer.MAX_VALUE, // 不限制最大数
      60L, // 空闲60秒回收
      TimeUnit.SECONDS, 
      new SynchronousQueue<Runnable>() // 同步队列(长度为0,因为需要时直接创建线程,不需要排队)
    );
  }
  
  // 创建一个可以调度任务的线程池
  public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
  }
  
  public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(
          corePoolSize, // 
          Integer.MAX_VALUE, // 默认的可调度任务的线程池不限制最大线程数 
          0, // 而且不会过期
          NANOSECONDS,
          new DelayedWorkQueue()
        );
    }
  }
  
  // 补充一下ScheduledExecutorService
  public interface ScheduledExecutorService extends ExecutorService {
    // 延迟执行
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); 
    
    // 固定频率执行(按开始时间)
    ScheduledFuture<?> scheduleAtFixedRate(
      Runnable command, 
      long initialDelay, // 第一次执行的延迟时间
      long period, // 每个固定的时间间隔后执行
      TimeUnit unit
    );
    
    // 固定延迟时间执行(按上一次完成时间)
    ScheduledFuture<?> scheduleWithFixedDelay(
      Runnable command, 
      long initialDelay, 
      long delay, 
      TimeUnit unit
    );
  }
  
}

最后再说一点:

阿里开发规范为什么不允许直接使用Executors创建线程池?

是因为Executors还是通过调用ThreadPoolExecutor实现,但是对于最大线程数和队列提供了默认方式,该方式容易被调用者忽略从而导致OOM问题,所以需要开发者根据实际情况选择合适的参数。

原文地址:https://www.cnblogs.com/jieyuefeng/p/12064703.html