自己实现一个简单的线程池

 

一. JSR 166及J.U.C

1.什么是JSR:

  • JSR,全称 Java Specification Requests, 即Java规范提案, 主要是用于向JCP(Java Community Process)提出新增标准化技术规范的正式请求。每次JAVA版本更新都会有对应的JSR更新,比如在Java 8版本中,其新特性Lambda表达式对应的是JSR 335新的日期和时间API对应的是JSR 310
  • 简单说: 就是JSR就是java开发者以及授权者指定的标准,

2.什么是JCP

  • JCP,即Java Community Process ,Java社区进程 成立于1998年,由社会各界Java组成的社区,规划和领导Java的发展。
  • Jcp官方网站是:https://www.jcp.org/en/home/index

3. 什么是openJDK

  • Sun公司初始设立的开发Java源码组织,也是开源JDK的名字

4.什么是JSR 166

本文的关注点仅仅是JSR 166,它是一个关于Java并发编程的规范提案,在JDK中,该规范由java.util.concurrent包实现,是在JDK 5.0的时候被引入的;

另外JDK6引入DequesNavigable collections,对应的是JSR 166xJDK7引入fork-join框架,用于并行执行任务,对应的是JSR 166y

5.什么是J.U.C

java.util.concurrent的缩写,该包参考自EDU.oswego.cs.dl.util.concurrent,是JSR 166标准规范的一个实现;

JSR 166以及J.U.C包的作者就是Doug Lea
在这里插入图片描述

二.本文目的

自己动手写一个的线程池,同时会了解到线程池的工作原理,以及如何在工作中合理的利用线程池。

三.对线程池的认识

1.线程池是什么

线程池的概念是初始化线程池时在池中创建空闲的线程,一但有工作任务,可直接使用线程池中的线程进行执行工作任务,任务执行完成后又返回线程池中成为空闲线程。使用线程池可以减少线程的创建和销毁,提高性能。

举个例子:

  • 我是一个包工头,代表线程池,手底下有若干工人代表线程池中的线程。如果我没接到项目,那么工人就相当于线程池中的空闲线程,一但我接到了项目,我可以立刻让我手下的工人去工作,每个工人同一时间执行只执行一个工作任务,执行完了就去执行另一个工作任务,知道没有工作任务了,这时工人就可以休息了(原谅我让工人无休止的工作),也就是又变成了线程池中的空闲线程池。

2.队列是什么

队列作为一个缓冲的工具,当没有足够的线程去处理任务时,可以将任务放进队列中,以队列先进先出的特性来执行工作任务

举个例子:

  • 我又是一个包工头,一开始我只接了一个小项目,所以只有三个工作任务,但我手底下有四个工人,那么其中三人各领一个工作任务去执行就好了,剩下一个人就先休息。但突然我又接到了几个大项目,那么有现在有很多工作任务了,但手底下的工人不够啊。

那么我有两个选择:

  1. 雇佣更多的工人
  2. 把工作任务记录下来,按先来后到的顺序执行

但雇佣更多等工人需要成本啊,对应到计算机就是资源的不足,所以我只能把工作任务先记录下来,这样就成了一个队列了。

3.为什么要使用线程池

假设我又是一个包工头,我现在手底下没有工人了,但我接到了一个项目,有了工作任务要执行,那我肯定要去找工人了,但招人成本是很高的,工作完成后还要给遣散费,这样算起来好像不值,所以我事先雇佣了固定的几个工人作为我的长期员工,有工作任务就干活,没有就休息,如果工作任务实在太多,那我也可以再临时雇佣几个工人。一来二去工作效率高了,付出的成本也低了。Java自带的线程池的原理也是如此。

4.个人认识

线程池它就是一个调度任务的工具。他会初始化线程池会设置线程池的大小,假设我们有500个任务需要运行,而线程池的大小为10-20,在真正运行任务的过程中他肯定不会创建这500个线程同时运行,而是充分利用线程池里这10-20个线程来调度这500个任务。而这里的10-20个线程最后会由线程池封装为ThreadPoolExecutor.Worker对象,而这个Worker是实现了Runnable接口的,所以他自己本身就是一个线程。:

execute()方法是如何处理的

在这里插入图片描述

  1. 获取当前线程池的状态
  2. 当前线程数量小于coresize时创建一个新的线程运行。
  3. 如果当前线程处于运行状态并且写入阻塞队列成功。
  4. 双重检查,再次获取线程状态;如果线程状态变了(非运行状态)就需要从阻塞队列移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
  5. 如果当前线程池为空就新创建一个线程并执行。
  6. 如果在第三步的判断为非运行状态,尝试新建线程,如果失败则执行拒绝策略

