Java 四种线程池的使用

 

https://juejin.im/post/59df0c1af265da432f301c8d

https://www.cnblogs.com/java-cxh/p/13431789.html

1,线程池的作用 
线程池作用就是限制系统中执行线程的数量。 
根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果。 
少了浪费了系统资源,多了造成系统拥挤效率不高。 
用线程池控制线程数量,其他线程排 队等候。 
一个任务执行完毕,再从队列的中取最前面的任务开始执行。 
若队列中没有等待进程,线程池的这一资源处于等待。 
当一个新任务需要运行时,如果线程池 中有等待的工作线程,就可以开始运行了;否则进入等待队列。 
2,为什么要用线程池? 
1.减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。 
2.可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。 
Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。 
3,比较重要的几个类

     类 
    描述

    ExecutorService 
    真正的线程池接口。

    ScheduledExecutorService 
    能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。

    ThreadPoolExecutor 
    ExecutorService的默认实现。

    ScheduledThreadPoolExecutor 
    继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。

4,new Thread的弊端

  1.  
    public class TestNewThread {
  2.  
     
  3.  
    public static void main(String[] args) {
  4.  
    new Thread(new Runnable() {
  5.  
     
  6.  
    @Override
  7.  
    public void run() {
  8.  
    System.out.println("start");
  9.  
    }
  10.  
    }).start();
  11.  
    }
  12.  
    }

执行一个异步任务你还只是如下new Thread吗? 
那你就out太多了,new Thread的弊端如下: 
1.每次new Thread新建对象性能差。 
2.线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。 
3.缺乏更多功能,如定时执行、定期执行、线程中断。 
相比new Thread,Java提供的四种线程池的好处在于: 
1.重用存在的线程,减少对象创建、消亡的开销,性能佳。 
2.可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。 
3.提供定时执行、定期执行、单线程、并发数控制等功能。

四种线程池 
Java通过Executors提供四种线程池,分别为: 
1,newCachedThreadPoo 
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 
2,newFixedThreadPool 
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 
3,newScheduledThreadPool 
创建一个定长线程池,支持定时及周期性任务执行。 
4,newSingleThreadExecutor 
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

源码分析newCachedThreadPool 

这是一个可缓存线程池,可以灵活的回收空闲线程,无可回收线程时,新建线程 
public static ExecutorService newCachedThreadPool() { 
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 
60L, TimeUnit.SECONDS, 
new SynchronousQueue()); 
}码可以看出底层调用的是ThreadPoolExecutor方法,传入一个同步的阻塞队列实现

  1.  
    ThreadPoolExecupublic ThreadPoolExecutor(int corePoolSize,
  2.  
    int maximumPoolSize,
  3.  
    long keepAliveTime,
  4.  
    TimeUnit unit,
  5.  
    BlockingQueue<Runnable> workQueue) {
  6.  
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  7.  
    Executors.defaultThreadFactory(), defaultHandler);
  8.  
    }

通过源码可以看出,我们可以传入线程池的核心线程数(最小线程数),最大线程数量,保持时间,时间单位,阻塞队列这些参数,最大线程数设置为jvm可用的cpu数量为最佳实践

newWorkStealingPool 
创建持有足够线程的线程池来并行,通过使用多个队列减少竞争,不传参数,则默认设定为cpu的数量 
源码:

  1.  
    public static ExecutorService newWorkStealingPool() {
  2.  
    return new ForkJoinPool
  3.  
    (Runtime.getRuntime().availableProcessors(),
  4.  
    ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  5.  
    null, true);
  6.  
    }

通过源码可以看出底层调用的是ForkJoinPool线程池

下面说一下ForkJoinPool

  1.  
    public ForkJoinPool(int parallelism,
  2.  
    ForkJoinWorkerThreadFactory factory,
  3.  
    UncaughtExceptionHandler handler,
  4.  
    boolean asyncMode) {
  5.  
    this(checkParallelism(parallelism),
  6.  
    checkFactory(factory),
  7.  
    handler,
  8.  
    asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
  9.  
    "ForkJoinPool-" + nextPoolId() + "-worker-");
  10.  
    checkPermission();
  11.  
    }

