java线程池

线程池

线程池是什么?

线程池的概念是初始化线程池时在池中创建空闲的线程,一旦有工作任务,可直接使用线程池中的线程进行执行工作任务,任务执行完成后又返回线程池中成为空闲线程。使用线程池可以减少线程的创建和销毁,提高性能。

    举个例子:我是一个包工头,代表线程池,手底下有若干工人代表线程池中的线程。如果我没接到项目,那么工人就相当于线程池中的空闲线程,一但我接到了项目,我可以立刻让我手下的工人去工作,每个工人同一时间执行只执行一个工作任务,执行完了就去执行另一个工作任务,直到没有工作任务了,这时工人就可以休息了。

队列是什么?

队列作为一个缓冲的工具,当没有足够的线程去处理任务时,可以将任务放进队列中,以队列先进先出的特性来执行工作任务

举个例子,我又是一个包工头,一开始我只接了一个小项目,所以只有三个工作任务,但我手底下有四个工人,那么其中三人各领一个工作任务去执行就好了,剩下一个人就先休息。但突然我又接到了几个大项目,那么有现在有很多工作任务了,但手底下的工人不够啊。

那么我有两个选择:

(1)雇佣更多的工人

(2)把工作任务记录下来,按先来后到的顺序执行

但雇佣更多等工人需要成本啊,对应到计算机就是资源的不足,所以我只能把工作任务先记录下来,这样就成了一个队列了。

为什么要使用线程池?

假设我又是一个包工头,我现在手底下没有工人了,但我接到了一个项目,有了工作任务要执行,那我肯定要去找工人了,但招人成本是很高的,工作完成后还要给遣散费,这样算起来好像不值,所以我事先雇佣了固定的几个工人作为我的长期员工,有工作任务就干活,没有就休息,如果工作任务实在太多,那我也可以再临时雇佣几个工人。一来二去工作效率高了,付出的成本也低了。Java自带的线程池的原理也是如此。

Java自带的线程池

Executor接口是Executor的父接口,基于生产者--消费者模式,提交任务的操作相当于生产者,执行任务的线程则相当于消费者,如果要在程序中实现一个生产者--消费者的设计,那么最简单的方式通常是使用Executor。

Executor框架

Executor框架是java中的线程实现。Executor是最顶层的接口定义,它的子类和实现主要包括ExecutorService、ScheduledExecutorService、ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool等。其结构如下图所示:

Executor:Executor是一个接口,其只定义了一个execute()方法:void execute(Runnable command);,只能提交Runnable形式的任务,不支持提交Callable带有返回值的任务。
ExecutorService:ExecutorService在Executor的基础上加入了线程池的生命周期管理,我们可以通过ExecutorService#shutdown或者ExecutorService#shutdownNow方法来关闭我们的线程池。ExecutorService支持提交Callable形式的任务,提交完Callable任务后我们拿到一个Future,它代表一个异步任务执行的结果。

ThreadPoolExecutor:是线程池中最核心的类,这个类的各个构造参数

  1. corePoolSize:线程池的核心线程数目,当一个请求进来时如果当前线程池中线程数量小于这个值,则直接通过ThreadFactory新建一个线程来处理这个请求,如果已经有线程数量大于等于这个值则将请求放入阻塞队列中。
  2. maximumPoolSize:线程池的最大线程数目,当线程池数量已经等于corePoolSize并且阻塞队列已经满了,则看线程数量是否小于maximumPoolSize:如果小于则创建一个线程来处理请求,否则使用"饱和策略"来拒绝这个请求。对于大于corePoolSize部分的线程称这部分线程为"idle threads",这部分线程会有一个最大空闲时间,如果超过这个空闲时间还没有人任务进来则将这些空闲线程回收。
  3. keepAliveTime和unit:这两个参数主要用来控制idle threads的最大空闲时间,超过这个空闲时间空闲线程将被回收。这里有一点需要注意,ThreadPoolExecutor中有一个属性:private volatile Boolean allowCoreThreadTimeOut;这个用来指定是否允许核心线程空闲超时回收,默认为false,即不允许核心线程超时回收,核心线程将一直等待新任务。如果设置这个参数为true,核心线程空闲超时后也可以被回收。
  4. workQueue:阻塞队列,超过corePoolSize部分的请求放入这个阻塞队列中等待执行。阻塞对队列分为有界阻塞队列和无界阻塞队列。在创建阻塞队列时如果我们指定了这个队列的"capacity"则这个队列就是有界的,否则无界。
  5. threadFactory:是一个线程池工厂,主要用来为线程池创建线程,我们可以定制一个ThreadFactory来达到统一命名我们线程池中的线程的目的。
  6. handler:饱和策略,用来拒绝多余的请求。饱和策略有:CallerRunsPolicy:请求脱离线程池运行(调用者caller线程来运行这个任务);AbortPolicy:抛出RejectedExecutionException异常;DiscardPolicy:丢弃这个任务,即什么也不做;DiscardOldestPolicy:将阻塞队列中等待时间最久的任务删除(即队列头部的任务),将新的任务加入队尾。

