线程池

  关于线程池的源码分析,在这里也没认真说明,后面单独起一片文章进行研究。

1.大纲

  线程池介绍

  创建与停止线程池

  常见的线程池特点与用法

  任务太多,怎么拒绝

  钩子方法

  实现原理,源码分析

  使用线程池的主要点

一:介绍

1.重要性

  使用中重要

  面试中重要

2.池

  线程可以复用

  可以控制资源的总量

3.不使用线程池些的程序

  这里有两个程序,只粘贴进行循环对每个任务进行创建线程,并执行

package com.jun.juc.threadpool;

/**
 * for循环执行每一个任务的线程
 * 可以正常的执行,但是有些问题
 * 开销大,反复的操作系统进行创建与销毁
 */
public class ForLoop {
    public static void main(String[] args) {
        for (int i = 0; i< 10000; i++){
            Thread thread = new Thread(new Task());
            thread.start();
        }
    }

    static class Task implements Runnable{
        @Override
        public void run() {
            System.out.println("执行了任务");
        }
    }
}

  

4.为什么使用线程池

  反复的创建,开销大

    让一部分的线程保持工作,反复的执行任务

  过多的线程会占用太多的内存

    使用少量的线程

  

5.线程池的好处

  加快响应速度

  更好的利用CPU,与内存。选择合适的线程数

  统一管理

6.使用场景

  服务器接收大量的请求

  多个线程的创建

二:创建与停止线程池

1.线程池的构造函数的参数

  corePoolSize:核心线程数,int

  maxPoolSize:最大的线程数,int

  keepAliveTime:存活时间,long

  workQueue:任务存储队列,BlockingQueue

  threadFactory:工厂类,ThreadFactory

  Handler:拒绝策略,RejectedExecutionHandler

2.corePoolSize

  线程池进行初始化的时候,线程池里没有任何的线程,线程池会等待有任务到来的时候,再进行创建新线程执行任务

  

3.macPoolSize

  线程池有可能子啊核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一定的上限,这个就是最大量

  如果超过了corePoolSize的时候,先将任务放到队列中。

  队列中满了,才会去看

4.添加线程的规则

  如果线程小于corePoolSize的时候,即使线程有处于空闲状态,也会继续创建新的线程运行新的任务

  如果等于大于corePoolSize,但是小于maxPoolSize,放入队列

  如果队列已满,并且线程小于maxPoolSize,创建新的线程

  

5.keepAliveTime

  主要看是控制的是谁。

  如果线程池的当前的线程数多于了corePoolSize,那么多于的线程空闲时间超过keepAliveTime,将会被终止

  减少资源消耗

6.ThreadFactory

  新的线程默认使用Exectors.defaultThreadFactory(),创建的线程都在一个线程组,拥有相同的NORM_PRIORITY优先级,并且都不是守护线程

7.workQueue

  工作队列

  最常见的队列类型:

    直接交换:SynchronousQueue,内部没有容量

    无界队列:LinkedBlockingQueue

    有界队列:ArrayBlockingQueue

三:常见的线程池

1.FixedThreadPool创建

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

  可以看见,corePoolSize与maxPoolSize是相等的

  然后使用的是无界队列。

  由于传递进去的任务是没有容量上限的,可能占用大量的内存,出现OOM

2.演示溢出

package com.jun.juc.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * FixedThreadPool的使用场景
 */
public class FixedThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i=0; i<Integer.MAX_VALUE;i++){
            executorService.execute(new Task());
        }
    }

    static class Task implements Runnable{
        @Override
        public void run() {
            try {
                // 让这个任务执行的很慢,表示队列中会一直增加
                Thread.sleep(500000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行了任务");
        }
    }
}

  效果:

D:jdk1.8.0_144injava -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:63206,suspend=y,server=n -Xmx8m -Xms8m -Dfile.encoding=UTF-8 -classpath "D:jdk1.8.0_144jrelibcharsets.jar;D:jdk1.8.0_144j
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
	at com.jun.juc.threadpool.FixedThreadPoolTest.main(FixedThreadPoolTest.java:13)

  

3.SingleThreadExector的使用

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

  说明:

  相比于FixedThreadPool,只是corePoolSize与macPoolSize都是1,其他不变

4.也会出现OOM

package com.jun.juc.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExectos {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i=0; i<Integer.MAX_VALUE;i++){
            executorService.execute(new FixedThreadPoolTest.Task());
        }
    }

    static class Task implements Runnable{
        @Override
        public void run() {
            try {
                // 让这个任务执行的很慢,表示队列中会一直增加
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行了任务");
        }
    }
}

  

