线程池

八、线程池

ThreadPoolExecutor

在之前的demo中,都是使用new Thread()手动创建线程池。但是在工作中使用的话,阿里巴巴编码规约明确说明,线程必须交给线程池来管理。避免资源耗尽的风险。

传统的手动new的方式创建的线程,如果线程非常多的话,就会非常杂乱,无法管理。线程之间互相竞争资源,容易产生线程乱入的风险。线程如果非常多,会造成线程切换频繁,浪费cpu资源。线程多也会增加系统资源耗尽的风险。

所以我们必须使用线程池管理。

关于ThreadPoolExecutor

先来查看ThreadPoolExecutor的构造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

构造方法包含的参数有:

  • corePoolSize:线程池的核心线程数
  • maximumPoolSize: 线程池最大线程数
  • keepAliveTime:空闲线程的存活时间
  • unit:空闲线程存活时间单位
  • workQueue:线程池等待队列。
  • threadFactory:线程工厂,生产线程的类。
  • handler:线程拒绝策略。当线程池满的时候,该怎么拒绝后来的线程。

要想理解上面的参数。我们首先要理解线程池的工作原理,线程池的存在就是为了管理线程。池化思想的实现无非就是:节省资源,提速,资源复用,方便管理。线程池的实现也是为了方便管理线程,复用现有线程,节省线程资源开销,避免线程过多时cpu浪费大量时间在线程的切换上。

当我们使用线程池执行任务时,会经历如下流程:

image-20210120163025578

上图就是线程池的大概的工作流程,看了上面的图,大概就对线程池的参数的意义有了大概的了解了。

线程池的创建

线程池的创建可以通过new ThreadPoolExecutor()来实现。

package com.xiazhi.pool;


import java.util.concurrent.*;

/**
 * @author 赵帅
 * @date 2021/1/20
 */
public class ExecutorCreateDemo {

    public static void main(String[] args) {

        // 如果不指定threadFactory,会使用默认的线程工厂 Executors.defaultThreadFactory()
        // 如果不指定拒绝策略,会使用默认的拒绝策略 new AbortPolicy()
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(4,
                8,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100));
    }
}

java通过Executors类默认实现了几个线程池:

  • Executors.newSingleThreadExecutor(): 这是一个单线程的线程池,实现方式为

    		public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    可以看到也是通过new ThreadPoolExecutor的方式实现,不过这个线程池的核心线程数和最大线程数都是1,因此里面永远只有一个线程。而且他的等待队列为 LinkedBlockingQueue, 这个队列是无界队列,当任务的生产速度大于消费速度时,队列就会不停的堆积,容易造成内存溢出。

    只有一个队列,为什么要使用线程池?

    可以使用线程池来管理这个线程的生命周期。

    使用方式:

      	@Test
        public void singleThreadExecutor() {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            executorService.execute(() -> System.out.println("hello world"));
        }
    
  • Executors.newFixedThreadPool(int size): 创建一个固定数量的线程池。实现为:

    		public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    可以看到,在创建线程池时,核心线程数和最大线程数一样,而且使用的也是LinkedBlockingQueue,使用的弊端与上面一样,容易造成内存溢出。

    使用方式:

        @Test
        public void fixedThreadPool() {
            ExecutorService service = Executors.newFixedThreadPool(5);
            service.execute(() -> System.out.println("hello world"));
        }
    
  • Executors.newCachedThreadPool(): 创建一个缓存线程池。此线程池的实现方式为:

    		public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    可以看到这个线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE,可以认为最大线程数是无界的。存活时间是60秒,如果线程超过60秒没有新的任务,那么就会被销毁。使用的是SynchronousQueue。分析队列为SynchronousQueue,也就以为着每次又一个任务到达时,就必须有一个线程处理这个任务,否则就会阻塞等待。那么当一个任务到达时,如果没有空闲线程,就会不断的创建新的线程,而最大线程数为无界,当任务生产速度大于消费速度时就会增加资源耗尽的风险。而且线程数比较大时也会增加上下文切换的开销,最终造成cpu的时间全都浪费在线程切换上,最终反而降低性能。

    使用方式:

        @Test
        public void cachedThreadPool() {
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(() -> System.out.println("hello world"));
        }
    
  • Executors.newScheduledThreadPool(): 创建一个定时线程池。实现为:

        public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    

    最大线程数仍为Integer.MAX_VALUE,因此会有资源耗尽的风险,而且线程存活时间为0,线程复用率低。

    使用方式:

        @Test
        public void scheduledThreadPool() throws InterruptedException {
            ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
            scheduledExecutorService.schedule(() -> System.out.println("hello world"), 10, TimeUnit.SECONDS);
            TimeUnit.SECONDS.sleep(20);
        }
    