四.自己编写一个线程池

1.创建线程池

创建了一个CustomThreadPool 类,它的工作原理如下:
在这里插入图片描述
简单来说就是往线程池里边丢任务,丢的任务会缓冲到阻塞队列里;线程池里存储的其实就是一个个的 Thread ,他们会一直不停的从刚才缓冲的队列里获取任务执行。

1.创建一个任务线程用于执行具体业务逻辑
在这里插入图片描述
2.创建线程池CustomThreadPool
在这里插入图片描述
执行效果
在这里插入图片描述

  1. 初始化了一个核心为3、最大线程数为5、队列大小为 5 的线程池。
  2. 先往其中丢了 10 个任务,由于阻塞队列的大小为 5 ,最大线程数为 5 ,所以由于队列里缓冲不了最终会创建 5 个线程(上限)。
  3. 过段时间没有任务提交后( sleep)则会自动缩容到3个线程(保证不会小于核心线程数)。

2.构造函数

    /**
     * @param minSize       最小线程数(核心线程数)
     * @param maxSize       最大线程数(最大线程数-核心线程数 = 非核心线程数)
     * @param keepAliveTime 线程空闲时间
     * @param unit          线程空闲时间单位
     * @param workQueue     阻塞队列
     * @param notify        通知接口
     */
    public CustomThreadPool(int minSize,
                            int maxSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            Notify notify) {
        this.minSize = minSize;
        this.maxSize = maxSize;
        this.keepAliveTime = keepAliveTime;
        this.unit = unit;
        this.workQueue = workQueue;
        this.notify = notify;
        workers = new ConcurrentHashSet<>();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • minSize 最小线程数(核心线程数)
  • maxSize 最大线程数(最大线程数-核心线程数 = 非核心线程数)
  • keepAliveTime 线程空闲时间
  • workQueue 阻塞队列
  • notify 通知接口

CustomThreadPool参数和 ThreadPoolExecutor 中作用相同的,需要注意的是其中初始化了一个 workers 成员变量

3.成员变量workers

/**
* 存放线程池
*/
private volatile Set<Worker> workers;
  • 1
  • 2
  • 3
  • 4

workers是最终存放线程池中运行的线程,在j.u.c 源码中是一个 HashSet所以对他所有的操作都是需要加锁

为了方便起见就自己在CustomThreadPool内部定义了一个线程安全的 Set称为 ConcurrentHashSet。

    /**
     * 内部存放工作线程容器,并发安全
     *
     * @param <T>
     */
    private final class ConcurrentHashSet<T> extends AbstractSet<T> {
        private final Object PRESENT = new Object();
        private ConcurrentHashMap<T, Object> map = new ConcurrentHashMap<>();
        private AtomicInteger count = new AtomicInteger();

        @Override
        public Iterator<T> iterator() {
            return map.keySet().iterator();
        }

        @Override
        public int size() {
            return count.get();
        }

        @Override
        public boolean remove(Object o) {
            //将当前值原子递减1。
            count.decrementAndGet();
            //删除对象为PRESENT
            return map.remove(o) == PRESENT;
        }

        @Override
        public boolean add(T t) {
            //将当前值原子递增1。
            count.getAndIncrement();
            //新增对象为PRESENT
            return map.put(t, PRESENT) == PRESENT;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

原理很简单,和HashSet类似也是借助于HashMap来存放数据,利用其 key 不可重复的特性来实现 set ,只是这里的 HashMap 是用并发安全的 ConcurrentHashMap来实现的。能保证它的写入、删除都是线程安全的

  • 由于 ConcurrentHashMap 的 size()函数并不准确,这里单独利用了一个 AtomicInteger 来统计容器大小

4.创建核心线程

往线程池中提交一个任务的时需要做很多事情,最重要的事情就是创建线程存到线程池中。且不能无限制的创建线程,不然使用线程池就没有意义了。于是miniSize maxSize这两个参数就可以派上用场。

    /**
     * 最小线程数,也叫核心线程数
     * <p>
     * minSize 会在多线程场景下使用,所以也用 volatile 关键字来保证可见性。
     */
    private volatile int minSize;
    /**
     * 最大线程数=(核心线程数+非核心线程数)
     * maxSize 会在多线程场景下使用,所以也用 volatile 关键字来保证可见性。
     */
    private volatile int maxSize;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

miniSize, maxSize由于会在多线程场景下使用,所以也用volatile关键字来保证可见性

在这里插入图片描述

 /**
     * 执行任务-无返回值
     *
     * @param runnable 需要执行的任务
     */
    public void execute(Runnable runnable) {
        if (runnable == null) {
            throw new NullPointerException("runnable NullPointerException");
        }

        //线程池已经关闭,不能再提交任务了
        if (isShutDown.get()) {
            log.info("线程池已经关闭,不能再提交任务!");
            return;
        }

        //提交任务时计数器原子递增1
        totalTask.incrementAndGet();

        //线程池线程数小于核心线程数,则新建任务(第一步是需要判断是否大于核心线程数,如果没有则创建。)
        if (workers.size() < minSize) {
            addWorker(runnable);
            return;
        }

        //线程池线程数>核心线程数,则将任务插入任务队列中进行排队(第二步要判断队列是否可以存放任务(是否已满,写入失败说明阻塞队列已满)。)//TODO 递增线程池的任务总数
        boolean offer = workQueue.offer(runnable);

        //插入队列失败
        if (!offer) {
            //线程池线程数 < 最大线程数,则新建任务执行
            if (workers.size() < maxSize) {
                //新建任务执行
                addWorker(runnable);
                return;
            } else {
                //-----------------官方为执行拒绝策略--------------
                log.info("超过最大线程数=>" + runnable);
                try {
                    //会阻塞
                    workQueue.put(runnable);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
    /**
     * 添加任务并执行,需要加锁
     *
     * @param runnable
     */
    private void addWorker(Runnable runnable) {
        //新建任务
        Worker worker = new Worker(runnable, true);
        //启动任务
        worker.startTask();
        //将新增任务保存到线程池
        workers.add(worker);
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
worker.startTask() 为调用线程的start()启动线程
  • 1

结合代码可以确定,第一步是需要判断是否线程池总数大于核心线程数,如果没有则创建。
在这里插入图片描述

5.阻塞队列

在这里插入图片描述

结合上面的流程图,第二步要判断队列是否可以存放任务(是否已满,写入失败说明阻塞队列已满)
在这里插入图片描述
优先会往队列里存放。
在这里插入图片描述

6.队列已满

在这里插入图片描述

  • 一旦写入失败则会判断当前线程池的大小是否大于最大线程数,如果没有则继续创建线程执行。不然则执行会尝试阻塞写入队列( j.u.c 会在这里执行拒绝策略

以上的步骤和刚才那张流程图是一样的,这样大家是否有看出什么坑嘛?

7.时刻小心

在这里插入图片描述
从上面流程图的这两步可以看出会直接创建新的线程。

这个过程相对于中间直接写入阻塞队列的开销是非常大的,主要有以下两个原因:

  1. 创建线程会加锁,虽说最终用的是ConcurrentHashMap的写入函数,但依然存在加锁的可能。
  2. 会创建新的线程,创建线程还需要调用操作系统的 API 开销较大。

所以理想情况下我们应该避免这两步,尽量让丢入线程池中的任务进入阻塞队列中。

8.执行任务

任务是添加进来了,那是如何执行的?

在创建任务的时候提到过worker.startTask()函数:

在这里插入图片描述
也就是在创建线程执行任务的时候会**创建 Worker 对象**,利用它的 startTask()方法来执行任务。

8.1.Worker对象

Worker对象本身也是一个线程,将接收需要执行的任务保存到成员变量 task处。

    /**
     * 工作线程--Worker本身也是一个线程,将接收到需要执行的任务存放到成员变量 task 处。
     */
    private final class Worker extends Thread {
        /**
         * Worker本身也是一个线程,将接收到需要执行的任务存放到成员变量 task 处。
         */
        private Runnable task;
        /**
         * 当前线程 == this
         */
        private Thread thread;
        /**
         * true 创建新的线程执行
         * false 从队列里获取线程执行
         */
        private boolean isNewTask;

        /**
         * @param task      接收到需要执行的任务
         * @param isNewTask 是创建新线程还是从队列取
         */
        public Worker(Runnable task, boolean isNewTask) {
            this.task = task;
            this.isNewTask = isNewTask;
            this.thread = this;
        }

        /**
         * 创建线程执行任务的时候会创建 Worker 对象,利用 startTask() 方法来执行任务。
         */
        public void startTask() {
            thread.start();
        }

        /**
         *关闭线程
         */
        public void close() {
            thread.interrupt();
        }

        /**
         * 线程体-Worker真正执行的地方
         */
        @Override
        public void run() {
            Runnable task = null;

            if (isNewTask) {
                task = this.task;
            }

            //任务是否完成
            boolean compile = true;

            try {
                //1.不停的从队列里获取任务执行,直到获取不到新任务了。
                //getTask()=>一旦线程池大小超过了核心线程数就会使用保活时间来从队列里获取任务,所以一旦获取不到返回 null 时就会在finally触发回收
                //getTask()=>但如果我们的队列足够大,导致线程数都不会超过核心线程数,这样是不会触发回收的。
                while (task != null || (task = getTask()) != null) {
                    try {
                        //执行任务(2.将创建线程时传过来的task执行)
                        task.run();
                    } catch (Exception e) {
                        compile = false;
                        throw e;
                    } finally {
                        //任务执行完毕,置空重新循坏获取阻塞队列中的任务
                        task = null;
                        //任务执行完成将线程池的任务总数计数器原子递减1 (3.任务执行完毕后将内置的计数器 -1 ,方便后面任务全部执行完毕进行通知。) //TODO 递减线程池的任务总数
                        int number = totalTask.decrementAndGet();
                        //一旦为 0 时则任务任务全部执行完毕;这时便可回调我们自定义的接口完成通知。
                        if (number == 0) {
                            synchronized (shutDownNotify) {
                                //唤醒线程池任务全部执行完毕后的通知组件
                                shutDownNotify.notify();
                            }
                        }
                    }

                }
            } finally {
                //释放线程(4.worker 线程获取不到任务后退出,需要将自己从线程池中释放掉( workers.remove(this))。)
                //      如果提交任务后我们没有关闭线程,会发现即便是任务执行完毕后程序也不会退出。
                boolean remove = workers.remove(this);

                //执行异常,回收当前异常的线程,然后创建一个新的Worker继续从队列里获取任务然后执行
                if (!compile) {
                    /**
                     * 其实在线程池内部会对线程的运行捕获异常,但它并不会处理,只是用于标记是否执行成功;类似于compile
                     * 一旦执行失败则会回收掉当前异常的线程,然后重新创建一个新的 Worker 线程继续从队列里取任务然后执行。
                     * 所以最终才会卡在 从队列中取任务处。
                     */
                    addWorker(null);
                }

                //关闭标识=true && 任务提交总数=0,关闭所有任务
                //而关闭线程通常又有以下两种:
                //  1.立即关闭:执行关闭方法后不管现在线程池的运行状况,直接一刀切全部停掉,这样会导致任务丢失。 shutDown() false
                //  2.不接受新的任务,同时等待现有任务执行完毕后退出线程池。shutDownNow() true
                tryClose(true);
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105

而其中最为关键的则是执行任务 worker.startTask()
在这里插入图片描述
而执行worker.startTask()其实就是运行了 worker 线程自己,下面来看 run 方法。

        /**
         * 线程体-Worker真正执行的地方
         */
        @Override
        public void run() {
            Runnable task = null;

            if (isNewTask) {
                task = this.task;
            }

            //任务是否完成
            boolean compile = true;
            try {
                //1.不停的从队列里获取任务执行,直到获取不到新任务了。
                //getTask()=>一旦线程池大小超过了核心线程数就会使用保活时间来从队列里获取任务,所以一旦获取不到返回 null 时就会在finally触发回收
                //getTask()=>但如果我们的队列足够大,导致线程数都不会超过核心线程数,这样是不会触发回收的。
                while (task != null || (task = getTask()) != null) {
                    try {
                        //执行任务(2.将创建线程时传过来的task执行)
                        task.run();
                    } catch (Exception e) {
                        compile = false;
                        throw e;
                    } finally {
                        //任务执行完毕,置空重新循坏获取阻塞队列中的任务
                        task = null;
                        //任务执行完成将线程池的任务总数计数器原子递减1 (3.任务执行完毕后将内置的计数器 -1 ,方便后面任务全部执行完毕进行通知。) //TODO 递减线程池的任务总数
                        int number = totalTask.decrementAndGet();
                        //一旦为 0 时则任务任务全部执行完毕;这时便可回调我们自定义的接口完成通知。
                        if (number == 0) {
                            synchronized (shutDownNotify) {
                                //唤醒线程池任务全部执行完毕后的通知组件
                                shutDownNotify.notify();
                            }
                        }
                    }

                }
            } finally {
                //释放线程(4.worker 线程获取不到任务后退出,需要将自己从线程池中释放掉( workers.remove(this))。)
                //      如果提交任务后我们没有关闭线程,会发现即便是任务执行完毕后程序也不会退出。
                boolean remove = workers.remove(this);

                //执行异常,回收当前异常的线程,然后创建一个新的Worker继续从队列里获取任务然后执行
                if (!compile) {
                    /**
                     * 其实在线程池内部会对线程的运行捕获异常,但它并不会处理,只是用于标记是否执行成功;类似于compile
                     * 一旦执行失败则会回收掉当前异常的线程,然后重新创建一个新的 Worker 线程继续从队列里取任务然后执行。
                     * 所以最终才会卡在 从队列中取任务处。
                     */
                    addWorker(null);
                }

                //关闭标识=true && 任务提交总数=0,关闭所有任务
                //而关闭线程通常又有以下两种:
                //  1.立即关闭:执行关闭方法后不管现在线程池的运行状况,直接一刀切全部停掉,这样会导致任务丢失。 shutDown() false
                //  2.不接受新的任务,同时等待现有任务执行完毕后退出线程池。shutDownNow() true
                tryClose(true);
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  1. 第一步是将创建线程时传过来的任务执行( task.run),接着会一直不停的从队列里获取任务执行,直到获取不到新任务了。
    在这里插入图片描述

  2. 任务执行完毕后将内置的计数器 -1 ,方便后面任务全部执行完毕进行通知。
    在这里插入图片描述
    3.worker 线程获取不到任务后退出,需要将自己从线程池中释放掉(workers.remove(this))。
    在这里插入图片描述

9.从队列里获取任务

getTask() 也是非常关键的一个方法,它封装了从队列中获取任务,同时对不需要保活的线程进行回收。

/**
     * 从队列中获取任务
     * 封装了从队列中获取任务,同时对不需要保活的线程进行回收。
     * <p>
     * 两个地方需要注意
     * 1.当线程数 > 核心线程数时,在获取任务的时候需要通过超时时间从队列里获取任务;一旦获取不到任务则队列肯定是空的,这样返回 null
     * 之后在上文的 run() 中就会退出这个线程;从而达到了回收线程的目的,也就是我们之前演示的效果
     */
    private Runnable getTask() {
        //关闭标识=true && 任务提交总数=0
        if (isShutDown.get() && totalTask.get() == 0) {
            //一旦执行了 shutdown/shutdownNow 方法都会将线程池的状态置为关闭状态,这样只要 worker 线程尝试从队列里获取任务时就会直接返回空,导致 worker 线程被回收。
            return null;
        }

        lock.lock();
        try {
            Runnable task = null;
            //任务线程池线程总数 > 核心线程数 (这里需要加锁,加锁的原因是这里肯定会出现并发情况,不加锁会导致 workers.size()>miniSize 条件多次执行,从而导致线程被全部回收完毕。)
            if (workers.size() > minSize) {
                //大于核心线程数时需要用保活时间获取任务
                task = workQueue.poll(keepAliveTime, unit);
            } else {
                //TODO:如果任务队列是空的,会一直阻塞在这里。等待队列变为非空状态
                task = workQueue.take();//(如果不关闭线程池,不退出的原因是 Worker 线程一定还会一直阻塞在 task=workQueue.take(); 处,即使线程缩容了也不会小于核心线程数。)
            }
            if (task != null) {
                return task;
            }
        } catch (InterruptedException e) {
            return null;
        } finally {
            lock.unlock();
        }

        return null;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

核心作用就是从队列里获取任务但有两个地方需要注意:

  1. 当线程数超过核心线程数时,在获取任务的时候需要通过保活时间从队列里获取任务;一旦获取不到任务则队列肯定是空的,这样返回 null 之后在上文的 run() 中就会退出这个线程;从而达到了回收线程的目的,也就是我们之前演示的效果
    在这里插入图片描述

  2. 这里需要加锁,加锁的原因是这里肯定会出现并发情况,不加锁会导致 workers.size()>miniSize 条件多次执行,从而导致线程被全部回收完毕。

10.关闭线程池

不显示关闭线程池演示
在这里插入图片描述

在这里插入图片描述

已上面这段测试代码为例 ,如果提交任务后我们没有手动关闭线程池,会发现即便是任务执行完毕后程序也不会退出。

从源码getTask()中可以看出,不退出的原因是Worker 线程一定还会一直阻塞在 task=workQueue.take(); 处,即便是线程缩容了也不会小于核心线程数。
在这里插入图片描述
通过线程堆栈也能看出:
在这里插入图片描述
关闭线程池通常分为两种方式:

  1. 立即关闭: 执行关闭方法后不管现在线程池的运行状况,直接一刀切全部停掉,这样会导致任务丢失。
  2. 所有任务执行完毕后关闭:不接受新的任务,同时等待现有任务执行完毕后退出线程池。(上图演示的就是这种方式)

10.1.立即关闭

    /**
     * 立即关闭线程池,会造成任务丢失
     * 执行关闭方法后不管现在线程池的运行状况,直接一刀切全部停掉,这样会导致任务丢失。
     */
    public void shutDownNow() {
        //是否关闭线程池标志置为true
        isShutDown.set(true);
        //false 立即关闭线程池 =>任务有丢失的可能
        tryClose(false);
    }

    /**
     * 关闭线程池
     * @param isTry true 尝试关闭 =>会等待所有任务执行完毕
     *              false 立即关闭线程池 =>任务有丢失的可能
     */
    private void tryClose(boolean isTry) {
        //要求立即关闭线程池
        if (!isTry) {
            //关闭所有任务
            closeAllTask();
        } else {
            // 关闭标识=true && 任务提交计数器总数=0 (所有任务都执行完毕之后才会去中断线程。)
            if (isShutDown.get() && totalTask.get() == 0) {
                //关闭所有任务
                closeAllTask();
            }
        }
    }

    /**
     * 关闭所有任务
     */
    private void closeAllTask() {
        for (Worker worker : workers) {
            worker.close();
        }
    }


   /**
    *关闭线程(位于 CustomThreadPool的内部类Worker中)
    */
   public void close() {
       thread.interrupt();
   }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

演示立即关闭线程池
在这里插入图片描述
在这里插入图片描述
可以看出后面提交的3个任务是没有被执行的。

10.2.所有任务执行完毕后关闭

    /**
     * 执行任务完毕后关闭线程池
     * 不接受新的任务,同时等待现有任务执行完毕后退出线程池。
     */
    public void shutdown() {
        //是否关闭线程池标志置为true
        isShutDown.set(true);
        //尝试关闭 =>会等待所有任务执行完毕
        tryClose(true);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

在这里插入图片描述
这里多了一个判断,需要所有任务都执行完毕之后才会去中断线程。

在这里插入图片描述

同时在线程需要回收时都会尝试关闭线程:
在这里插入图片描述
所有任务执行关闭立即关闭线程池
在这里插入图片描述
在这里插入图片描述

11.回收线程

两点总结为2点

  1. 一旦执行了shutdown/shutdownNow 方法都会将线程池的状态置为关闭状态 false,这样只要worker 线程尝试从队列获取任务时就会直接返回空导致 worker 线程被回收
    在这里插入图片描述

  2. 一旦线程池大小超过了核心线程数就会使用超时时间来从队列获取任务,所以一旦获取不到返回 null时就会触发回收
    在这里插入图片描述
    如果队列足够大,导致线程数都不超过核心线程数,将不会触发回收的
    如: 将队列大小调为 10 ,这样任务就会累积在队列里,不会创建五个 worker 线程所以一直都是 Thread-0~1 这三个线程在反复调度任务。
    在这里插入图片描述
    在这里插入图片描述

总结

本次实现了线程池里大部分核心功能,只要看完并动手敲一遍一定会对线程池有不一样的理解。

结合目前的内容来总结下:

  1. 线程池、队列大小要设计的合理,尽量的让任务从队列中获取执行。

  2. 慎用shutdownNow() 方法关闭线程池,会导致任务丢失(除非业务允许)。

  3. 如果任务多线程执行时间短可以调大 keepalive 值,使得线程尽量不被回收从而可以复用线程。

同时下次会分享一些线程池的新特性,如:

  • 执行带有返回值的线程。
  • 异常处理怎么办?
  • 所有任务执行完怎么通知我?

本文所有源码: 点击这里

一份针对于新手的多线程实践
一份针对于新手的多线程实践–进阶篇
一个线程罢工的诡异事件
线程池不容错过的细节
JAVA并发编程J.U.C学习总结
ReentrantLock 实现原理
Synchronize 关键字原理

原文地址:https://www.cnblogs.com/shoshana-kong/p/14036151.html