线程池

通过Executor创建线程池

Executors.newFixedTreadPool 

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

内部通过new ThreadPoolExecutor创建线程池

返回一个固定数量的线程池。如果线程池中有空闲线程则直接交给空闲线程执行。如果没有将任务放到队列 默认使用的是LinkedBlockingQueue 无界队列,如果大量任务线程池线程来不及处理,产生无限堆积可能会有OOM风险

Executors.newSingleThreadExecutor 

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

返回一个线程的线程池,如有空闲则执行,没有则将任务放到队列中等待 LinkedBlockingQueue 无界队列,如果大量任务线程池线程来不及处理,产生无限堆积可能会有OOM风险,

通过DelegatedExecutorService包装返回 重写了finalize 如果我们没有手动shtdown在GC回收的时候会调用此方法完成回收 

同时通过FinalizableDelegatedExecutorService包装 只能暴露ExecutorService相关方法

    static class FinalizableDelegatedExecutorService
            extends Executors.DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        //重写了finalize 如果我们没有手动shtdown在GC回收的时候会调用此方法完成回收 
        protected void finalize() {
            super.shutdown();
        }
    }
static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List<Runnable> shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future<?> submit(Runnable task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Callable<T> task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }

Executor.newCachedTreadPool 

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

返回一个根据实际情况调整线程个数的线程池塘 不限制最大线程数,如果有空闲线程则直接交给空线程执行 没有则创建,线程空闲超过60秒则指定回收

Exucutors.newScheduledThreadPool 

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }


  public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService

可以发现还是通过ThreaPoolExecutor实现 队列使用DeayedWorkQueue

返回SchededExecutoryService对象

可以实现定时任务

 public static void main(String[] args) throws InterruptedException {
       ScheduledExecutorService scheduledExecutorService= Executors.newScheduledThreadPool(1);
       scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
           @Override
           public void run() {
               System.out.println("11");
           }
       },1,3,TimeUnit.SECONDS);
      // 1为延迟多久执行  3为轮训时间  TimeUnit.seconds为 时间单位
    }

自定义线程池

ThreadPoolExecutor的构造函数
 public ThreadPoolExecutor(int corePoolSize,//核心线程数量
                              int maximumPoolSize,//最大线程数量(如果没有超过最大线程数量 没有空闲线程则创建)
                              long keepAliveTime,//线程的生命周期 超过了corePoolSize的空闲线程回收时间
                              TimeUnit unit,//keepAliveTime时间单位
                              BlockingQueue<Runnable> workQueue,//当
                              ThreadFactory threadFactory,//线程工厂一般默认即可
                              RejectedExecutionHandler handler)//队列有有界队列。如果任务队列满了以后。拒绝的任务的自定义操作

corePoolSize

      核心线程数,在创建线程池后,默认情况下线程池中并没有任何线程,而是等待任务到来才去创建线程。当线程池中的线程数目达到corePoolSize后,新来的任务将会被添加到队列汇总,也就是workQueue

ThreadPoolExecutor#prestartAllCoreThreads() 方法或者是通过ThreadPoolExecutor # prestartCoreThread()方法预创建线程,不用等到任务来了之后才创建,大小一般设置机器核数

int threadCount = Runtime.getRuntime().availableProcessors();

maximumPoolSize

                  最大线程数量,前面说了线程池的数量大于等于corePoolSize同时没有空闲线程,还有任务进来。会放到workQueue,当workQueue满了之后会看线池线程数量是否大于maximumPoolSize,如果不大于则创建线程执行任务,如果队列满了线程数量又等于maximumPoolSize则触发拒绝策略配置的RejectedExecutionHandler

keepAliveTime

                  corePoolSize之后创建线程的线程存活时间指的是非corePoolSize这部分线程即corePoolSize之后创建的线程,如果超过指定时间还有没执行过任务

workQueue

                 阻塞队列,如果线程数量大于corePoolSize同时没有空闲线程,还有任务到来既会尝试加入此对了,有可能成功有可能失败,失败一般指的是有界队列 队列满了。


threadFactory

                线程工厂。用来为线程池创建线程,当我们不指定线程工厂时,线程池内部会调用Executors.defaultThreadFactory()创建默认的线程工厂,其后续创建的线程优先级都是Thread.NORM_PRIORITY。如果我们指定线程工厂,我们可以对产生的线程进行一定的操作。

handler

          拒绝执行策略。当线程池的缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,

线程池核心调度源码

  public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //workerCountOf 获取线程数量如果数量小于corePoolSize 则直addWorker调度执行
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        /**
         *   如果线程数量超过corePoolSize 则调用如果进入等待队列失败 则执行拒绝策略
         *   进入等待队列失败比如队列满了ArrayBlockingQueue 或者SynchronousQueue
         *    则直接提交调度执行
         */
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //入队列失败 直接调度执行 如果线程数量超过max 则执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

workQueue的几种队列

https://www.cnblogs.com/LQBlog/p/8733764.html

线程池拒绝策略

jdk默认拒绝策略

