Java线程池的理解与应用

什么是并发?

  当有多个线程在运行时,如果系统只有一个CPU,那么它根本不可能真正同时进行一个以上的线程,它只能把CPU运行时间划分成若干个时间段,在将时间段分配给各个线程执行,在一个时间段的线程代码运行时,其他线程处于挂起状态,这种方式我们称之为并发(Concurrent)。

使用线程池的目的?

  1.线程是稀缺资源,不能频繁的创建

  2.解耦作用,线程的创建与执行完全分开,方便维护

  3.一个任务结束后,断开与线程池的连接,可以给其他任务复用。

线程池的原理?

  核心的思想就是把宝贵的资源放到一个池子中去,每次使用都从池子中获取,用完之后又放回池子供他人使用。

使用线程池的优点?

  如果每次使用new Thread()来创建线程,不仅耗费性能,而且创建的线程缺乏管理,被称为野线程,而且还可以无限制的创建,导致相互竞争,过多占用系统资源导致系统崩溃。如果采用线程池的方法,不仅可以减少对象的创建,节省性能,而且还可以重用存在的线程,还可以有效控制最大并发线程数,提高系统资源使用率,同时也避免了过多资源竞争,避免堵塞。

Eexecutor框架的理解?

  Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于消费者,并用Runnable来表示任务,Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。

使用JDK框架配置线程的几种方法?

 1 /**
 2  * 创建任务对象,实现Runnable接口
 3  * @author Administrator
 4  *
 5  */
 6 public class TaskDemo implements Runnable {
 7 
 8     @Override
 9     public void run() {
10         try {
11             System.out.println(Thread.currentThread().getName()+" is running!");
12             TimeUnit.SECONDS.sleep(2);
13         } catch (InterruptedException e) {
14             e.printStackTrace();
15         }
16     }
17 
18 }

  1.创建固定大小的线程池

 1 /**
 2  * 固定大小的线程池
 3  * @author Administrator
 4  *
 5  */
 6 public class FixPollDemo {
 7 
 8     public static void main(String[] args) {
 9         //创建固定大小的线程池
10         ExecutorService pool = Executors.newFixedThreadPool(5);
11         //创建10个线程任务给线程池
12         for (int i = 0; i < 10; i++) {
13             //创建线程任务
14             Runnable task = new TaskDemo();
15             //把任务交给线程池去执行
16             pool.execute(task);
17         }
18         //关闭线程池
19         pool.shutdown();
20     }
21 
22 }

 newFixedThreadPool的源码:

 1 public static ExecutorService newFixedThreadPool(int nThreads) {
 2         return new ThreadPoolExecutor(nThreads, nThreads,
 3                                       0L, TimeUnit.MILLISECONDS,
 4                                       new LinkedBlockingQueue());
 5     }
 6 
 7 public ThreadPoolExecutor(int corePoolSize,
 8                               int maximumPoolSize,
 9                               long keepAliveTime,
10                               TimeUnit unit,
11                               BlockingQueue workQueue) {
12         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
13              Executors.defaultThreadFactory(), defaultHandler);
14     }
15 
16 public ThreadPoolExecutor(int corePoolSize,
17                               int maximumPoolSize,
18                               long keepAliveTime,
19                               TimeUnit unit,
20                               BlockingQueue workQueue,
21                               ThreadFactory threadFactory,
22                               RejectedExecutionHandler handler) {
23         if (corePoolSize < 0 ||
24             maximumPoolSize <= 0 ||
25             maximumPoolSize < corePoolSize ||
26             keepAliveTime < 0)
27             throw new IllegalArgumentException();
28         if (workQueue == null || threadFactory == null || handler == null)
29             throw new NullPointerException();
30         this.corePoolSize = corePoolSize;
31         this.maximumPoolSize = maximumPoolSize;
32         this.workQueue = workQueue;
33         this.keepAliveTime = unit.toNanos(keepAliveTime);
34         this.threadFactory = threadFactory;
35         this.handler = handler;
36     }
 在任意时刻,newFixedThreadPool构造出来的线程池中最多只可能存活着nThreads个线程,如果所有的线程都在运行任务,那么这个时候提交的任务将会被添