上面集中线程池的创建方式虽然方便,但是都有缺陷,而且都会造成资源耗尽的风险。

阿里巴巴代码规约规定:禁止通过Executors创建线程池,必须通过new ThreadPoolExecutor的方式创建线程池。除了因为Executors创建的线程池存在上面的缺陷外,Executors隐藏了线程池的创建细节参数,可读性差,而且会影响初学者。

线程工厂

线程池在创建时需要手动指定线程工厂,线程工厂是为了创建线程时为线程指定名字,出问题时方便排查错误。Executors提供了一个默认的线程工厂的实现:

    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

但是这个默认工厂的实现,但是在实际使用中为了能够在出问题时能够方便快速的定位,需要定义一个有意义的线程名。因此需要自定义线程工厂实现。

ThreadGroup

在DefaultThreadFactory中我们看到一个ThreadGroup的类,ThreadGroup--线程组。

我们可以把线程归属到某个线程组中,线程组中可以包含多个线程组,线程和线程组间组成树状关系。使用线程组可以方便我们管理线程。

查看ThreadGroup的构造方法:

		public ThreadGroup(String name) {
        this(Thread.currentThread().getThreadGroup(), name);
    }

		public ThreadGroup(ThreadGroup parent, String name) {
        this(checkParentAccess(parent), parent, name);
    }

可以指定父线程组创建线程组,当不指定父线程组时,使用当前线程作为父线程组。

创建线程关联线程组:

package com.xiazhi.pool;

import java.util.concurrent.TimeUnit;

/**
 * @author 赵帅
 * @date 2021/1/20
 */
public class ThreadGroupDemo {

    static void run() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            System.out.println("线程被中断");
        }
        System.out.println("hello threadGroup");
    }
    public static void main(String[] args) {
        // 使用new ThreadGroup()
        ThreadGroup threadGroup = new ThreadGroup("test-group-1");
        ThreadGroup subGroup = new ThreadGroup(threadGroup, "test-group-1");
        // 创建线程时指定线程组
        new Thread(threadGroup, ThreadGroupDemo::run, "thread-1").start();
        new Thread(threadGroup, ThreadGroupDemo::run, "thread-2").start();
        new Thread(subGroup, ThreadGroupDemo::run, "sub-thread-1").start();

        // 获取活动线程数及线程组数
        System.out.println("threadGroup.activeCount() = " + threadGroup.activeCount());
        // 活动线程组数
        System.out.println("threadGroup.activeGroupCount() = " + threadGroup.activeGroupCount());
        // 打印线程组名称
        System.out.println("threadGroup.getName() = " + threadGroup.getName());
        // 输出线程组的所有子节点
        threadGroup.list();
        // 调用interrupt方法会将线程组的所有线程中断标志设置为true
        threadGroup.interrupt();
    }
}

自定义线程工厂

自定义线程工厂需要实现ThreadFactory接口。

package com.xiazhi.pool;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author 赵帅
 * @date 2021/1/20
 */
public class SimpleThreadFactory implements ThreadFactory {
    /** 线程自增id */
    private final AtomicInteger number = new AtomicInteger(0);
    /** 归属线程组 */
    private final ThreadGroup group;
    /** 线程名前缀 */
    private final String prefix;

