并发编程(四)TaskFuture

并发编程(四)TaskFuture

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Object> future = executorService.submit(() -> {
    TimeUnit.SECONDS.sleep(5);
    return 5;
});
Object result = future.get();

ExecutorService 异步执行任务返回一个 Future,本节重点分析 Future 的 get 方法是如何拿到返回结果的呢?

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

下面我们重点分析 FutureTask 类

FutureTask

一、基本变量

(1) 核心成员变量

// 1. 执行的回调方法。如果是 Runnable 就通过 Executors#callable 包装成一个 Callable
private Callable<V> callable;

// 2. 保存计算结果或者异常信息。non-volatile, protected by state reads/writes
private Object outcome;

// 3. 执行 callable 的线程,run 方法中通过 CAS 保证原子性操作
private volatile Thread runner;

// 4. 等待结果的线程队列,eg: 不同的线程同时调用 get()
//    这个队列使用 Treiber stack(可以理解为基于 CAS 的无锁的栈,先进后出)
private volatile WaitNode waiters;

(2) 状态变化

/* 
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

任务执行正常结束前,state 会被设置成 COMPLETING,代表任务即将完成,接下来很快就会被设置为 NARMAL 或者 EXCEPTIONAL,这取决于调用 Runnable 中的 call() 方法是否抛出了异常。有异常则后者,反之前者。

任务提交后、任务结束前取消任务,那么有可能变为 CANCELLED 或者 INTERRUPTED。在调用 cancel 方法时,如果传入 false 表示不中断线程,state 会被置为 CANCELLED,反之 state 先被变为 INTERRUPTING,后变为 INTERRUPTED。

总结下,FutureTask 的状态流转过程,可以出现以下四种情况:

  1. 任务正常执行并返回。 NEW -> COMPLETING -> NORMAL
  2. 执行中出现异常。NEW -> COMPLETING -> EXCEPTIONAL
  3. 任务执行过程中被取消,并且不响应中断。NEW -> CANCELLED
  4. 任务执行过程中被取消,并且响应中断。 NEW -> INTERRUPTING -> INTERRUPTED 

补充:Unsafe

Unsafe 是 JDK 底层的类库,位于 sun.misc.Unsafe 中,在 java.util.concurrent 广泛使用。

private static final UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long stateOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("state"));

// 更新 state 状态
UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)

二、run

/**
 * run 方法执行有两个条件:1. state=NEW; 2. runner=null
 * 1. 执行前 state=NEW & runner=null
 * 2. 执行中 state=NEW & runner=Thread.currentThread()
 * 3. 执行后 state!=NEW & runner=null,根据是否有异常执行 set(result) 或 setException(ex),无论执行成功与否都会更新 state 状态
 * 因此,多个线程同时调用 run 方法的情况 callable 也只会执行一次
 */
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            // set 方法会回调钩子方法 done(),可能抛出异常
            if (ran)
                set(result);
        }
    } finally {
        runner = null;

        // 等待调用 cancel(true) 的线程完成中断,防止中断操作逃逸出 run 或者 runAndReset 方法,影响后续操作
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

protected void set(V v) {
    // 通过 CAS 状态来确认计算没有被取消,而且线程只执行了一次
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

private void finishCompletion() {
    for (WaitNode q; (q = waiters) != null;) {
        // 必须将栈顶 CAS 为 null,否则重读栈顶并重试。
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // 遍历并唤醒栈中等待的线程
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                // 将 next 域置为 null,这样对 GC 友好
                q.next = null; 
                q = next;
            }
            break;
        }
    }

    /*
     * done 方法是暴露给子类的一个钩子方法。
     * 这个方法在 ExecutorCompletionService.QueueingFuture 中的 override 实现是把结果加到阻塞队列里。
     */
    done();

    callable = null;
}