加到任务队列中去等待执行。

  我们可以控制corePoolSize和maximumPoolSize来使得通过ThreadPoolExecutor构造出来的线程池具有一些不一样的特性,但是需要注意的是,当我们设置的maximumPoolSize大于corePoolSize的时候,如果当前线程池里面的线程数量已经达到了corePoolSize了,并且当前所以线程都处于运行任务的状态,那么在这个时候提交的任务会被添加到任务队列中去,只有在任务队列满了的时候,才会去创建新的线程,如果线程数量已经达到了maximumPoolSize了,那么到此就会拒绝提交的任务,这些流程可以参考上面展示出来的execute方法的实现。该类型的线程池使用的任务队列是LinkedBlockingQueue类型的阻塞队列。

 2.创建可变大小的线程池

 1 /**
 2  * 创建可变大小的线程池
 3  * @author Administrator
 4  *
 5  */
 6 public class CachePollDemo {
 7 
 8     public static void main(String[] args) {
 9         //创建可变大小的线程池
10         ExecutorService cachePool = Executors.newCachedThreadPool();
11         //创建10个线程任务给线程池
12         for (int i = 0; i < 10; i++) {
13             //创建线程任务
14             Runnable task = new TaskDemo();
15             //把任务交给线程池去执行
16             cachePool.execute(task);
17         }
18         //关闭线程池
19         cachePool.shutdown();
20     }
21 }

  newCachedThreadPool的源码:

 1 public static ExecutorService newCachedThreadPool() {
 2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
 3                                       60L, TimeUnit.SECONDS,
 4                                       new SynchronousQueue());
 5     }
 6 
 7 public ThreadPoolExecutor(int corePoolSize,
 8                               int maximumPoolSize,
 9                               long keepAliveTime,
10                               TimeUnit unit,
11                               BlockingQueue workQueue) {
12         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
13              Executors.defaultThreadFactory(), defaultHandler);
14     }
15 
16 public ThreadPoolExecutor(int corePoolSize,
17                               int maximumPoolSize,
18                               long keepAliveTime,
19                               TimeUnit unit,
20                               BlockingQueue workQueue,
21                               ThreadFactory threadFactory,
22                               RejectedExecutionHandler handler) {
23         if (corePoolSize < 0 ||
24             maximumPoolSize <= 0 ||
25             maximumPoolSize < corePoolSize ||
26             keepAliveTime < 0)
27             throw new IllegalArgumentException();
28         if (workQueue == null || threadFactory == null || handler == null)
29             throw new NullPointerException();
30         this.corePoolSize = corePoolSize;
31         this.maximumPoolSize = maximumPoolSize;
32         this.workQueue = workQueue;
33         this.keepAliveTime = unit.toNanos(keepAliveTime);
34         this.threadFactory = threadFactory;
35         this.handler = handler;
36     }

  newCachedThreadPool适合于类似秒杀系统中,它可以按需创建线程。每个线程在空闲了一段时间之后会被回收,然后需要创建的时候再创建出来,在使用的时候应该使用合适的构造参数。

  该类型使用的任务队列是SynchronousQueue这种同步队列,这是一种特别的队列,每个线程都是有使命的,每个线程都会等待另外一个线程和自己交易,在交易完成之前都会阻塞住线程,他们之间有一种传递关系,数据是从一个线程直接传递到另外一个线程中去的,SynchronousQueue这种队列不存储实际的数据,而是存储着一些线程的信息,而SynchronousQueue管理着这些线程之间的交易,更为详细的细节参考后面的文章。

  上面提到,ScheduleThreadPoolExecutor是继承自ThreadPoolExecutor的,而且从类图中也可以看出来这种关系,所以其实ScheduleThreadPoolExecutor是对ThreadPoolExecutor的增强,它增加了schedule功能,使用与那些需要周期性调度执行,或者是延时执行的任务,在ScheduleThreadPoolExecutor中使用了一种阻塞队列称为延时阻塞队列,这种队列有能力持有一段时间数据,我们可以设定这种时间,时间没到的时候尝试获取数据的线程会被阻塞,直到设定的时间到了,线程才会被唤醒来消费数据。

  3.创建单线程线程池

 1 /**
 2  * 单线程线程池
 3  * @author Administrator
 4  *
 5  */
 6 public class SinglePollDemo {
 7 
 8     public static void main(String[] args) {
 9         //创建线程池
10         ExecutorService singlePool = Executors.newSingleThreadExecutor();
11         //创建10个线程任务给线程池
12         for (int i = 0; i < 10; i++) {
13             //创建线程任务
14             Runnable task = new TaskDemo();
15             //把任务交给线程池去执行
16             singlePool.execute(task);
17         }
18         //关闭线程池
19         singlePool.shutdown();
20     }
21 }

  4.创建可调度线程池

 1 /**
 2  * 可调度的线程池
 3  * @author Administrator
 4  *
 5  */
 6 public class SchedulePollDemo {
 7 
 8     public static void main(String[] args) {
 9         //创建可调度的线程池
10         ExecutorService schedulePool = Executors.newScheduledThreadPool(5);
11         //创建10个线程任务给线程池
12         for (int i = 0; i < 10; i++) {
13             //创建线程任务
14             Runnable task = new TaskDemo();
15             //把任务交给线程池去执行
16             schedulePool.execute(task);
17         }
18         //关闭线程池
19         schedulePool.shutdown();
20     }
21 
22 }