线程池的工作流程图

1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

2、如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

3、如果无法将任务加入BlockingQueue(队列已满),则在非corePool中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

Executor框架实现线程池的原理

线程池内部状态

其中AtomicInteger变量ctl的功能非常强大:利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:
1、RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
2、SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
3、STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
4、TIDYING : 2 << COUNT_BITS,即高3位为010;
5、TERMINATED: 3 << COUNT_BITS,即高3位为011;
任务提交

线程池框架提供了两种方式提交任务,根据不同的业务需求选择不同的方式。

Executor.excute()

通过Executor.execute()方法提交的任务,必须实现Runnable接口,该方式提交的任务不能获取返回值,因此无法判断任务是否执行成功。

ExecutorService.submit()

通过ExecutorService.submit()方法提交的任务,可以获取任务执行完的返回值。

任务执行

当向线程池中提交一个任务,线程池会如何处理该任务?

execute实现

具体的执行流程如下:

1.workerCountOf方法根据ctl的低29位,得到线程池的当前线程数,如果线程数小于corePoolSize,则执行addWorker方法创建新的线程执行任务;否则执行步骤(2);
2.如果线程池处于RUNNING状态,且把提交的任务成功放入阻塞队列中,则执行步骤(3),否则执行步骤(4);
3.再次检查线程池的状态,如果线程池没有RUNNING,且成功从阻塞队列中删除任务,则执行reject方法处理任务;
4.执行addWorker方法创建新的线程执行任务,如果addWoker执行失败,则执行reject方法处理任务;

addWorker实现

从方法execute的实现可以看出:addWorker主要负责创建新的线程并执行任务,代码实现如下:

这只是addWorker方法实现的前半部分:

1、判断线程池的状态,如果线程池的状态值大于或等SHUTDOWN,则不处理提交的任务,直接返回;
2、通过参数core判断当前需要创建的线程是否为核心线程,如果core为true,且当前线程数小于corePoolSize,则跳出循环,开始创建新的线程,具体实现如下:

线程池的工作线程通过Woker类实现,在ReentrantLock锁的保证下,把Woker实例插入到HashSet后,并启动Woker中的线程,其中Worker类设计如下:
1、继承了AQS类,可以方便的实现工作线程的中止操作;
2、实现了Runnable接口,可以将自身作为一个任务在工作线程中执行;
3、当前提交的任务firstTask作为参数传入Worker的构造方法;

从Woker类的构造方法实现可以发现:线程工厂在创建线程thread时,将Woker实例本身this作为参数传入,当执行start方法启动线程thread时,本质是执行了Worker的runWorker方法。

runWorker实现

runWorker方法是线程池的核心:
1、线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行中断;
2、获取第一个任务firstTask,执行任务的run方法,不过在执行任务之前,会进行加锁操作,任务执行完会释放锁;
3、在执行任务的前后,可以根据业务场景自定义beforeExecute和afterExecute方法;
4、firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;
getTask实现

整个getTask操作在自旋下完成:
1、workQueue.take:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
2、workQueue.poll:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;

所以,线程池中实现的线程可以一直执行由用户提交的任务。

自己实现线程池

根据Executor框架实现线程池的原理,我们可以自己动手实现

实现的思路如下:

 

 

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


public class ThreadPoolExecutor {
    
