Executor框架

服务器应用中,串行处理机制通常无法提供高吞吐率或快速响应性。通过为每个请求创建一个新的线程来提供服务,从而实现更高的响应性。

public class ThreadPerTaskWebServer {

    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            new Thread(task).start();
        }
    }

    private static void handleRequest(Socket connection) {
        //do something
    }
}

对于每个连接,主循环都将创建一个新线程来处理请求,而不是在主循环中进行处理。

在生产环境中,“为每个任务分配一个线程”这种方法存在一些缺陷,尤其是当需要创建大量线程时:

  1. 线程生命周期的开销非常高。线程的创建和销毁不是没有代价的。
  2. 资源消耗。活跃的线程会消耗系统资源,尤其是内存。
  3. 稳定性。在可创建线程的数量上存在一个限制,这个限制值将随着平台的不同而不同,并且受多个因素制约。

Executor接口

public interface Executor{
    void executor(Runnable command);
}

线程池

Executors 静态工厂方法创建

  • newFixedThreadPool 创建一个固定长度的线程池
  • newCachedThreadPool 创建一个可缓存的线程池 规模不存在限制
  • newSingleThreadPool 创建单个工作者线程来执行任务 异常后会替代
  • newScheduledThreadPool 创建一个固定长度的线程池 以延时或定时方式来执行

以上实际都是通过ThreadPoolExecutor来创建的

public class Executors {
    public static ExecutorService newFixedThreadPool(int var0) {
        return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
    }

    public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) {
        return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var1);
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
    }

    public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
        return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
    }
    //...
}

ThreadPoolExecutor 核心线程池的内部实现

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
  1. corePoolSize 指定了线程池中线程数量
  2. maximumPoolSize 指定了线程池中最大线程数量
  3. keepAliveTime 当线程池数量超过corePoolSize时,多余的空闲线程的存活时间
  4. unit keepAliveTime 的单位
  5. workQueue 任务队列 被提交但尚未被执行的任务
  6. threadFactory 线程工厂 用于创建线程,一般用默认的即可
  7. handler 拒绝策略。当任务太多来不及处理,如何拒绝任务

workQueue 可以是以下几种:
直接提交的队列:SynchronousQueue
有界的任务队列:ArrayBlockingQueue
无界的任务队列:LinkedBlockingQueue
优先任务队列:PriorityBlockingQueue

扩展线程池(extends java.util.concurrent.ThreadPoolExecutor)

ThreadFactory 线程工厂

每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。

public interface ThreadFactory{
    Thread newThread(Runnable r);
}

Executors中提供一个默认线程工厂的实现DefaultThreadFactory

  static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
   }

Executor的生命周期

为了解决执行服务的生命周期问题,Executor扩展了ExecutorService接口,添加了一些用户生命周期管理的方法(还有一些用于任务提交的便利方法)。

public interface ExecutorService extends Executor{
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    <T> Future<T> submit(Callable<T> task);
    // .....
}

ExecutorService的生命周期有三种状态:运行,关闭和已终止

平缓的关闭方式–shutdown:不再接受新的任务,同时等待已经提交的任务执行完成( 包括那些还未开始执行的任务 )。
粗暴的关闭方式–shutdownNow:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

JVM只有在所有非守护线程全部终止后才会退出,如果无法正确地关闭Executor,那么JVM将无法结束。

================================== 赵客缦胡缨,吴钩霜雪明。 银鞍照白马,飒沓如流星。 ==================================
原文地址:https://www.cnblogs.com/lucare/p/9312662.html