    public SimpleThreadFactory() {
        this.group = new ThreadGroup("test-group");
        this.prefix = "pool-test-thread";
    }

    @Override
    public Thread newThread(Runnable task) {
        Thread thread = new Thread(this.group, task,
                this.prefix + this.number.incrementAndGet());

        if (thread.isDaemon()) {
            thread.setDaemon(false);
        }
        return thread;
    }
}
守护线程

在上面的代码中我们注意到有这么一句代码。

				if (thread.isDaemon()) {
            thread.setDaemon(false);
        }

这段代码的意思是:判断当前线程是否是守护线程,如果是守护线程,那么就将当前线程设置为正常线程。

什么是守护线程?

守护线程是一个特殊的线程,当进程中不存在非守护线程时,守护线程就会销毁。jvm中的垃圾回收器就是守护线程,当jvm中没有运行中的非守护线程时,jvm就会退出。

守护线程的用法如下:

package com.xiazhi.pool;

import java.util.concurrent.TimeUnit;

/**
 * @author 赵帅
 * @date 2021/1/20
 */
public class DaemonThreadDemo {

    static void daemonRun() {
        while (true) {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("守护线程运行");
        }
    }

    public static void main(String[] args) {

        Thread daemon = new Thread(DaemonThreadDemo::daemonRun);
        daemon.setDaemon(true);
        Thread thread = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "开始执行");
            try {
                // 模拟线程处理业务耗时
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "执行结束");
        });
        thread.start();
        daemon.start();
    }
}

线程池的拒绝策略RejectedExecutionHandler

根据上面的线程池执行的流程图,可以知道当线程池的等待队列已满,而且线程池已经达到最大线程数,那么后面再来的任务就回调用拒绝策略处理。java线程池默认提供了4种拒绝策略:

  • AbortPolicy: 抛出异常处理。当线程进入拒绝策略时,就会抛出RejectedExecutionException异常。
  • DiscardPolicy: 丢弃被拒绝的任务。
  • DiscardOldestPolicy: 丢弃最老的未被处理的任务。
  • CallerRunsPolicy: 调用线程处理。谁提交的这个任务,那么就叫给这个线程处理。

上面四种线程拒绝策略,无论哪儿一种,再生产环境时都是不可取的,我们在工作中,一般都会自定义拒绝策略,将任务存入mq或存入其他地方,等待线程池空闲时执行。但是不能不执行。

自定义拒绝策略:

自定义拒绝策略只需要实现RejectedExecutionHandler接口。

package com.xiazhi.pool;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author 赵帅
 * @date 2021/1/21
 */
public class MessageQueuePolicy implements RejectedExecutionHandler {

    /** 模拟为mq容器,拒绝任务会被放入容器中 */
    public final List<CustomTask> list = new ArrayList<>();


    static class CustomTask implements Runnable, Serializable {

        private final String name;

        public CustomTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println(String.format("[%s]处理任务:%s", Thread.currentThread().getName(), name));
        }
    }

    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        // 线程池未关闭
        if (!executor.isShutdown()) {
            System.out.println("进入线程拒绝策略...");
            CustomTask task = (CustomTask) runnable;
            list.add(task);
        }
    }
}
创建一个自定义线程工厂及拒绝策略的线程池
package com.xiazhi.pool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author 赵帅
 * @date 2021/1/21
 */
public class CustomThreadPoolDemo {

    private final ExecutorService executorService;

    public CustomThreadPoolDemo() {
        executorService = new ThreadPoolExecutor(3,
                3,
                0L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                new SimpleThreadFactory(),
                new MessageQueuePolicy());
    }

    public static void main(String[] args) {
        CustomThreadPoolDemo poolDemo = new CustomThreadPoolDemo();

        for (int i = 0; i < 20; i++) {
            poolDemo.executorService.execute(new MessageQueuePolicy.CustomTask("张三"));
        }
    }
}

线程池的使用