线程池框架Executors的核心实现方法:

  查看代码会发现,其实看这三种方式创建的源码就会发现,以上三种都是利用利用 ThreadPoolExecutor 类实现的。

 1 /**
 2      *
 3      * @param corePoolSize 为线程池的基本大小。
 4      * @param maximumPoolSize 为线程池最大线程大小。
 5      * @param keepAliveTime 和 unit 则是线程空闲后的存活时间。
 6      * @param workQueue 用于存放任务的阻塞队列。
 7      * handler 当队列和最大线程池都满了之后的饱和策略。
 8      */
 9     public ThreadPoolExecutor(int corePoolSize,
10                               int maximumPoolSize,
11                               long keepAliveTime,
12                               TimeUnit unit,
13                               BlockingQueue<Runnable> workQueue) {
14         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
15              Executors.defaultThreadFactory(), defaultHandler);
16     }

  我们使用threadPool.execute(new Job())来提交一个任务到线程池,所有核心逻辑在execute()方法中。

 1 public void execute(Runnable command) {
 2         if (command == null)
 3             throw new NullPointerException();
 4         // 获取当前线程池的状态
 5         int c = ctl.get();
 6         // 当前线程数量小于 coreSize 时创建一个新的线程运行
 7         if (workerCountOf(c) < corePoolSize) {
 8             if (addWorker(command, true))
 9                 return;
10             c = ctl.get();
11         }
12         // 如果当前线程处于运行状态,并且写入阻塞队列成功
13         if (isRunning(c) && workQueue.offer(command)) {
14             int recheck = ctl.get();
15             // 双重检查,再次获取线程状态;如果线程状态变了(非运行状态)就需要从阻塞队列移除任务,
16             // 并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
17             if (!isRunning(recheck) && remove(command)) 
18                 reject(command);
19             else if (workerCountOf(recheck) == 0) // 如果当前线程池为空就新创建一个线程并执行。
20                 addWorker(null, false);
21         } else if (!addWorker(command, false)) // 如果在第三步的判断为非运行状态,尝试新建线程,如果失败则执行拒绝策略
22             reject(command);
23     }

  线程池中所定义的状态。

1     private static final int RUNNING    = -1 << COUNT_BITS;
2     private static final int SHUTDOWN   =  0 << COUNT_BITS;
3     private static final int STOP       =  1 << COUNT_BITS;
4     private static final int TIDYING    =  2 << COUNT_BITS;
5     private static final int TERMINATED =  3 << COUNT_BITS;

   状态说明:

    RUNNING:

      (1):状态说明:运行状态,指可以接受任务执行队列里的任务,线程池的初始化状态就是RUNNING,也就是说,线程池一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0;

      (2):状态切换:线程池的初始化状态

   SHUTDOWN:

      (1):状态说明:线程池处于SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。

      (2):状态切换:调用线程池的shutdown()方法时,线程池状态由RUNNING-->SHUTDOWN。

   STOP:

      (1):状态说明:线程池处于STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。

       (2):状态切换:调用线程池的shutdownNow()方法时,线程池状态由(RUNNING 或 SHUTDOWN)-->STOP。

    TIDYING:

      (1):状态说明:当所有的任务已经终止,任务数量为0时,线程池状态会变为TIDYING状态,当线程池状态变为TIDYING时,会执行钩子函数terminated()。terminated()函数在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING状态时进行相应的处理,可以通过重载terminated()方法来实现。

       (2):状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由SHUTDOWN-->TIDYING,当线程池在STOP状态下,线程池中的执行任务为空时,就会由STOP-->TIDYING。

  TERMINATED:

      终止状态,当执行terminated()方法时就会更新为这个状态。

      (1):状态说明:线程池彻底终止,就会变成TERMINATED状态。

      (2):状态切换:线程池处于TIDYING状态时,执行terminated()之后,状态由TIDYING-->TERMINATED。