使用一个无限队列来保存需要执行的任务,可以传入线程的数量,不传入,则默认使用当前计算机中可用的cpu数量,使用分治法来解决问题,使用fork()和join()来进行调用

newSingleThreadExecutor 
创建一个单线程化的线程池,保证所有任务按照指定的顺序执行(FIFO,LIFO,优先级),当要求进程限制时,可以进行使用

源码:

  1.  
    public static ExecutorService newSingleThreadExecutor() {
  2.  
    return new FinalizableDelegatedExecutorService
  3.  
    (new ThreadPoolExecutor(1, 1,
  4.  
    0L, TimeUnit.MILLISECONDS,
  5.  
    new LinkedBlockingQueue<Runnable>()));
  6.  
    }

newFixedThreadPool 
创建一个固定线程数量,可重用的线程池

源码:

  1.  
    public static ExecutorService newFixedThreadPool(int nThreads) {
  2.  
    return new ThreadPoolExecutor(nThreads, nThreads,
  3.  
    0L, TimeUnit.MILLISECONDS,
  4.  
    new LinkedBlockingQueue<Runnable>());
  5.  
    }

newScheduledThreadPool 
创建一个可定期或者延时执行任务的线程池

源码:

return new ScheduledThreadPoolExecutor(corePoolSize); 

通过源码可以看出底层调用的是一个ScheduledThreadPoolExecutor,然后传入线程数量

下面来介绍一下ScheduledThreadPoolExecutor

  1.  
    public ScheduledThreadPoolExecutor(int corePoolSize) {
  2.  
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  3.  
    new DelayedWorkQueue());
  4.  
    }

通过源码可以看出底层调用了ThreadPoolExecutor,维护了一个延迟队列,可以传入线程数量,传入延时的时间等参数,下面给出一个demo

 
  1.  
    public static void main(String[] args) {
  2.  
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
  3.  
    for (int i = 0; i < 15; i = i + 5) {
  4.  
    pool.schedule(() -> System.out.println("我被执行了,当前时间" + new Date()), i, TimeUnit.SECONDS);
  5.  
    }
  6.  
    pool.shutdown();
  7.  
    }

执行结果

我被执行了,当前时间Fri Jan 12 11:20:41 CST 2018 
我被执行了,当前时间Fri Jan 12 11:20:46 CST 2018 
我被执行了,当前时间Fri Jan 12 11:20:51 CST 2018

有的小伙伴可能会用疑问,为什么使用schedule()而不使用submit()或者execute()呢,下面通过源码来分析

 
  1.  
    public void execute(Runnable command) {
  2.  
    schedule(command, 0, NANOSECONDS);
  3.  
    }
  4.  
    public Future<?> submit(Runnable task) {
  5.  
    return schedule(task, 0, NANOSECONDS);
  6.  
    }

通过源码可以发现这两个方法都是调用的schedule(),而且将延时时间设置为了0,所以想要实现延时操作,需要直接调用schedule()

下面我们再来分析一下submit()和execute()的以及shutdown()和shutdownNow()的区别

submit(),提交一个线程任务,可以接受回调函数的返回值吗,适用于需要处理返回着或者异常的业务场景 
execute(),执行一个任务,没有返回值 
shutdown(),表示不再接受新任务,但不会强行终止已经提交或者正在执行中的任务 
shutdownNow(),对于尚未执行的任务全部取消,正在执行的任务全部发出interrupt(),停止执行 
五种线程池的适应场景 
newCachedThreadPool:用来创建一个可以无限扩大的线程池,适用于服务器负载较轻,执行很多短期异步任务。 
newFixedThreadPool:创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于可以预测线程数量的业务中,或者服务器负载较重,对当前线程数量进行限制。 
newSingleThreadExecutor:创建一个单线程的线程池,适用于需要保证顺序执行各个任务,并且在任意时间点,不会有多个线程是活动的场景。 
newScheduledThreadPool:可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。 
newWorkStealingPool:创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行,适用于大耗时的操作,可以并行来执行

