Java并发包9--Future接口简介

前言

在单线程模型下,想要获取方法执行的结果比较简单,直接获取方法返回的结果即可。但是在多线程情况下,如何获取其他线程执行的结果,此时就涉及到多线程之间数据传输的问题。比如A线程内部开启B线程和C线程,A线程如何获取B、C线程的执行结果呢?另外在B、C线程执行过程中,A线程是一直处于阻塞状态等待还是非阻塞继续执行呢?如果能够获取到B、C线程的结果那么何时去获取呢?这都是获取多线程执行结果需要解决的问题,而在JUC中就提供了Future就可以实现。

1、Callable接口

想要通过异步获取结果,就离不开Callable接口,众所周知创建线程可以通过继承Thread、实现Runnable的方式,另外还有一种可以返回操作结果的方式就是实现Callable接口

Callable接口只定义了一个方法,就是返回操作结果,定义如下:

@FunctionalInterface
public interface Callable<V> {
    /**
     * 返回操作结果或者直接抛异常
     */
    V call() throws Exception;
}

Callable接口可以返回操作的结果,那么还需要有一个对象来接收操作的结果,该对象就是Future对象

2、Future接口

Future 用于获取异步操作的结果,异步操作之后,结果会存在Future中,可以通过get()方法获取,Future接口定义如下:

public interface Future<V> {
        /**
         * 取消异步操作
         */
        boolean cancel(boolean mayInterruptIfRunning);

        /**
         * 判断异步操作是否取消
         */
        boolean isCancelled();

        /**
         * 判断异步操作是否完成
         */
        boolean isDone();

        /**
         * 获取异步结果
         */
        V get() throws InterruptedException, ExecutionException;

        /**
         * 获取异步结果,并且设置了超时时间
         */
        V get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException;
    }

3、FutureTask类

FutureTask是Future的实现类,并且实现了Runnable接口,构造函数是Callable类型,所以FutureTask的作用就是创建线程执行FutureTask的内容并获取结果

3.1、使用案例

使用案例如下:

 1 public static void main(String[] args) throws ExecutionException, InterruptedException {
 2         /** 创建FutureTask,构造函数传入Callable*/
 3         FutureTask futureTask = new FutureTask(new Callable() {
 4             @Override
 5             public Object call() throws Exception {
 6                 Thread.sleep(3000L);
 7                 System.out.println("返回执行结果");
 8                 return "test";
 9             }
10         });
11         System.out.println("等待执行结果");
12         /** 创建线程运行Callable内容*/
13         futureTask.run();
14         /** 阻塞等待FutureTask的执行结果*/
15         String result = (String) futureTask.get();
16         System.out.println("结果为:"+result);
17     }

用法比较简单,首先通过构造函数传入Callable来构建FutureTask对象,然后调用FutureTask的run方法创建线程执行Callable的call方法,最后主线程调用FutureTask的get方法阻塞等待获取call方法执行的结果

3.2、实现原理

FutureTask实现了Runnable接口和Future接口,所以需要实现Runnable的run方法以及Future的5个方法,这里重点分析下run方法和get方法即可。

首先从FutureTask的构造方法分析,构造方法如下:

    /** 任务状态*/
    private volatile int state;
    /** 任务*/
    private Callable<V> callable;
    /** 执行任务线程 */
    private volatile Thread runner;

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        /** 状态为新建状态*/
        this.state = NEW;       
    }

FutureTask内部有三个核心属性,分别表示任务的状态、任务的具体内容和执行任务的线程,构造方法创建FutureTask时会初始化callable和state两个属性,而执行的线程runner会留在任务执行时再创建.