AbortPolicy 该策略直接抛出异常 影响系统正常运行

    public static class AbortPolicy implements RejectedExecutionHandler {

        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            //抛出异常
            throw new RejectedExecutionException("Task " + r.toString() +
                    " rejected from " +
                    e.toString());
        }
    }

CallerRunsPolicy 当前线程执行

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
       
        public CallerRunsPolicy() { }
        
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            //线程池不是shutDown则直接调用run方法 当前线程执行
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

DiscardOldestPolicy 个策略重试添加当前的任务,他会自动重复调用 execute() 方法,直到成功。 显然这样会影响线程提交性能

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                //从队列获取并删除元素
                e.getQueue().poll();
                //再次调用execute 方回
                e.execute(r);
            }
        }
    }

DiscardPolicy :对拒绝任务直接无声抛弃,没有异常信息。

 public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

如果默认的拒绝策略无法满足 则可以自己通过实现RejectedExecutionHandler接口

比如一些耗时操作。Runable对象封装业务单号。拒绝策略执行落库 后期重试如

     public MyRejectedExecutionHandler() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if(r instanceof  OrderTask){
                OrderTask orderTask=(OrderTask)r;
                String orderTaskName=orderTask.getOrderTask();
                String parameter=orderTask.getParameter();
                //执行insert 落库或者发mq后期重试
            }
        }
    }

线程工厂ThradFactory

jdk提供的2种默认工厂

Executors.defaultThreadFactory() 返回用于创建新线程的默认线程工厂。

Executors.privilegedThreadFactory()    返回用于创建新线程的线程工厂,这些新线程与当前线程具有相同的权限。

自定义线程名字的Factory

/**
 * @Project micro-service
 * @PackageName cn.wine.ms.promotion.utils.thread.factory
 * @ClassName NameThreadFacotry
 * @Author qiang.li
 * @Date 2021/8/10 9:57 上午
 * @Description 指定工作线程名字 方便排查问题,不用随机的名字
 * 参考Executors.defaultThreadFactory() 实现 只是自定义namePrefix
 */
public class NameThreadFactory implements ThreadFactory {
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public NameThreadFactory(String threadNamePrefix) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                threadNamePrefix+
                "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                namePrefix + threadNumber.getAndIncrement(),
                0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

 扩展线程池

可以通过这几个方法扩展 监听线程的执行时间

 public static  class SimpleThreadPoolExecutor extends  ThreadPoolExecutor{


        public SimpleThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        /**
         * 线程执行之前调用
         * @param t
         * @param r
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t,r);
        }

        /**
         *  线程执行之后调用
         * @param r
         * @param t
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
        }

        /**
         * terminaerd 线程退出时调用
         */
        @Override
        protected void terminated() {
            super.terminated();
        }
    }

线程池大小计算公式

 感觉不大适用  现在是集群 同时一个应用不止一个线程池

可以设置为cpu核数

   int threadCount = Runtime.getRuntime().availableProcessors();

线程池的堆栈异常信息

无法打印异常信息的写法

  ExecutorService executorService=Executors.newCachedThreadPool();
        for(int i=0;i<10;i++){
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    int j=1/0;
                    System.out.println("ddd");
                }
            });
        }
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

可以打印异常信息的写法

方法一 改为execute

ExecutorService executorService=Executors.newCachedThreadPool();
        for(int i=0;i<10;i++){
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    int j=1/0;
                    System.out.println("ddd");
                }
            });
        }
      
    }

方法二 使用future.get

   ExecutorService executorService=Executors.newCachedThreadPool();
        for(int i=0;i<10;i++){
            Future future= executorService.submit(new Runnable() {
                @Override
                public void run() {
                    int j=1/0;
                    System.out.println("ddd");
                }
            });
            //使用get
            future.get();
        }

 缺点就是能能定位到哪里抛出的异常 并不能定位到哪里提交的task

重写线程池打印异常

/**
 * @author liqiang
 * @date 2019/9/19 18:43
 * @Description:
 */
public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void execute(Runnable command) {
        super.execute(warp(command,clientTrace(),Thread.currentThread().getName()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(warp(task,clientTrace(),Thread.currentThread().getName()));
    }

    private Exception clientTrace(){
        //保存着提交线程的堆栈信息
       return  new Exception("client stack trace");
    }

    public  Runnable warp(final Runnable task,final Exception clientException,String clientThreadName){
        return new Runnable() {
            @Override
            public void run() {
                try{
                    task.run();
                }catch (Exception e){
                    //发生异常后打印提交线程的堆栈信息
                    clientException.printStackTrace();
                    throw e;
                }
            }
        };
    }
}
  TraceThreadPoolExecutor executorService=new TraceThreadPoolExecutor(0,1,50,TimeUnit.MINUTES,new SynchronousQueue());
        for(int i=0;i<10;i++){
            Future future= executorService.submit(new Runnable() {
                @Override
                public void run() {
                    int j=1/0;
                    System.out.println("ddd");
                }
            });
        }

重写后就能正常打印到提交地点的堆栈信息

原文地址:https://www.cnblogs.com/LQBlog/p/8735356.html