RejectedExecutionHandler 线程池四种拒绝任务策略

《Java线程池》:任务拒绝策略 
在没有分析线程池原理之前先来分析下为什么有任务拒绝的情况发生。

这里先假设一个前提:线程池有一个任务队列,用于缓存所有待处理的任务,正在处理的任务将从任务队列中移除。因此在任务队列长度有限的情况下就会出现新任务的拒绝处理问题,需要有一种策略来处理应该加入任务队列却因为队列已满无法加入的情况。另外在线程池关闭的时候也需要对任务加入队列操作进行额外的协调处理。

RejectedExecutionHandler提供了四种方式来处理任务拒绝策略

1、直接丢弃(DiscardPolicy)

2、丢弃队列中最老的任务(DiscardOldestPolicy)。

3、抛异常(AbortPolicy)

4、将任务分给调用线程来执行(CallerRunsPolicy)。

这四种策略是独立无关的,是对任务拒绝处理的四中表现形式。最简单的方式就是直接丢弃任务。但是却有两种方式,到底是该丢弃哪一个任务,比如可以丢弃当前将要加入队列的任务本身(DiscardPolicy)或者丢弃任务队列中最旧任务(DiscardOldestPolicy)。丢弃最旧任务也不是简单的丢弃最旧的任务,而是有一些额外的处理。除了丢弃任务还可以直接抛出一个异常(RejectedExecutionException),这是比较简单的方式。抛出异常的方式(AbortPolicy)尽管实现方式比较简单,但是由于抛出一个RuntimeException,因此会中断调用者的处理过程。除了抛出异常以外还可以不进入线程池执行,在这种方式(CallerRunsPolicy)中任务将有调用者线程去执行。

示例 
1,newCachedThreadPool 
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程, 那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

  1.  
    package io.ymq.thread.demo1;
  2.  
     
  3.  
    import java.util.concurrent.ExecutorService;
  4.  
    import java.util.concurrent.Executors;
  5.  
     
  6.  
    /**
  7.  
    * 描述: 创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。
  8.  
    * 此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
  9.  
    *
  10.  
    * @author yanpenglei
  11.  
    * @create 2017-10-12 11:13
  12.  
    **/
  13.  
    public class TestNewCachedThreadPool {
  14.  
    public static void main(String[] args) {
  15.  
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
  16.  
    for (int i = 1; i <= 10; i++) {
  17.  
    final int index = i;
  18.  
    try {
  19.  
    Thread.sleep(index * 1000);
  20.  
    } catch (InterruptedException e) {
  21.  
    e.printStackTrace();
  22.  
    }
  23.  
     
  24.  
    cachedThreadPool.execute(new Runnable() {
  25.  
     
  26.  
    @Override
  27.  
    public void run() {
  28.  
    String threadName = Thread.currentThread().getName();
  29.  
    System.out.println("执行:" + index + ",线程名称:" + threadName);
  30.  
    }
  31.  
    });
  32.  
    }
  33.  
    }
  34.  
    }