线程池ThreadPoolExecutor实现了ExecutorService接口,而ExecutorService接口又继承了Executor接口,线程池的常用方法有:

  • void execute(Runnable command): 提交Runnable任务
  • Future<?> submit(Runnable task): 提交Runnable任务并返回代表该任务的Future,任务成功完成后,调用Futureget()方法将返回null
  • Future<?> submit(Runnable task, T result): 提交Runnable任务并返回代表该任务的Future, 任务完成后调用Futureget()方法将返回给定的result。
  • Future<T> submit(Callable<T> task): 提交带返回值的任务并返回代表该任务的Future,任务执行完成后调用Futureget()方法获取任务执行返回值。
  • void shutdown(): 启动关闭线程池。在线程池关闭过程中,会继续执行已经提交的任务(包含等待队列中的任务),但是不会接收新的任务,当所有任务都执行完毕,线程池关闭。如果线程池已经关闭,再次调用不会有影响。
  • List<Runnable> shutdownNow(): 马上关闭线程池。暂停所有正在执行的任务,暂停正在等待的任务的处理,并返回正在等待执行的任务列表。会给所有正在执行的线程发送interrupt()中断消息。
  • boolean isShutdown(): 线程池是否关闭。返回是否在执行线程池的关闭程序,也可以说返回是否调用过shutdown()方法。
  • boolean isTerminated(): 线程池是否终止。isShutdown()方法返回的是是否线程池调用过shutdown()shutdownNow()方法,而无论调用哪儿个方法线程池都不会立即完毕,会处理完当前线程的任务或设置线程中断。无论哪儿种方式,都是无法立即结束线程的。而此方法返回的就是当前线程池中是否还有存活的线程。线程的生命周期,终止状态为Terminate,当线程池中所有线程的状态都为Terminate时,返回true。
  • boolean awaitTermination(long timeout, TimeUnit unit): 等待终止,如果超时,立即终止
  • List<Future<T>> invokeAll(Collection< ? extend Callable<T>> tasks): 批量执行任务,并返回代表人物的Future集合。
  • T invokeAny(Collection<? extend Callable<T>> tasks): 批量执行任务,当有一个任务有执行结果时,返回此结果并取消其他的任务。

下面进入方法实践:

package com.xiazhi.pool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author 赵帅
 * @date 2021/1/21
 */
public class UseThreadPoolDemo {

    private static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(3,
            5,
            0L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(20),
            new SimpleThreadFactory());

