《java.util.concurrent 包源码阅读》28 Phaser 第二部分

这一部分来分析Phaser关于线程等待的实现。所谓线程等待Phaser的当前phase结束并转到下一个phase的过程。Phaser提供了三个方法:

// 不可中断,没有超时的版本
public int awaitAdvance(int phase);

// 可以中断,没有超时的版本
public int awaitAdvanceInterruptibly(int phase);

// 可以中断,带有超时的版本
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit);

这三个版本的方法的实现大体类似,区别在于第二个版本多了中断异常,第三个版本多了中断异常和超时异常。

    public int awaitAdvance(int phase) {
        // 获取当前state
        final Phaser root = this.root;
        long s = (root == this) ? state : reconcileState();
        int p = (int)(s >>> PHASE_SHIFT);

        // 检查给定的phase是否和当前的phase一直
        if (phase < 0)
            return phase;
        if (p == phase)
            return root.internalAwaitAdvance(phase, null);
        return p;
    }

    // 多了一个对于中断的检查然后抛出中断异常
    public int awaitAdvanceInterruptibly(int phase)
        throws InterruptedException {
        final Phaser root = this.root;
        long s = (root == this) ? state : reconcileState();
        int p = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        if (p == phase) {
            // 使用QNode实现中断和超时,这里不带超时
            QNode node = new QNode(this, phase, true, false, 0L);
            p = root.internalAwaitAdvance(phase, node);
            // 对于中断的情况,抛出中断异常
            if (node.wasInterrupted)
                throw new InterruptedException();
        }
        return p;
    }

    // 多了中断异常和超时异常
    public int awaitAdvanceInterruptibly(int phase,
                                         long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        long nanos = unit.toNanos(timeout);
        final Phaser root = this.root;
        long s = (root == this) ? state : reconcileState();
        int p = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        if (p == phase) {
            QNode node = new QNode(this, phase, true, true, nanos);
            p = root.internalAwaitAdvance(phase, node);
            // 中断异常
            if (node.wasInterrupted)
                throw new InterruptedException();
            // 没有进入下一个phase,抛出超时异常
            else if (p == phase)
                throw new TimeoutException();
        }
        return p;
    }

上述三个方法都是调用了internalAwaitAdvance方法来实现等待,因此来看internalAwaitAdvance方法:

    private int internalAwaitAdvance(int phase, QNode node) {
        // 释放上一个phase的资源
        releaseWaiters(phase-1);

        // node是否被加入到队列中
        boolean queued = false;

        // 记录前一个Unarrived,用来增加spin值
        int lastUnarrived = 0;
        int spins = SPINS_PER_ARRIVAL;
        long s;
        int p;

        // 循环操作直到phase值发生了变化
        while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
            // 不可中断的模式,使用自旋等待
            if (node == null) {
                int unarrived = (int)s & UNARRIVED_MASK;
                if (unarrived != lastUnarrived &&
                    (lastUnarrived = unarrived) < NCPU)
                    spins += SPINS_PER_ARRIVAL;
                boolean interrupted = Thread.interrupted();
                // 发生了中断时,使用一个node来记录这个中断
                if (interrupted || --spins < 0) {
                    node = new QNode(this, phase, false, false, 0L);
                    node.wasInterrupted = interrupted;
                }
            }
            // 当前线程的node可以结束等待了,后面会分析isReleasible方法
            else if (node.isReleasable())
                break;
            // 把node加入到队列中
            else if (!queued) {
                // 根据phase值不同,使用不同的队列
                AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
                QNode q = node.next = head.get();
                // 检查队列的phase是否和要求的phase一致并且Phaser的phase没有发生变化
                // 符合这两个条件才把node添加到队列中去
                if ((q == null || q.phase == phase) &&
                    (int)(state >>> PHASE_SHIFT) == phase)
                    queued = head.compareAndSet(q, node);
            }
            // node加入队列后直接等待
            else {
                try {
            // 对于普通线程来说,这个方法作用就是循环直到isReleasable返回true
            // 或者block方法返回true ForkJoinPool.managedBlock(node); }
catch (InterruptedException ie) { node.wasInterrupted = true; } } } // 对于进入队列的node,重置一些属性 if (node != null) { // 释放thread,不要再使用unpark if (node.thread != null) node.thread = null; // 对于不可中断模式下发生的中断,清除中断状态 if (node.wasInterrupted && !node.interruptible) Thread.currentThread().interrupt(); // phase依旧没有变化表明同步过程被终止了 if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase) return abortWait(phase); } // 通知所有的等待线程 releaseWaiters(phase); return p; }

下面来看QNode,它实现了ManagedBlocker接口(见ForkJoinPool),ManagedBlocker包含两个方法:isReleasable和block。

isReleasable表示等待可以结束了,下面是QNode实现的isReleasable

        public boolean isReleasable() {
            // 没了等待线程,通常会在外部使用"node.thread = null"来释放等待线程,这时可以结束等待
            if (thread == null)
                return true;
            // phase发生变化,可以结束等待
            if (phaser.getPhase() != phase) {
                thread = null;
                return true;
            }

            // 可中断的情况下发生线程中断,可以结束等待
            if (Thread.interrupted())
                wasInterrupted = true;
            if (wasInterrupted && interruptible) {
                thread = null;
                return true;
            }

            // 设置超时的情况下,发生超时,可以结束等待
            if (timed) {
                if (nanos > 0L) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
                if (nanos <= 0L) {
                    thread = null;
                    return true;
                }
            }
            return false;
        }

最后来看QNode实现的block方法,核心思想是用LockSupport来实现线程等待:

        public boolean block() {
            if (isReleasable())
                return true;
            // 没有设置超时的情况
            else if (!timed)
                LockSupport.park(this);
            // 设置超时的情况
            else if (nanos > 0)
                LockSupport.parkNanos(this, nanos);
            return isReleasable();
        }

最后来看releaseWaiters方法,看看怎么释放node队列:

    private void releaseWaiters(int phase) {
        QNode q;
        Thread t;
        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;

        // 如果phase已经发生了变化,才能释放
        while ((q = head.get()) != null &&
               q.phase != (int)(root.state >>> PHASE_SHIFT)) {
            // 释放节点并转到下一个节点
            if (head.compareAndSet(q, q.next) &&
                (t = q.thread) != null) {
                // 释放线程
                q.thread = null;
                // 通知线程结束等待
                LockSupport.unpark(t);
            }
        }
    }

 到这里就把Phaser分析完了。

原文地址:https://www.cnblogs.com/wanly3643/p/3988575.html