FutureTask源码分析

FutureTask:表示的是异步计算的未来结果,本质就是多线程执行并拿到结果。
get方法是阻塞的
NEW表示是FutureTask刚刚创建好,是一个新的任务或者是还没有执行的任务,是初始状态。

FutureTask只能执行一次,一次过后需要重新创建,因为futureTask的状态会停留到最后一步,所以第二次执行到run方法的时候会被过滤掉。
FutureTask同时只能一个线程执行,在执行run方法的时候会被过滤掉。

FutureTask实现了RunnableFuture接口

其中包含Future接口

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);//用于取消异步执行,不一定成功。
    boolean isCancelled();//判断任务是否取消,true表示已经取消,false表示没有取消
    boolean isDone();//判断任务是否已经执行完成,true表示完成,false表示没有完成
    V get() throws InterruptedException, ExecutionException;//获取异步执行的结果
    V get(long timeout, TimeUnit unit)//获取异步执行的结果,这里是带超时时间的。
        throws InterruptedException, ExecutionException, TimeoutException;
}

构造函数

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;//初始状态
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);//通过工具类进行封装,把Runnable转为Callable
    this.state = NEW;//初始状态
}

FutureTask是有多种状态的

private static final int NEW          = 0;//初始状态,在还没执行完成都是。
private static final int COMPLETING   = 1;//call方法已经调用结束了,在调用set或者setException方法,但是还没有赋值给outcome。
private static final int NORMAL       = 2;//正常执行完成,是调用set方法。
private static final int EXCEPTIONAL  = 3;//异常执行完成,是调用的setException方法。
private static final int CANCELLED    = 4;//调用了cancel(false)后的状态,是最终态。
private static final int INTERRUPTING = 5;//调用了cancel(true)后的状态,是中间态。
private static final int INTERRUPTED  = 6;//调用cancel的最终状态。

run方法的执行逻辑

public void run() {
    //判断当前线程是不是初始状态,这样可以保证执行过的线程会过滤掉。
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))//判断runner是否为null,这样可以保证同时只能一个线程执行。    
        return;
    try {
        //创建FutureTask时传入进来了
        Callable<V> c = callable;
        //初始状态且不为null
        if (c != null && state == NEW) {
            V result;
            //用于判断任务是否正确执行
            boolean ran;
            try {
                //执行任务
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);//异常赋值
            }
            if (ran)
                set(result);//正常赋值
        }
    } finally {
      //在状态成为最终态时要保证runner非空,因为这样可以避免并发调用run。 runner
= null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }

set和setException方法

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); //指定最终状态是NORMAL
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); //指定最终状态是EXCEPTIONAL
        finishCompletion();
    }
}

finishCompletion方法

//唤醒所有等待的线程
//这些等待的线程是在调用get时产生的。
//相当于在这里控制等待的线程get到结果。
private void finishCompletion() {
    //唤醒所有等待的线程
    for (WaitNode q; (q = waiters) != null;) {//等待的线程
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);//唤醒等待的线程
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;        // to reduce footprint
}

get方法

//get方法是阻塞的
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)//如果没有执行完的
        s = awaitDone(false, 0L);
    return report(s);//返回结果
}

report方法

//获取当前的值
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)//正常状态的结果
        return (V)x;
    if (s >= CANCELLED)//取消的结果
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);//异常结束的结果
}

awaitDone方法

//当前线程阻塞的逻辑:  timed是否设置等待时间,nanos具体的等待时间
//进入该方法的时候状态肯定是<=COMPLETING的,也就是NEW(执行未完成)和COMPLETING(执行完成未赋值)
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {//通过for死循环不断验证状态
        if (Thread.interrupted()) {//如果当前线程是被打断的,那么会从waiters中移除并抛出异常。
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {//run运行结束后,或者通过cancel调用导致状态变更。
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // run正在进行时,call方法已经执行了,还没有执行到赋值那一步
            Thread.yield();
        else if (q == null)//新创建等待节点
            q = new WaitNode();
        else if (!queued)//还没有入队列就把q放到waiters的头结点
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {//指定等待时间的情况下
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {//已经到时了
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);//线程阻塞
        }
        else
            LockSupport.park(this);//当前线程进行阻塞
    }
}

cancel方法

//是否取消,如果为true,会打断run的线程,否则只是状态标记为CANCELLED
////该方法不会抛出异常的
public boolean cancel(boolean mayInterruptIfRunning) { //状态不为new的时候,也就是到赋值阶段是不允许cancel的,基本上就是在run方法执行时可以。 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion();//cancel会把等待线程都唤醒,然后结束。 } return true; }

isCancelled方法和isDone方法

//判断是否被取消
public boolean isCancelled() {
    return state >= CANCELLED;//通过状态判断
}

//是否已经完成,也就是执行完run方法,或者被取消
public boolean isDone() {
    return state != NEW;//通过状态判断
}
原文地址:https://www.cnblogs.com/tp123/p/12093223.html