java线程池框架源代码分析

相关类Executor,Executors。AbstractExecutorService。ExecutorService

Executor:整个线程池运行者框架的顶层接口。

定义了一个execute方法。整个线程运行者框架的核心方法。

public interface Executor {

    void execute(Runnable command);
}

ExecutorService:这是一个接口它继承自Executor,定义了shutdown。shutdownNow,awaitTermination,submit。invokeAll等方法。

AbstractExecutorService:实现了ExecutorService接口中的submit,invokeAll的方法。

  public Future<?

> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }

      在这里。全部submit方法提交的任务终于还是调用了execute方法。execute是接口Executor中定义的方法,AbstractExecutorService没有实现它,

须要子类去实现这种方法。ThreadPoolExecutor继承了AbstractExecutorService,它实现了execute方法。ScheduledThreadPoolExecutor继承自

ThreadPoolExecutor,并覆盖了ThreadPoolExecutor的execute方法。这种方法是线程运行框者架的核心逻辑,不同的线程池运行者有不同的实现逻辑。


     AbstractExecutorService的功能较为简单。实现了不同參数的submit。invokeAll方法。


ThreadPoolExecutor线程池运行者:它有一个核心的成员变量:

        private final HashSet<Worker> workers = new HashSet<Worker>();

    workers能够看做是ThreadPoolExecutor中用于执行任务的线程池。


    worker是一个封装了一个Thread对象并实现了Runnable接口的类。

封装Thread非常easy理解,由于它要利用Thread去执行execute方法提交过来的runnable任务。

可是为什么会继承runnable接口呢?

以下是剔除了部分代码的Worker源代码:

  private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
    	 final Thread thread;
        
        Runnable firstTask;

        Worker(Runnable firstTask) {
            setState(-1); 
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }


    }

Worker是ThreadPoolExecutor的一个内部类,Worker本身实现了Runnable接口,并封装了一个Thread对象,最后在构造方法中获取了一个Runnable对象,这个对象就是ThreadPoolExecutor通过execute提交过来的目标任务。

跟踪runWorker(this)方法:

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); 
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();

                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//在这里直接调用了目标任务的run方法,并没有将它传给Thread对象。

} catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }


回过头来在看看Worker的构造方法:

Worker(Runnable firstTask) {
        setState(-1); 
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

它将自己传给了自己的成员变量thread。

目标任务被运行的步骤可能就是:Worker的成员变量thread启动后调用worker的run方法。worker的run方法中将自己传给runWorker,runWorker在调用目标运行对象的run方法。


那么thread是何时被运行的呢?

以下看看ThreadPoolExecutor中的一个其它方法:

   private boolean addWorker(Runnable firstTask, boolean core) {
       ......
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;//这里初始化一个Worker对象w。在将w的成员变量thread付给t
            if (t != null) {
                mainLock.lock();
                try {
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//在这里调用t的start方法。
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

这里为什么会设计的这么绕,我想主要是Worker不仅封装了一个thread,并且对目标任务进行了封装。在执行封装过后的目标任务前,addWorker能够做一些相关操作。

这里只介绍了ThreadPoolExecutor的线程池。那么这个线程池是怎样被维护的。以下介绍几个关键的參数。

private volatile int corePoolSize;
  private volatile int maximumPoolSize;
  private final BlockingQueue<Runnable> workQueue;

    这三个是ThreadPoolExecutor的成员变量,当中workQueue跟县城池没有关系。

workQueue是一个线程安全的堵塞队列。

    corePoolSize是线程池的核心大小。maximumPoolSize是线程池的最大大小。

    当提交新任务时,假设ThreadPoolExecutor中有线程在执行。而且线程的数量小于corePoolSize,那么就会有新的线程被创建。

假设当前执行的线程数大于corePoolSize,就会放到缓存队列workQueue中。假设缓冲队列也满了。就继续创建线程,直到线程的数量达到maximumPoolSize

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        //推断假设当前执行的线程数小于 corePoolSize,加入新的线程(addWorker会加入一个新的线程。上面有介绍),方法直接返回。
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }


        //假设当前的执行的线程数大于或等于corePoolSize则新的任务会放到缓存队列中。
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
        //最后缓冲队列加入失败,则会继续加入线程。

假设加入新的线程失败,则拒绝这个任务。 reject(command); }


还有些其它的參数:

private volatile ThreadFactory threadFactory //线程的工厂函数。

private volatile RejectedExecutionHandler handler;//任务拒绝的处理类。 private volatile long keepAliveTime;//任务等待的是将。

ThreadPoolExecutor有几个构造方法来初始化这些參数。Executors类将这些參数简化了来获得一个ExecutorService的引用。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

      public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

     public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    } 

     public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

     这四个方法中前两个的核心线程数和最大线程数同样,全部可执行的线程数是固定的,<=nThreads。

当任务数大于nThreads时,就是放入缓冲队列中。  后两个方法中,线程数是无边界的,核心线程数是0,最大线程数是整型的最大值,然后假设有线程60秒内没有任务执行的话就销毁。每次有新的任务来,都会创建新的线程或使用曾经创建的线程(60秒内没有任务执行的线程)。

你可能有疑问。既然核心线程数是0,那么全部的任务不是都放到队里里了吗?那么如今就来看看SynchronousQueue这个队里,能够看看这里的介绍http://wsmajunfeng.iteye.com/blog/1629352/。

    回过头来看看任务提交方法的源代码:

  public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//这里是在往队列里方任务,假设不成功就会加入Worker(封装了线程对象)
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

      上面链接里的博客提到:offer()往queue里放一个element后马上返回,假设碰巧这个element被还有一个thread取走了,offer方法返回true。觉得offer成功;否则返回false。
试想一下,第一次提交任务的时候,核心线程数为0,此时没有线程所以没有线程从workQueue中取东西,所以这里的workQueue.offer(command)会返回false,那么就会通过addWorker(command, false)创建一个新的线程。



原文地址:https://www.cnblogs.com/gcczhongduan/p/5059023.html