private void handlePossibleCancellationInterrupt(int s) {
    /*
     * 等待调用 cancel(true) 的线程完成中断,防止中断操作逃逸出 run 或者 runAndReset 方法,影响后续操作
     *
     * 实际上,当前调用 cancel 方法的线程不一定能够中断到本线程。
     * 有可能 cancel 方法里读到 runner 是 null,甚至有可能是其它并发调用 run/runAndReset 方法的线程。
     * 但是也没办法判断另一个线程在 cancel 方法中读到的 runner 到底是什么,所以索性自旋让出 CPU 时间片也没事。
     */
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield();
}

三、get

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果线程已经执行完成直接返回
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

/**
 * 等待任务执行完毕,如果任务取消或者超时则停止
 * @param timed 为 true 表示设置超时时间
 * @param nanos 超时时间
 * @return 任务完成时的状态
 * @throws InterruptedException
 */
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // 1. callable 已执行完成,无论成功或失败直接返回执行结果
        if (s > COMPLETING) {
            // 已执行完,为了 GC 需要清 q.thread
            if (q != null)
                q.thread = null;
            return s;
        }
        // 2. COMPLETING 是一个很短暂的状态,调用 Thread.yield 期望让出时间片,之后重试循环
        else if (s == COMPLETING)
            Thread.yield();
        // 3. 初始化节点,重试一次循环
        else if (q == null)
            q = new WaitNode();
        // 4. queued 记录是否已经入栈,此处准备将节点压栈
        else if (!queued)
            /*
             *  这是 Treiber Stack 算法入栈的逻辑。
             *  Treiber Stack 是一个基于 CAS 的无锁并发栈实现
             *  更多可以参考https://en.wikipedia.org/wiki/Treiber_Stack
             */
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                    q.next = waiters, q);
        // 5. 如果有时限,判断是否超时,未超时则park剩下的时间。
        else if (timed) {
            nanos = deadline - System.nanoTime();
            // 超时,移除栈中节点
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

/**
 * 清理用于保存等待线程栈里的无效节点,所谓节点无效就是内部的 thread 为 null(类比 ThreadLocalMap)
 *   
 * 一般有以下几种情况:
 * 1. 节点调用 get 超时。
 * 2. 节点调用 get 中断。
 * 3. 节点调用 get 拿到 task 的状态值(> COMPLETING)。
 *
 * 此方法干了两件事情:
 * 1. 置标记参数 node 的 thread 为 null
 * 2. 清理栈中的无效节点
 *
 * 如果在遍历过程中发现有竞争则重新遍历栈。
 */
private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            // pre -> current -> next,如果 current 无效就把 pre.next=next
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                // 1. 如果当前节点仍有效,则置 pred 为当前节点,继续遍历
                if (q.thread != null)
                    pred = q;

                // 2. 当前节点已无效且有前驱,则将前驱的后继置为当前节点的后继实现删除节点。
                //    如果前驱节点已无效,则重新遍历 waiters 栈。
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null)
                        continue retry;
                }
                // 3. 当前节点已无效,且当前节点没有前驱,则将栈顶置为当前节点的后继。
                //    失败的话重新遍历 waiters 栈。
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                    continue retry;
            }
            break;
        }
    }
}

/**
 * 导出结果。
 */
private V report(int s) throws ExecutionException {
    Object x = outcome;
    // 1. 正常执行完计算任务
    if (s == NORMAL)
        return (V)x;
    // 2. 取消
    if (s >= CANCELLED)
        throw new CancellationException();
    // 3. 执行计算任务时发生异常
    throw new ExecutionException((Throwable)x);
}

四、cancal

/**
 * mayInterruptIfRunning=false 时,不允许在线程运行时中断,设成 true 的话就允许但不保证一定会中断线程。
 * 1. true 时,将状态修改成 INTERRUPTING,执行 thread.interrupt()
 * 2. false 时,将状态修改成 CANCELLED
 */
public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                    mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally {
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

参考:

  1. 《FutureTask 源码解读》:http://www.cnblogs.com/micrari/p/7374513.html

每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/binarylei/p/10068923.html