如何配置线程?

  通常我们是需要根据这批任务执行的性质来决定:

    1.IO密集型任务:由于线程并不是一直都在运行,所以可以尽可能的多配置线程,比如CPU个数*2;

    2.CPU密集型任务:(大量复杂的运算)应当分配较少的线程,比如CPU个数相当的大小。

关闭线程池的方式?

  其实无非就是两个方法shutdown()和shutdownNow();

    shutdown()执行后,停止接收新任务,会把队列的任务执行完毕。

    shutdownNow()也是停止接收新任务,但会中断所有的任务,将线程状态变为STOP。

使用线程池的好处?

  池化技术是一种非常有用的技术,对于线程来说,创建一个线程的代价是很高的,如果我们在创建了一个线程,并且让这个线程做一个任务之后就回收的话,那么下次要使用线程来执行我们的任务的时候又需要创建一个新的线程,是否可以在创建完成一个线程之后一直缓冲,直到系统关闭的时候再进行回收呢?

  线程池就是这样的组件,使用线程池,就没必要频繁创建线程,线程池会为我们管理线程,当我们需要一个新的线程来执行我们的任务的时候,就向线程池申请,而线程池会从池子里面找到一个空闲的线程返回给请求者,如果池子里面没有可用的线程,那么线程池会根据一些参数指标来创建一个新的线程,或者将我们的任务提交到任务队列中去,等待一个空闲的线程来执行这个任务。

  线程池里面有很多线程,这些线程会一直到系统关闭才会被回收,否则一直会处于处理任务或者等待处理任务的状态。

  首先,如何使用线程池呢?下面的代码展示了如何使用java线程池的例子:

 1 public class ExecutorsDemo {
 2     public static void main(String[] args) {
 3         int cpuCoreCount = Runtime.getRuntime().availableProcessors();
 4         System.out.println("cpuCoreCount:"+cpuCoreCount);
 5         AThreadFactory aThreadFactory = new AThreadFactory();
 6         ARunnable1 aRunnable = new ARunnable1();
 7         //固定大小线程池
 8         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(cpuCoreCount,aThreadFactory);
 9         //无界线程池,可以进行自动线程回收
10         ExecutorService cacheThreadPool = Executors.newCachedThreadPool(aThreadFactory);
11         ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(cpuCoreCount,aThreadFactory);
12         //单个后台线程
13         ScheduledExecutorService singleThreadExecutor = Executors.newSingleThreadScheduledExecutor(aThreadFactory);
14         System.out.println("fixedThreadPool:"+fixedThreadPool.toString());
15         fixedThreadPool.submit(aRunnable);
16         System.out.println("fixedThreadPool:"+fixedThreadPool.toString());
17         cacheThreadPool.submit(aRunnable);
18         newScheduledThreadPool.scheduleAtFixedRate(aRunnable,0,1,TimeUnit.SECONDS);
19         singleThreadExecutor.scheduleWithFixedDelay(aRunnable,0,100,TimeUnit.MILLISECONDS);
20         System.out.println("开始等待...");
21         try {
22             TimeUnit.SECONDS.sleep(5);
23         } catch (InterruptedException e) {
24             e.printStackTrace();
25         }
26         System.out.println("等待结束...");
27         fixedThreadPool.shutdownNow();
28         cacheThreadPool.shutdownNow();
29         newScheduledThreadPool.shutdownNow();
30         singleThreadExecutor.shutdownNow();
31 
32     }
33 }
34 class ARunnable1 implements Runnable{
35 
36     @Override
37     public void run() {
38         System.out.println("进入run方法...");
39         try {
40             Thread.sleep(1000);
41         } catch (InterruptedException e) {
42             e.printStackTrace();
43         }
44         System.out.println("Current Thread Name:"+Thread.currentThread().getName());
45     }
46 }
47 //创建线程工厂
48 class AThreadFactory implements ThreadFactory{
49 
50     private final AtomicInteger threadNumber = new AtomicInteger(1);
51     @Override
52     public Thread newThread(Runnable r) {
53         return new Thread("aThread-"+threadNumber.incrementAndGet());
54     }
55 }

原文地址:https://www.cnblogs.com/wk-missQ1/p/12384443.html