响应: 
执行:1,线程名称:pool-1-thread-1 
执行:2,线程名称:pool-1-thread-1 
执行:3,线程名称:pool-1-thread-1 
执行:4,线程名称:pool-1-thread-1 
执行:5,线程名称:pool-1-thread-1 
执行:6,线程名称:pool-1-thread-1 
执行:7,线程名称:pool-1-thread-1 
执行:8,线程名称:pool-1-thread-1 
执行:9,线程名称:pool-1-thread-1 
执行:10,线程名称:pool-1-thread-1 
2,newFixedThreadPool 
描述:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。 
线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

  1.  
    package io.ymq.thread.demo2;
  2.  
     
  3.  
    import java.util.concurrent.ExecutorService;
  4.  
    import java.util.concurrent.Executors;
  5.  
     
  6.  
    /**
  7.  
    * 描述:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。
  8.  
    * 线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
  9.  
    *
  10.  
    * @author yanpenglei
  11.  
    * @create 2017-10-12 11:30
  12.  
    **/
  13.  
    public class TestNewFixedThreadPool {
  14.  
     
  15.  
    public static void main(String[] args) {
  16.  
     
  17.  
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
  18.  
     
  19.  
    for (int i = 1; i <= 10; i++) {
  20.  
    final int index = i;
  21.  
    fixedThreadPool.execute(new Runnable() {
  22.  
     
  23.  
    @Override
  24.  
    public void run() {
  25.  
    try {
  26.  
    String threadName = Thread.currentThread().getName();
  27.  
    System.out.println("执行:" + index + ",线程名称:" + threadName);
  28.  
    Thread.sleep(2000);
  29.  
    } catch (InterruptedException e) {
  30.  
     
  31.  
    e.printStackTrace();
  32.  
    }
  33.  
    }
  34.  
    });
  35.  
    }
  36.  
     
  37.  
    }
  38.  
    }

因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字,和线程名称。 
响应: 
执行:2,线程名称:pool-1-thread-2 
执行:3,线程名称:pool-1-thread-3 
执行:1,线程名称:pool-1-thread-1

执行:4,线程名称:pool-1-thread-1 
执行:6,线程名称:pool-1-thread-2 
执行:5,线程名称:pool-1-thread-3

执行:7,线程名称:pool-1-thread-1 
执行:9,线程名称:pool-1-thread-3 
执行:8,线程名称:pool-1-thread-2

执行:10,线程名称:pool-1-thread-1

3,newScheduledThreadPool 
创建一个定长线程池,支持定时及周期性任务执行。延迟执行

  1.  
    package io.ymq.thread.demo3;
  2.  
     
  3.  
    import java.util.concurrent.Executors;
  4.  
    import java.util.concurrent.ScheduledExecutorService;
  5.  
    import java.util.concurrent.TimeUnit;
  6.  
     
  7.  
    /**
  8.  
    * 描述:创建一个定长线程池,支持定时及周期性任务执行。延迟执行
  9.  
    *
  10.  
    * @author yanpenglei
  11.  
    * @create 2017-10-12 11:53
  12.  
    **/
  13.  
    public class TestNewScheduledThreadPool {
  14.  
     
  15.  
    public static void main(String[] args) {
  16.  
     
  17.  
    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
  18.  
     
  19.  
    scheduledThreadPool.schedule(new Runnable() {
  20.  
     
  21.  
    @Override
  22.  
    public void run() {
  23.  
    System.out.println("表示延迟3秒执行。");
  24.  
    }
  25.  
    }, 3, TimeUnit.SECONDS);
  26.  
     
  27.  
     
  28.  
    scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
  29.  
     
  30.  
    @Override
  31.  
    public void run() {
  32.  
    System.out.println("表示延迟1秒后每3秒执行一次。");
  33.  
    }
  34.  
    }, 1, 3, TimeUnit.SECONDS);
  35.  
    }
  36.  
     
  37.  
    }