    static void runCommand() {
        try {
            TimeUnit.SECONDS.sleep(2);
            System.out.println(String.format("[%s]开始执行command", Thread.currentThread().getName()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static String runTask() {
        try {
            TimeUnit.SECONDS.sleep(2);
            System.out.println(String.format("[%s]开始执行task", Thread.currentThread().getName()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello world";
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 提交无返回值的Runnable任务
        poolExecutor.execute(UseThreadPoolDemo::runCommand);
        // 等待线程执行结束
        TimeUnit.SECONDS.sleep(3);
        System.out.println("============================================================");

        // submit提交Runnable返回null值得任务
        Future<?> result = poolExecutor.submit(UseThreadPoolDemo::runCommand);
        System.out.println("submit(Runnable) result= " + result.get());
        System.out.println("============================================================");

        // 当任务执行结束返回指定返回值
        Future<String> successful = poolExecutor.submit(UseThreadPoolDemo::runCommand, "successful");
        System.out.println("successful.get() = " + successful.get());
        System.out.println("============================================================");

        // 提交带返回值得Callable任务
        Future<String> submit = poolExecutor.submit(UseThreadPoolDemo::runTask);
        System.out.println("submit.get() = " + submit.get());
        System.out.println("============================================================");

        // 批量提交任务执行任务,返回批量结果
        List<Callable<String>> tasks = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            tasks.add(UseThreadPoolDemo::runTask);
        }
        List<Future<String>> futures = poolExecutor.invokeAll(tasks);
        for (Future<String> future : futures) {
            System.out.println("future.get() = " + future.get());
        }
        System.out.println("============================================================");


        // 批量提交任务,成功任意一个取消其他任务
        List<Callable<String>> anyTasks = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            tasks.add(UseThreadPoolDemo::runTask);
        }
        String resultStr = poolExecutor.invokeAny(tasks);
        System.out.println("resultStr = " + resultStr);
        System.out.println("============================================================");

        // 关闭线程池
        poolExecutor.shutdown();
        System.out.println("poolExecutor.isShutdown() = " + poolExecutor.isShutdown());
        System.out.println("============================================================");

        System.out.println("poolExecutor.isTerminated() = " + poolExecutor.isTerminated());
        System.out.println("============================================================");

        TimeUnit.SECONDS.sleep(3);
        System.out.println("poolExecutor.isTerminated() = " + poolExecutor.isTerminated());
    }
}

线程池的大小设置

线程池的线程数大小是否越大越好?

在回答这个问题前先看下面代码:

package com.xiazhi.pool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 要使用线程池启用多线程对一个数进行自增,从0加到1_000_000
 *
 * @author 赵帅
 * @date 2021/1/21
 */
public class PoolThreadNumDemo {

    /**
     * 方式1,启动100个线程每隔线程加10_000
     */
    public static void way1() throws InterruptedException, ExecutionException {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 100, 30L,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        AtomicInteger num = new AtomicInteger();
        ArrayList<Callable<Integer>> callables = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            callables.add(() -> {
                for (int j = 0; j < 10_000; j++) {
                    num.incrementAndGet();
                }
                return num.get();
            });
        }

        long start = System.nanoTime();
        List<Future<Integer>> futures = poolExecutor.invokeAll(callables);
        for (Future<Integer> future : futures) {
            future.get();
        }
        System.out.println(String.format("way1调用总耗时:%s,结果:%s", (System.nanoTime() - start), num.get())); // 耗时:82152778
    }


    /**
     * 方式2,启动10个线程每隔线程加1_000_000
     */
    public static void way2() throws InterruptedException, ExecutionException {
        int core = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(4, core + 1, 30L,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        AtomicInteger integer = new AtomicInteger(0);
        ArrayList<Callable<Integer>> callables = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            callables.add(() -> {
                for (int j = 0; j < 100_000; j++) {
                    integer.incrementAndGet();
                }
                return integer.get();
            });
        }

        long start = System.nanoTime();
        List<Future<Integer>> futures = poolExecutor.invokeAll(callables);
        for (Future<Integer> future : futures) {
            future.get();
        }
        System.out.println(String.format("way2调用总耗时:%s,结果:%s", (System.nanoTime() - start), integer.get()));// 耗时:32637464
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        way1();
        way2();
    }
}

通过上面代码可以知道并不是线程越多越好,因为线程越多带来的后果就是线程切换频繁,cpu时间都耗费在线程切换上,cpu的利用率也就变相的降低了。

那么线程数如何设置:

线程数的设置与电脑配置,也就是CPU核数有关,还要关联业务。一般可以根据公式估算线程数:

线程数=N*cpu利用率*(1+等待时间/计算时间)

当然这个公式只能作为估算值,具体的值需要根据业务以及压测结果进行调整。

线程池的拓展

线程池提供了钩子函数,可以在线程执行前,执行后,以及线程池销毁时对线程池进行自定义扩展。

package com.xiazhi.pool;

import java.util.concurrent.*;

/**
 * @author 赵帅
 * @date 2021/1/21
 */
public class ThreadPoolExecutorProvider extends ThreadPoolExecutor {
    public ThreadPoolExecutorProvider(int corePoolSize,
                                      int maximumPoolSize,
                                      long keepAliveTime,
                                      TimeUnit unit,
                                      BlockingQueue<Runnable> workQueue,
                                      ThreadFactory threadFactory,
                                      RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        System.out.println("线程执行前执行");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        System.out.println("线程执行后执行");
    }

    @Override
    protected void terminated() {
        System.out.println("线程池被终止");
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutorProvider(1,
                1,
                0L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                Executors.defaultThreadFactory(),
                new AbortPolicy());

        executor.execute(() -> System.out.println("执行任务。。。"));
        executor.shutdown();
    }
}

线程池源码阅读

线程池代码中有一个属性:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));这个属性是AtomicInteger类型,因此是原子性的。而且初始值为:initialValue = ctlOf(RUNNING,0)。继续查看源码:RUNNINT = -1<<29,那么最终RUNNING的值为11100000000000000000000000000000,可以注意到前三位是1,后面都是0。

继续看ctlOf(RUNNING,0)方法内部:private static int ctlOf(int rs, int wc) { return rs | wc; },那么最终ctl属性的初始值就是:11100000000000000000000000000000,其中后29位表示的是当前线程池中的线程数,而前3位表示线程池的状态。这些可以从代码中看出:

		private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 00011111111111111111111111111111
		// 获取当前线程池状态
		private static int runStateOf(int c)     { return c & ~CAPACITY; }
		// 获取当前线程池中线程数量
		private static int workerCountOf(int c)  { return c & CAPACITY; }

到目前为止,我们可以得出线程池可以存放的最大线程数为00011111111111111111111111111111既2^29-1=536870911个线程

查看线程池的execute(Runnable command)方法的源码:

public void execute(Runnable command) {
  
  			// 要执行的任务不能是null
        if (command == null)
            throw new NullPointerException();
  			// 获取ctl的值
        int c = ctl.get();
  			// 获取当前线程数
        if (workerCountOf(c) < corePoolSize) {
          // 当前线程数小于核心线程数,直接添加新的线程,结束
            if (addWorker(command, true))
                return;
          // 再次获取ctl的值,避免在此期间线程的状态被改变,保证数据的准确性
            c = ctl.get();
        }
  			// 当线程走到这里,说明当前线程池核心线程数已经满了
  			// 线程池状态为运行中并且向等待队列插入任务成功
        if (isRunning(c) && workQueue.offer(command)) {
          	// 再次获取ctl值,保证获取最新值
            int recheck = ctl.get();
            // 当前线程池是关闭状态,那么从队列中移除这个任务并拒绝
            if (! isRunning(recheck) && remove(command))
                reject(command);
          // 如果线程池不是关闭状态或者移除队列任务失败,那么获取当前线程数,如果是0就添加一个空的worker
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
  			// 插入等待队列失败,新建线程处理
        else if (!addWorker(command, false))
          	// 新建任务失败,说明达到最大线程数,调用拒绝策略
            reject(command);
    }

可以看出上面的流程分析与最开始画的线程池工作流程图是一样的,不过在细节上添加了很多判断当前线程池状态的操作。可以看出在代码中多次重新获取ctl值,用来获取当前线程池状态以及当前线程数。那么为什么要用一个值即表示线程池状态又表示线程数呢?

如果将这两个数拆出来拆成两个值,那么就需要不断的刷新两个数据的值,也就是说上面不断的获取ctl值需要获取两个值了,那么假设这种情况,我获取线程池状态,状态为运行中,然后我获取当前线程数,假设此时线程池关闭了,那么此时是不知道的,就会造成获取到的状态不是实时状态,而使用一个数操作,不仅操作方便,而且也避免了数据实时性的问题。(仅代表个人理解,如果有高见请指出)

然后我们在看上面代码,是通过addWorker(Runnable command,boolean core)方法来向线程池添加线程的。

  • 参数command:要执行的任务
  • 参数core: 是否是核心线程,因为核心线程是不会销毁的,而不是核心线程,当空闲时间超过存活时间会自动销毁。

查看addWorker方法:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
  			// 自旋,保证修改当前线程数一定能成功
        for (;;) {
          // 获取线程池状态,如果线程池已经被关闭,那么就不添加到线程池了
            int c = ctl.get();
            int rs = runStateOf(c);

            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
              // 如果超过线程数限制返回失败,是核心线程就比较核心线程数,不是核心线程就比较最大线程数
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
              // 线程数+1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

  			// 走到这里才真正想线程池中添加线程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
          // 创建一个新的线程,线程池中使用Worker创建线程,并将当前任务作为线程的第一个任务
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                      // 将线程添加到线程池
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                  // 启动线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

上面创建线程池中的线程使用的是Worker来创建线程。那么创建的Worker都存放在线程池,线程池的实现就是:private final HashSet<Worker> workers = new HashSet<Worker>();这个hashSet就是线程池。查看Worker的代码:

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
          // 设置状态为-1表示此worker为新建状态,还未开启线程,不支持中断
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

可以看出Worker也继承自AQS,我们学习AQS时说AQS的核心就是state。在线程池中这个state就代表当前线程是否被使用。而且Worker也实现了Runnable接口,因此它自身就是一个线程任务,他内部的属性Thread在创建时调用了线程工程创建一个新线程。因此说一个Worker就是代表了一个可执行线程。而且这个可执行线程的run方法是:

        public void run() {
            runWorker(this);
        }

查看runWorker方法:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
  // 获取当前线程的第一个任务并将firstTask属性设置为null(也就是取出了worker的第一个任务)
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
          // 如果worker的第一个任务不等于空 或者 从等待队列取到的任务不是空
            while (task != null || (task = getTask()) != null) {
              // 执行线程的run方法。
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                  //调用钩子函数
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                      //运行run方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                      // 调钩子函数 
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
          // 控制线程退出
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker方法中我们学习到了线程池是如何复用线程的。当一个worker的线程被启动时,会调用runWorker方法,然后内部流程为:

  1. 取出worker的firstTask。
  2. firstTask是否为空,如果为空则跳至4。
  3. 执行firstTask的run方法。
  4. 从等待队列取出task,如果task不为空则跳至2,如果为空则往下。
  5. 控制线程退出。

流程图如下:

image-20210122094621712

如果程序运行过程中出现异常,或者等待队列为空时就要退出异常线程或空闲线程。退出空闲线程的流程为:

  1. 当前线程是否需要退出(异常结束线程直接退出)
  2. 从线程池移除当前线程
  3. 如果当前线程池的线程数小于核心线程数,那么添加一个任务为空的worker

ExecutorCompletionService

在我们执行有返回值的任务时,会返回一个Future类型,可以通过future.get()方法获取返回值,但是这个get()方法时阻塞的。因此就可能会出现其他任务已经完成结果,但是我们还在阻塞等待前一个任务的结果。这样算是间接的浪费了时间。查看如下代码:

package com.xiazhi.pool;

import java.util.concurrent.*;

/**
 * 假设我们现在在网上购买了电器: 电视,冰箱,洗衣机。这三个东西是一块发货的,因此可以认为这就是一个异步执行。
 * 冰箱: 发货 -> 收货 10s
 * 洗衣机: 发货-> 收货 15s
 * 电视: 发货 -> 收货 8s
 * 我们将电器从楼下搬到楼上的时间为3s
 *
 * @author 赵帅
 * @date 2021/1/22
 */
public class NormalExecutorService {

    /**
     * 为了方便使用Executors,实际工作不建议使用
     */
    final ExecutorService executorService = Executors.newFixedThreadPool(3);

    static void transport(String shop) {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("搬运" + shop);
    }

    /**
     * 运输冰箱
     */
    static String transportFridge() {

        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("冰箱到了");
        return "冰箱";
    }

    /**
     * 运输电视
     */
    static String transportTV() {

        try {
            TimeUnit.SECONDS.sleep(8);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("电视到了");
        return "TV";
    }

    /**
     * 运输洗衣机
     */
    static String transportWashing() {

        try {
            TimeUnit.SECONDS.sleep(15);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("洗衣机到了");
        return "洗衣机";
    }


    /**
     * 根据上面的题意,如果我们用正常的Future.get()方法去实现的话:
     */
    void way1() throws ExecutionException, InterruptedException {
        
        long start = System.currentTimeMillis();
        Future<String> fridge = executorService.submit(NormalExecutorService::transportFridge);
        Future<String> washing = executorService.submit(NormalExecutorService::transportWashing);
        Future<String> tv = executorService.submit(NormalExecutorService::transportTV);

        transport(fridge.get());
        transport(washing.get());
        transport(tv.get());
        System.out.println("从下单到搬运到家共计耗时:" + (System.currentTimeMillis() - start));
        executorService.shutdown();
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NormalExecutorService service = new NormalExecutorService();
        service.way1();
    }
}

根据执行结果可以看出,电视先到了,但是并没有先把电视机搬上楼,而是等待冰箱到了之后才搬运冰箱,然后等待洗衣机到搬运洗衣机,最后才搬运电视。总共耗时21秒。

或许我们会想调换get的顺序,先搬运电视不就好了。现在是我们知道每个任务的时间,我们或许可以调整,但是在实际工作中,大多数并行情况我们并不能确定执行的时间(网络延迟等影响),因此我们也就无法确定我们应该先等待哪儿个到达。

正确的处理思路是先到哪儿个先搬哪儿个。ExecutorCompletionService就是这样的一个作用。我们使用ExecutorCompletionService实现上面的过程:

package com.xiazhi.pool;

import java.util.concurrent.*;

/**
 * 假设我们现在在网上购买了电器: 电视,冰箱,洗衣机。这三个东西是一块发货的,因此可以认为这就是一个异步执行。
 * 冰箱: 发货 -> 收货 10s
 * 洗衣机: 发货-> 收货 15s
 * 电视: 发货 -> 收货 8s
 * 我们将电器从楼下搬到楼上的时间为3s
 *
 * @author 赵帅
 * @date 2021/1/22
 */
public class ExecutorCompolationServiceDemo {
    /**
     * 为了方便使用Executors,实际工作不建议使用
     */
    final ExecutorService executorService = Executors.newFixedThreadPool(3);

    static void transport(String shop) {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("搬运" + shop);
    }

    /**
     * 运输冰箱
     */
    static String transportFridge() {

        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("冰箱到了");
        return "冰箱";
    }

    /**
     * 运输电视
     */
    static String transportTV() {

        try {
            TimeUnit.SECONDS.sleep(8);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("电视到了");
        return "TV";
    }

    /**
     * 运输洗衣机
     */
    static String transportWashing() {

        try {
            TimeUnit.SECONDS.sleep(15);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("洗衣机到了");
        return "洗衣机";
    }


    /**
     * 使用executorCompletionService实现
     */
    void way2() throws ExecutionException, InterruptedException {
        ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);

        long start = System.currentTimeMillis();
        completionService.submit(NormalExecutorService::transportFridge);
        completionService.submit(NormalExecutorService::transportWashing);
        completionService.submit(NormalExecutorService::transportTV);

        for (int i = 0; i < 3; i++) {
            transport(completionService.take().get());
        }

        System.out.println("从下单到搬运到家共计耗时:" + (System.currentTimeMillis() - start));
        executorService.shutdown();
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorCompolationServiceDemo service = new ExecutorCompolationServiceDemo();
        service.way2();
    }
}

可以看到执行时间在18秒,提高了3秒。

ForkJoinPool

forkjoin是指任务的拆分和合并。forkJoinPool会将一个任务拆分成多个任务队列,也就是fork,每一个任务队列是一个线程,当一个任务队列中的任务执行完之后,会从其他任务队列中偷取任务执行,然后执行结果再进行合并也就是join。java8新特性中的并行流就是通过forkJoinPool实现的。

package com.xiazhi.pool;

import java.util.Arrays;

/**
 * @author 赵帅
 * @date 2021/1/22
 */
public class ParallelStreamDemo {
    static String[] array = new String[1_000_000];

    public static void main(String[] args) {
        for (int i = 0; i < array.length; i++) {
            array[i] = "hello" + i;
        }

        Arrays.stream(array).parallel().forEach(f->{
            System.out.println(Thread.currentThread().getName() + ":value:" + f);
        });
    }
}

观察结果可以看到启动了很多的线程

原文地址:https://www.cnblogs.com/Zs-book1/p/14312491.html