    /*
     * BlockingQueue是阻塞队列,在两种情况下出现阻塞:
     *     1、当队列满了,入队列操作时;
     *     2、当队列空了,出队列操作时。
     * 阻塞队列是线程安全的,主要使用在生产/消费者的场景
     */
    private BlockingQueue<Task> blockingQueue;
    
    //线程池的工作线程数(可以认为是线程池的容量)
    private int poolSize = 0;
    
    //线程池的核心容量(也就是当前线程池中真正存在的线程个数)
    private int coreSize = 0;
    
    /*
     * 此地方使用volatile关键字,volatile的工作原理是:对于JVM维度来说,每个线程持有变量的工作副本,那对于计算机维度来说,
     * 就是这些变量的中间值会存放在高速缓存中。通过volatile关键字,告知每个线程改变此变量之后,立马更新到内存中去,并且使得
     * 缓存中的数据失效,这样来保证其中某个线程改变公有变量后,其他线程能及时读取到最新的变量值,从而保证可见性。
     * 原因如下:
     *     1、在ThreadPoolExecutorTest中操作shutDown,这是main线程操作此变量(由于变量是volatile声明,所以会立马写入内存中);
     *     2、Worker中线程通过while(!shutDown)来判断当前线程是否应该关闭,因此需通过volatile保证可见性,使线程可以及时得到关闭。
     */
    private volatile boolean shutDown = false;
    
    public ThreadPoolExecutor(int size) {
        this.poolSize = size;
        //LinkedBlockingQueue的大小可以指定,不指定即为无边界的。
        blockingQueue = new LinkedBlockingQueue<>(poolSize);
    }
    
    public void execute(Task task) throws InterruptedException {
        if(shutDown == true) {
            return;
        }
        
        if(coreSize < poolSize) {
            /*
             * BlockingQueue中的插入主要有offer(obj)以及put(obj)两个方法,其中put(obj)是阻塞方法,如果插入不能马上进行,
             * 则操作阻塞;offer(obj)则是插入不能马上进行,返回true或false。
             * 本例中的Task不允许丢失,所以采用put(obj);
             */
            blockingQueue.put(task);
            produceWorker(task);
        }else {
            blockingQueue.put(task);
        }
    }

    private void produceWorker(Task task) throws InterruptedException {
        if(task == null) {
            throw new NullPointerException("非法参数:传入的task对象为空!");
        }

        Thread thread = new Thread(new Worker());        
        thread.start();
        coreSize++;
    }
    
    /*
     * 真正中断线程的方法,是使用共享变量发出信号,告诉线程停止运行。
     * 
     */
    public void shutDown() {
        shutDown = true;
    }
    
    /*
     * 此内部类是实际上的工作线程
     * 
     */
    class Worker implements Runnable {

        @Override
        public void run() {        
            while(!shutDown) {    
                try {
                    //
                    blockingQueue.take().doJob();
                } catch (InterruptedException e) {                    
                    e.printStackTrace();
                }
            }            
            System.out.println("线程:" + Thread.currentThread().getName() + "退出运行!");
        }    
    }
}
public class Task {

    //通过taskId对任务进行标识
    private int taskId;
    
    public Task(int taskId) {
        this.taskId = taskId;
    }

    public void doJob() {
        System.out.println("线程" + Thread.currentThread().getName() + "正在处理任务!");
    }

    public int getId() {        
        return taskId;
    }
}
package com.ty.thread;

/**
 */
public class ThreadPoolExecutorTest {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3);
        for(int i = 0; i < 10; i++) {
            Task task = new Task(i);
            threadPoolExecutor.execute(task);                
        }        
        
        threadPoolExecutor.shutDown();
    }
}

运行结果

线程Thread-0正在处理任务!

线程Thread-1正在处理任务!

线程Thread-0正在处理任务!

线程Thread-1正在处理任务!

线程Thread-2正在处理任务!

线程Thread-0正在处理任务!

线程Thread-1正在处理任务!

线程:Thread-1退出运行!

线程:Thread-0退出运行!

线程Thread-2正在处理任务!

线程:Thread-2退出运行!

原文地址:https://www.cnblogs.com/kexinxin/p/11569981.html