线程池

为什么要使用线程池呢?因为创建线程和销毁线程的开销很大,如果来一个请求就要创建一个线程,处理完之后又要销毁,这样频繁的创建和销毁会消耗大量的系统资源。
java线程池的原理:创建一个ThreadPool,核心线程数为m,最大线程数为n,还有一个阻塞队列BlockingQueue。当来一个请求的时候,就创建一个线程,直到达到m。此时如果继续有请求到来,则不会创建线程了,而是把任务放到阻塞队列中。当请求继续到来,直到阻塞队列满了的时候,才会在m的基础上继续创建线程处理请求,直到达到n。如果此时还有请求到来,则采用拒绝策略。如果后面请求越来越少了,那么n-m的请求就会回收掉(在等待了keepAliveTime时间后),而核心线程数m会继续保留在pool中。
/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • 阻塞队列
  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于                                        ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
  • PriorityBlockingQueue:一个具有优先级得无限阻塞队列。
  • 建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。。
     
 
那么要如何向线程池中提交任务呢?
可以使用execute提交任务,但是execute方法没有返回值,所以无法判断任务是否被线程池执行成功。
01 threadsPool.execute(new Runnable() {
02 @Override
03  
04 public void run() {
05  
06 // TODO Auto-generated method stub
07  
08 }
09  
10 });
 
我们也可以使用submit 方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。
try {
Object s = future.get();
catch (InterruptedException e) {
catch (ExecutionException e) {
finally {
executor.shutdown();
}
 
 
类的层次结构:
public interface Executor {
 
    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     */
    void execute(Runnable command);
}
 
public interface ExecutorService extends Executor {
 
   
    void shutdown();
 
    
    boolean isShutdown();
 
    
    boolean isTerminated();
 
   
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
 
   
    <T> Future<T> submit(Callable<T> task);
 
    
    <T> Future<T> submit(Runnable task, T result);
 
    
    Future<?> submit(Runnable task);
 
   
}
 
public class ThreadPoolExecutor implements ExecutorService{
 

Executors工具类:

创建固定大小的线程池,核心线程数和最大线程数一样大。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
 
 
 
如何关闭线程池呢?

public void shutdown()

(1)线程池的状态变成SHUTDOWN状态,此时不能再往线程池中添加新的任务,否则会抛出RejectedExecutionException异常。

(2)线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。 

注意这个函数不会等待提交的任务执行完成

public List<Runnable> shutdownNow()

(1)线程池的状态立刻变成STOP状态,此时不能再往线程池中添加新的任务。

(2)终止等待执行的线程,并返回它们的列表;

(3)试图停止所有正在执行的线程,试图终止的方法是调用Thread.interrupt(),但是大家知道,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。

要如何合理的配置线程池呢?

需要考虑的因素有:

    1. 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
    2. 任务的优先级:高,中和低。
    3. 任务的执行时间:长,中和短。
    4. 任务的依赖性:是否依赖其他系统资源,如数据库连接。

CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行。

 
原文地址:https://www.cnblogs.com/james111/p/6995801.html