关于java线程池 Ⅱ

上一篇翻译了线程池主要部分的api,经过一段时间的学习,这里记录一下这段时间对jdk自带线程池的学习成果。

为了方便说明,先放一张类图,包括了jdk线程池主要涉及到的类,为了条理清晰去掉了部分依赖和关联关系

说明

概述

把握住主要的几个类可以比较快的了解整个实现的思想,下面罗列一下,细节的部分会在之后详细描述:

主要的接口定义:Executor,Future,ExecutorService,Runnable

主要的抽象类:AbstractExecutorService

主要的实现类:ThreadPoolExecutor

模型骨架

按顺序讲讲我理解的这些代码的coder也就是Doug Lea在编写这部分代码的时候的思路:

1.首先想到的是最抽象的定义,应该有个执行者,这个执行者的行为只有一个,就是执行可以运行的(Runnable)的任务,出于此目的,他设计了Executor接口,定义了一个行为execute(Runnable command).

2.因为Runnable接口只能保证一个任务(task)是可执行的,并不能记录这个任务执行期间实时的状态及执行结果等特性,并且也不能够提供类似取消执行任务这样的操作,所以需要另设一个接口,通过这个接口来保证前面所描述的这些特性能够实现,在这样的前提下,诞生了Future接口,具体接口行为定义如下:

1.cancel(boolean mayInterruptIfRunning):取消执行操作,这个取消操作对队列中尚在等待的任务有效,已经执行的任务在调用此方法的时候将会返回false。但是,若将mayInterruptIfRunning方法的值设置为true时,将会中断正在执行的任务,正在执行的任务应当抛出InterruptException.

2.isCancelled():判断当前任务是否已经取消

3.isDone():判断当前任务是否完成

4.get():调用此方法的线程,在必要的情况下,等待执行任务的线程执行完毕,并最终获取返回结果。

5.get(long timeout,TimeUnit unit):调用此方法的线程,在一定的时间内等待执行任务的线程执行完毕,指定时间内未能获取返回结果将最终导致抛出TimeOutException异常。

3.定义了Executor,Future这两类元素,再加上Runnable元素,模型的草图就差不多设计完成了,但是实际上Executor并不能与Future协同工作,记得吗?Executor只有一个行为,execute(Runnable command).Executor实际上只能与Runnable协同工作,那么为了使Executor同时还能与Future协同工作,ExecutorService接口诞生了,具体接口行为定义如下:

1.shutdown():关闭当前的ExecutorService,这个方法允许等待执行的任务执行完毕。

2.shutdownNow():关闭当前的ExecutorService,这个方法会阻止等待执行的任务启动,并且会尝试中断正在执行的任务。

3.isShutdown():判断当前ExecutorService是否已经关闭

4.isTerminated():判断在执行了shutdown操作之后是否所有的任务都已经完成了。(不管是普通的执行完毕还是抛出异常都算执行终止)

5.awaitTermination(long timeout,TimeUnit unit):等待在执行了shutdown操作之后剩余的等待执行的任务执行完毕,这个操作将会在指定时间内阻塞线程,直到返回结果或者超时。

6.submit(Callable<T> task):Future<T>  :提交一个Callable任务,并返回一个Future对象,如果你想在这个方法上阻塞只到这个任务执行完毕,你可以这么写:exec.submit(task).get();

7.submit(Runnable task,T result):Future<T>   :这个方法与6类似,只是因为提交的是Runnable任务,这个任务自身执行完毕是没有返回值的,所以需要自己设置返回结果,即result;

8.submit(Runnable taks):Future<T>   :还是同上,只是放回的Future<T>执行get()方法时将会返回null.

9.invokeAll(Collection<? extends Callable<T> tasks):List<Future<T>>:批量的执行Callable任务,并返回一个List<Future<T>>

10.invokeAll(Collection<? extends Callable<T> tasks,long  timeout, TimeUnit unit):List<Future<T>>  :猜也能猜个八九不离十了,在指定时间内批量的执行Callable任务,并返回一个List<Future<T>>,接口定义中这样描述,不管是所有任务完成或者是超时,都将返回List<Future<T>>。这意味着,这个Future的list中会有一部分Future指示的任务可能是未完成的。

11.invokeAny(Collection<? extends Callable<T> tasks>):T  :这个行为比较有意思,批量执行Callable任务(不保证全部执行),返回其中执行成功的一个任务的执行结果。一旦有任意一个任务成功返回,或者抛出异常,则其他还未完成的任务将被取消。

12.invokeAny(Collection<? extends Callable<T> tasks>, long timeout, TimeUnit unit):和上一个行为是一样的,只是限制了再一定时间内完成。

 

实际上到了这一步,整个模型的骨架已经搭建起来了。但是,到目前为止可以看出的是,实际上这个模型搭建的是单个执行者(Executor),多个任务(Runnable或者Callable)的工作场景。多个执行者,多个任务的工作场景,实际上是由ThreadPoolExecutor继承了AbstractExecutorService而实现的。下面我们主要看看这两个类。

详细设计

AbstractExecutorService

先着重介绍AbstractExecutorService,这个抽象类实现了ExecutorService的submit,invokeAll,invokeAny方法,即所有处理任务的方法(但是要注意,它并没有实现Executor的execute方法,这个方法将在ThreadPoolExecutor中实现,这里的所有处理任务的方法都只是对execute方法的包装)。

这个类中有两个有意思的设计,我们主要看看这两个设计:

1.这个类上有一个有意思的变化,还记得之前说的ExecutorService是可以同时Runnable与Future协同工作的吗?从类图上可以看出来,实际上AbstractExecutorService依赖于FutureTask这个类,而这个类又实现了RunnableFuture接口,而RunnableFuture接口继承了Future接口和Runnable接口!看出来了没?FutureTask实际上是一个Future与Runnable的混合产物!FutureTask即具有Future的行为,也具有Runnable的行为。这样就方便了,不管是想要Future作为返回值,还是想要Runnable作为参数,都只需要提供一个FutureTask就可以满足需要了,同时,统一使用FutureTask最主要的目的,按照我的思路,应该是屏蔽Future对象与Runnable对象之间的信息交流,减少实际编写时的代码开销(即把Future与Runnable对象的信息交互统一到了FutureTask中)

AbstractExecutorService提供了两个方法来将传入的Runnable对象和Callable对象转换成FutureTask对象,如下所示:

/**
     * Returns a <tt>RunnableFuture</tt> for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @return a <tt>RunnableFuture</tt> which when run will run the
     * underlying runnable and which, as a <tt>Future</tt>, will yield
     * the given value as its result and provide for cancellation of
     * the underlying task.
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
     * Returns a <tt>RunnableFuture</tt> for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @return a <tt>RunnableFuture</tt> which when run will call the
     * underlying callable and which, as a <tt>Future</tt>, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task.
     * @since 1.6
     */
    protected <T> RunnableFuture<T> 
newTaskFor(Callable<T>
 callable) {
        return new FutureTask<T>(callable);
    }

submit方法通过调用newTaskFor方法获取FutureTask对象,在调用execute(Runnable task)时,使用了FutureTask对象作为传入参数,返回时也使用了同一个FutureTask对象作为返回值,这样就对外屏蔽了Future与Runnable之间的信息交流,不必再在每个submit方法中都将返回的结果拼装成Future对象,然后返回。如下所示:

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Object> ftask = 
newTaskFor(task,
 null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> 
submit(Callable<T>
 task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

这里顺带一提FutureTask的实现:应该不难发现,我们讲到这里为止,都没有说明,如果出现共享资源竞争,即多个线程同时执行时,线程安全问题怎么解决?这个问题实际上就是在FutureTask中解决的。FutureTask有个私有属性 sync,FutureTask的所有行为实际上都是由sync实现的。而这个sync所属的类Sync实际上是FutureTask的内部类.这个类继承了AbstractQueuedSynchronizer 。Sync可以保证在共享资源竞争时,有且仅有一个线程获得资源(这一部分我打算下一次仔细研究过以后再放上博客)。

2.AbstractExecutorService在实现invokeAny方法时,引入了ExecutorCompletionService。

先从为何这么设计说起。回头看看invokeAny的接口行为定义:批量执行Callable任务(不保证全部执行),返回其中执行成功的一个任务的执行结果。一旦有任意一个任务成功返回,或者抛出异常,则其他还未完成的任务将被取消。也就是说,要想实现这个行为,必须得知道当前是否有任意一个任务完成,那么怎么来判断是否有任意一个任务是否完成呢?(注意:是判断有没有任意一个任务完成,而不是判断当前任务是否完成,否则可以直接调用FutureTask的isDone()方法即可判断当前任务是否完成)可以看看FutureTask是否实现了这么一个方法。进入FutureTask的类,可以看到并没有这么一个方法,但是,其中有一个方法值得注意,如下所示:

/**
     * Protected method invoked when this task transitions to state
     * <tt>isDone</tt> (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }

这里实现了一个空的done()方法,根据注释,这个方法将会在任务状态变为完成的时候执行。那么就好办了,只需要实现一个类继承FutureTask然后实现done()方法,将已经完成的任务放入一个队列中,然后监控这个队列,一旦这个队列有值,就说明有一个任务完成了,invokeAny方法即可以返回这个完成的任务的结果了。

这个就是ExecutorCompletionService诞生的原因。ExecutorCompletionService类有一个内部类 QueueingFuture 继承自FutureTask,它实现了done()方法,将完成的任务加入了一个LinkedBlockingQueue<Future<V>>中。如下所示:

/**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

ExecutorCompletionService的submit方法为真正任务的执行者提供了一个QueueingFuture,并提供了一系列监控已完成任务队列:completionQueue的方法。这样在实际处理当中,就可以通过监控ExecutorCompletionService的completionQueue来判断是否有任务完成,这部分的具体代码在AbstractEexcutorService的

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)中,如下所示:

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this); //用ExecutorCompletionService对自己做了一层包装

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            long lastTime = (timed)? System.nanoTime() : 0;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll(); //ExecutorCompletionService提供的监控completionQueue的方法,如果队列为空则不进行等待,直接返回null
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS); //ExecutorCompletionService提供的监控completionQueue的方法,如果队列为空则等待指定的时间,如果超时时队列仍然为空,则返回null 
                        if (f == null)
                            throw new TimeoutException();
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                    else
                        f = ecs.take(); //ExecutorCompletionService提供的监控completionQueue的方法,如果队列为空会一直等待直到队列中有值,然后取到值返回
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (InterruptedException ie) {
                        throw ie;
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            for (Future<T> f : futures)
                f.cancel(true);
        }
    }

以上就是AbstractExecutorServcie中两个值得注意的地方,剩下的invokeAll方法由于比较简单就不介绍了。下面介绍一下真正实现线程池的关键类,ThreadPoolExecutor。

ThreadPoolExecutor

感觉这章要写不完了。。另起一章讲这个ThreadPoolExecutor吧。。

 

原文地址:https://www.cnblogs.com/crazybit/p/3150806.html