5.CachedThreadPool

  可缓存线程池,无界的线程池,可以自动回收多于线程的功能

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

  其中,最大的线程数没有限制,也是一个大的弊端。如果任务量过大,一样会出现的是OOM

6.测试

package com.jun.juc.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i=0; i< 1000; i++){
            executorService.execute(new Task());
        }
    }

    static class Task implements Runnable{
        @Override
        public void run() {
            try {
                // 让这个任务执行的很慢,表示队列中会一直增加
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread()+"执行了任务");
        }
    }


}

  效果:

Thread[pool-1-thread-495,5,main]执行了任务
Thread[pool-1-thread-491,5,main]执行了任务
Thread[pool-1-thread-511,5,main]执行了任务
Thread[pool-1-thread-508,5,main]执行了任务
Thread[pool-1-thread-507,5,main]执行了任务
Thread[pool-1-thread-499,5,main]执行了任务
Thread[pool-1-thread-519,5,main]执行了任务
Thread[pool-1-thread-522,5,main]执行了任务
Thread[pool-1-thread-518,5,main]执行了任务
Thread[pool-1-thread-510,5,main]执行了任务

  创建了很多的线程

7.ScheduledThreadPool

  支持定时与周期性的执行的线程池

  核心线程是传递过去的,但是最大的核心线程数是INTEGER.MAX_VALUE

8.延迟一定时间之后运行

  定时

package com.jun.juc.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheledThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        scheduledExecutorService.schedule(new Task(),6, TimeUnit.SECONDS);
        
    }

    static class Task implements Runnable{
        @Override
        public void run() {
            try {
                // 让这个任务执行的很慢,表示队列中会一直增加
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread()+"执行了任务");
        }
    }


}

  

9.周期性的运行

  周期

package com.jun.juc.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheledThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        scheduledExecutorService.schedule(new Task(),6, TimeUnit.SECONDS);
        // 周期的运行
        scheduledExecutorService.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS);
    }

    static class Task implements Runnable{
        @Override
        public void run() {
            try {
                // 让这个任务执行的很慢,表示队列中会一直增加
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread()+"执行了任务");
        }
    }


}

  

10.总结

  

   主要注意的是CacheDThreadPool与ScheduleThreadPool的对比。

  CachedThreadPool为啥使用SynchronousQueue,因为有任务不需要进行存储,直接交给线程执行就行了。

  ScheduledThreadPool使用的是延迟队列DelayedWorkQueue

 

四:线程池中的线程数量设定

1.计算密集型的

  为cpu核心数的1~2倍

2.耗时IO型的

  最佳线程数一般大于cpu很多倍。

  以jvm线程监控显示繁忙情况为依据,参考brain goetz推荐的计算方法

3.计算方法

  cpu核心数 * (1+平均等待时间/平均工作时间)

五:停止线程池

1.shutdown

  要线程中,会队列中的线程任务都执行完成后,再进行停止

  对拒绝新的任务

2.测试

package com.jun.juc.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * shutdown关闭
 */
public class ShutDown {
    public static void main(String[] args) throws Exception{
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i=0; i< 1000; i++){
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(2000);
        executorService.shutdown();
        executorService.execute(new ShutDownTask());
    }