FutureTask的run方法逻辑如下:

 1 public void run() {
 2         /** 1.通过Unsafe的CAS将当前线程Thread对象赋值给当前的FutureTask对象的runner属性 */
 3         if (state != NEW ||
 4                 !UNSAFE.compareAndSwapObject(this, runnerOffset,
 5                         null, Thread.currentThread()))
 6             return;
 7         try {
 8             Callable<V> c = callable;
 9             if (c != null && state == NEW) {
10                 V result;
11                 /** 完成标志 */
12                 boolean ran;
13                 try {
14                     /** 2.执行Callable的call方法并获取结果 */
15                     result = c.call();
16                     ran = true;
17                 } catch (Throwable ex) {
18                     result = null;
19                     ran = false;
20                     setException(ex);
21                 }
22                 if (ran)
23                     /** 3.设置操作结果 */
24                     set(result);
25             }
26         } finally {
27             /** 状态重制*/
28             runner = null;
29             int s = state;
30             if (s >= INTERRUPTING)
31                 handlePossibleCancellationInterrupt(s);
32         }
33     }

逻辑比较清晰,首先是给FutureTask对象的runner赋值为当前线程Thread对象;然后直接执行Callable的call()方法执行;最好将call()执行结果通过set方法进行设置,具体如何设置就需要继续查看set方法逻辑如下,

 1  /** 设置操作结果*/
 2     protected void set(V v) {
 3         /** 1.设置FutureTask对象的state属性 */
 4         if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
 5             /** 2.赋值操作结果*/
 6             outcome = v;
 7             /** 3.通过CAS设置state为完成状态NORMAL */
 8             UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
 9             /** 4.完成操作*/
10             finishCompletion();
11         }
12     }
13 
14     /** 完成操作*/
15     private void finishCompletion() {
16         /** 1.获取当前等待节点waiters赋值给q */
17         for (WaitNode q; (q = waiters) != null;) {
18             /** 2.死循环中判断当前waiters设置为null */
19             if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
20                 for (;;) {
21                     Thread t = q.thread;
22                     if (t != null) {
23                         q.thread = null;
/** 唤醒等待结果的线程*/
24 LockSupport.unpark(t); 25 } 26 WaitNode next = q.next; 27 if (next == null) 28 break; 29 q.next = null; // unlink to help gc 30 q = next; 31 } 32 break; 33 } 34 } 35 /** 2.任务完成,空实现,子类可用于扩展*/ 36 done(); 37 /** 3.重置callable属性 */ 38 callable = null; // to reduce footprint 39 }

核心就是通过CAS更新FutureTask的状态,然后将操作结果赋值的FutureTask的outcome属性用于存储操作结果,而最后如果想要获取操作结果,就可以通过FutureTask的get方法获取outcome的值即可,逻辑如下:

 1 public V get() throws InterruptedException, ExecutionException {
 2         int s = state;
 3         if (s <= COMPLETING)
 4             /** 如果当前状态不是已完成状态,那么就执行await进入等待状态,等待被唤醒*/
 5             s = awaitDone(false, 0L);
 6         return report(s);
 7     }
 8 
 9     private V report(int s) throws ExecutionException {
10         Object x = outcome;
11         if (s == NORMAL)
12             /** 返回操作结果*/
13             return (V)x;
14         if (s >= CANCELLED)
15             throw new CancellationException();
16         throw new ExecutionException((Throwable)x);
17     }

在获取结果时判断当前状态是否是已经完成,如果不是就需要调用awaitDone方法等待操作结果,实际是调用LockSupport的park方法进入等待状态,逻辑如下:

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

总结:

1、FutureTask内部有Callable对象,FutureTask实现了Future和Runnable接口,执行run方法时会用调用run方法的线程执行Callable的call方法,并且将结果存入FutureTask的内部属性outcome中;

2、当不同线程调用FutureTask对象的get方法时,如果当前FutureTask状态已完成就直接返回结果;如果还未完成就通过LockSupport的park方法进行阻塞等待,多个线程等待就会封装成WaitNode节点对象组成链表结构;

3、当FutureTask任务执行完成之后,如果等待链表存在就会遍历链表中WaitNode节点,并依次通过LockSupport的unpark方法唤醒等待的线程,等待的线程被唤醒后就可以直接获取FutureTask的结果

4、FutureTask通常会和线程池配合使用,将FutureTask直接提交给线程池处理

原文地址:https://www.cnblogs.com/jackion5/p/15018991.html