表示延迟1秒后每3秒执行一次。 
表示延迟3秒执行。 
表示延迟1秒后每3秒执行一次。 
表示延迟1秒后每3秒执行一次。 
表示延迟1秒后每3秒执行一次。 
表示延迟1秒后每3秒执行一次。 
4,newSingleThreadExecutor 
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

  1.  
    package io.ymq.thread.demo4;
  2.  
     
  3.  
    import java.util.concurrent.ExecutorService;
  4.  
    import java.util.concurrent.Executors;
  5.  
     
  6.  
    /**
  7.  
    * 描述:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
  8.  
    *
  9.  
    * @author yanpenglei
  10.  
    * @create 2017-10-12 12:05
  11.  
    **/
  12.  
    public class TestNewSingleThreadExecutor {
  13.  
     
  14.  
    public static void main(String[] args) {
  15.  
    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
  16.  
    for (int i = 1; i <= 10; i++) {
  17.  
    final int index = i;
  18.  
    singleThreadExecutor.execute(new Runnable() {
  19.  
     
  20.  
    @Override
  21.  
    public void run() {
  22.  
    try {
  23.  
    String threadName = Thread.currentThread().getName();
  24.  
    System.out.println("执行:" + index + ",线程名称:" + threadName);
  25.  
    Thread.sleep(2000);
  26.  
    } catch (InterruptedException e) {
  27.  
    e.printStackTrace();
  28.  
    }
  29.  
    }
  30.  
    });
  31.  
    }
  32.  
    }
  33.  
    }

结果依次输出,相当于顺序执行各个任务。 
响应: 
执行:1,线程名称:pool-1-thread-1 
执行:2,线程名称:pool-1-thread-1 
执行:3,线程名称:pool-1-thread-1 
执行:4,线程名称:pool-1-thread-1 
执行:5,线程名称:pool-1-thread-1 
执行:6,线程名称:pool-1-thread-1 
执行:7,线程名称:pool-1-thread-1 
执行:8,线程名称:pool-1-thread-1 
执行:9,线程名称:pool-1-thread-1 
执行:10,线程名称:pool-1-thread-1

 

Future和FutureTask的使用

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

但是Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了FutureTask。

 1.使用Callable+Future获取执行结果

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

public class Test {

    public static void main(String[] args) {

        ExecutorService executor = Executors.newCachedThreadPool();

        Task task = new Task();

        Future<Integer> result = executor.submit(task);

        executor.shutdown();

         

        try {

            Thread.sleep(1000);

        catch (InterruptedException e1) {

            e1.printStackTrace();

        }

         

        System.out.println("主线程在执行任务");

         

        try {

            System.out.println("task运行结果"+result.get());

        catch (InterruptedException e) {

            e.printStackTrace();

        catch (ExecutionException e) {

            e.printStackTrace();

        }

         

        System.out.println("所有任务执行完毕");

    }

}

class Task implements Callable<Integer>{

    @Override

    public Integer call() throws Exception {

        System.out.println("子线程在进行计算");

        Thread.sleep(3000);

        int sum = 0;

        for(int i=0;i<100;i++)

            sum += i;

        return sum;

    }

}

   执行结果:

  1.  
    子线程在进行计算
  2.  
    主线程在执行任务
  3.  
    task运行结果4950
  4.  
    所有任务执行完毕

  2.使用Callable+FutureTask获取执行结果

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

public class Test {

    public static void main(String[] args) {

        //第一种方式

        ExecutorService executor = Executors.newCachedThreadPool();

        Task task = new Task();

        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);

        executor.submit(futureTask);

        executor.shutdown();

         

        //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread

        /*Task task = new Task();

        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);

        Thread thread = new Thread(futureTask);

        thread.start();*/

         

        try {

            Thread.sleep(1000);

        catch (InterruptedException e1) {

            e1.printStackTrace();

        }

         

        System.out.println("主线程在执行任务");

         

        try {

            System.out.println("task运行结果"+futureTask.get());

        catch (InterruptedException e) {

            e.printStackTrace();

        catch (ExecutionException e) {

            e.printStackTrace();

        }

         

        System.out.println("所有任务执行完毕");

    }

}

class Task implements Callable<Integer>{

    @Override

    public Integer call() throws Exception {

        System.out.println("子线程在进行计算");

        Thread.sleep(3000);

        int sum = 0;

        for(int i=0;i<100;i++)

            sum += i;

        return sum;

    }

}

原文地址:https://www.cnblogs.com/kelelipeng/p/14924375.html