    static class ShutDownTask implements Runnable {
        @Override
        public void run(){
            try {
                Thread.sleep(50);
                System.out.println(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

  效果:

  可以发现,在执行一段时间后,就可以发现,真的不再进行接收任务了

pool-1-thread-1
pool-1-thread-1
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.jun.juc.threadpool.ShutDown$ShutDownTask@ed17bee rejected from java.util.concurrent.ThreadPoolExecutor@2a33fae0[Shutting down, pool size = 1, active threads = 1, queued tasks = 960, completed tasks = 39]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
	at com.jun.juc.threadpool.ShutDown.main(ShutDown.java:17)
pool-1-thread-1
pool-1-thread-1

  

3.isShutdown

  可以知道线程被停止过了

package com.jun.juc.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * shutdown关闭
 */
public class ShutDown {
    public static void main(String[] args) throws Exception{
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i=0; i< 100; i++){
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(2000);
        System.out.println(executorService.isShutdown());
        executorService.shutdown();
        System.out.println(executorService.isShutdown());
        executorService.execute(new ShutDownTask());
    }

    static class ShutDownTask implements Runnable {
        @Override
        public void run(){
            try {
                Thread.sleep(50);
                System.out.println(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

  效果:

   先false,然后true

4.isTerminated

  返回是否真正的结束

package com.jun.juc.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * shutdown关闭
 */
public class ShutDown {
    public static void main(String[] args) throws Exception{
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i=0; i< 10; i++){
            executorService.execute(new ShutDownTask());
        }
        System.out.println(executorService.isShutdown());
        executorService.shutdown();
        System.out.println(executorService.isShutdown());
        Thread.sleep(2000);
        System.out.println(executorService.isTerminated());
    }

    static class ShutDownTask implements Runnable {
        @Override
        public void run(){
            try {
                Thread.sleep(50);
                System.out.println(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

    效果:

D:jdk1.8.0_144injava -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:62572,suspend=y,server=n -Dfile.encoding=UTF-8 -classpath "D:jdk1.8.0_144jrelibcharsets.jar;D:jdk1.8.0_144jrelibdeploy.jarConnected to the target VM, addre
true
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
Disconnected from the target VM, address: '127.0.0.1:62572', transport: 'socket'
true

Process finished with exit code 0

  

 5.awaitTermination

  所有的任务都执行完毕,等待的时间到了,等待过程中被打断都会返回,否则阻塞。

  说明:8秒内,如果关闭了线程,并且都执行完成返回true,否则是false

package com.jun.juc.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * shutdown关闭
 */
public class ShutDown {
    public static void main(String[] args) throws Exception{
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i=0; i< 10; i++){
            executorService.execute(new ShutDownTask());
        }
        executorService.shutdown();
        boolean b = executorService.awaitTermination(8L, TimeUnit.SECONDS);
        System.out.println("b="+b);
    }

    static class ShutDownTask implements Runnable {
        @Override
        public void run(){
            try {
                Thread.sleep(50);
                System.out.println(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

  

 6.shutdownNow

  立刻关闭线程

  存在返回未执行的任务。

package com.jun.juc.threadpool;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* shutdown关闭
*/
public class ShutDown {
public static void main(String[] args) throws Exception{
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i=0; i< 10; i++){
executorService.execute(new ShutDownTask());
}
// shutdownNow
List<Runnable> runnables = executorService.shutdownNow();
ExecutorService executorService2 = Executors.newSingleThreadExecutor();
runnables.forEach(item -> executorService2.execute(item));
}

static class ShutDownTask implements Runnable {
@Override
public void run(){
try {
Thread.sleep(50);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println("线程终端了");
}
}
}
}

  

六:拒绝策略

 1.拒绝时机

  Executor关闭时

  最大线程和队列已满

2.拒绝策略

  AbsortPolicy:直接抛出异常

  DiscradPolicy:默默丢弃

  DiscardOldestPolicy:丢弃最老的任务

  CallerRunsPolicy:谁提交的任务,则有谁进行运行,这样可以降低提交速度

七:钩子方法

1.说明

  在任务的前后

  日志,统计

2.暂停线程池

package com.jun.juc.threadpool;

import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class PauseableThreadPool extends ThreadPoolExecutor {
    /**
     * 并发加锁
     */
    private final ReentrantLock lock = new ReentrantLock();

    private Condition unpaused = lock.newCondition();

    /**
     * 是否暂停
     */
    private boolean isPaused;

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * 执行之前,暂停
     *
     * @param t
     * @param r
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    /**
     * 暂停
     */
    private void pause() {
        lock.lock();
        try {
            isPaused = true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * 恢复
     */
    public void resume(){
        lock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
        Runnable task = new Runnable(){
            @Override
            public void run() {
                System.out.println("开始执行了");
                try {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        };
        for(int i=0; i<1000; i++){
            pauseableThreadPool.execute(task);
        }
        Thread.sleep(1000);
        pauseableThreadPool.pause();
        System.out.println("线程池被暂停了");
        Thread.sleep(1000);
        pauseableThreadPool.resume();
        System.out.println("线程池被再次执行了");
    }

}

  效果:

开始执行了
开始执行了
线程池被暂停了
线程池被再次执行了
开始执行了
开始执行了

八:源码

1.组成部分

  线程池管理器

  工作线程

  任务队列

  任务接口

2.Exector家族

  

3.Exector

  顶层接口,只有一个方法

 * @since 1.5
 * @author Doug Lea
 */
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

  

4.ExecuorService

  继承了Excetor,然后增加了几个新的方法

  初步的有了管理线程池的方法

  

5.Excetors  

  这是一个工具类

  进入可以发现是使用ThreadPoolExector进行创建的线程

6.线程池实现任务复用的原理

  execute:

  

  添加到worker

  

  

  進行运行:

  

九:线程池状态

1.线程池状态

  Running:接收新任务并处理排队任务

  SHUTDOWN:不接受新任务,但是处理排队任务

  stop:不接受新任务,也不处理排队任务,并中断正在进行的任务

  tidying:所有的任务都已经终结,workerCount为零时,线程就会转为这个状态,并且运行terminate()方法

  TERMIMATED:运行完成

2.状态值

   

  

  

  

    

原文地址:https://www.cnblogs.com/juncaoit/p/12866361.html