ConcurrentLinkedQueue 源码分析

ConcurrentLinkedQueue

ConcurrentLinkedQueue 能解决什么问题?什么时候使用 ConcurrentLinkedQueue?

1)ConcurrentLinkedQueue 是基于单向链表实现的线程安全【基于 CAS 实现】的、无界、FIFO、非阻塞队列。
2)ConcurrentLinkedQueue 的 offer 和 poll 操作都是非阻塞的。

如何使用 ConcurrentLinkedQueue?

1)ConcurrentLinkedQueue 并发性能比 LinkedBlockingQueue 高,但是当无元素可用时,
频繁的自旋拉取会导致 CPU 飙升,所以当消费者没有拉取到元素时,建议休眠指定的时间后再重试。

使用 ConcurrentLinkedQueue 有什么风险?

1)由于是无界非阻塞队列,当生产速率持续大于消费速率时,会导致资源耗尽,内存溢出。
2)极高的并发场景下,自旋 CAS 长时间不成功会给 CPU 带来非常大的执行开销。

ConcurrentLinkedQueue 核心操作的实现原理?

创建实例

    static final class Node<E> {
        volatile E item;
        volatile Node<E> next;

        /**
         *  创建一个持有 item 的新节点
         */
        Node(E item) {
            ConcurrentLinkedQueue.ITEM.set(this, item);
        }

        /** Constructs a dead dummy node. */
        Node() {}

        // 写入后置节点
        void appendRelaxed(Node<E> next) {
            ConcurrentLinkedQueue.NEXT.set(this, next);
        }
        
        // 尝试原子更新 item
        boolean casItem(E cmp, E val) {
            return ConcurrentLinkedQueue.ITEM.compareAndSet(this, cmp, val);
        }
    }

    /**
     *  队列的第一个活性节点
     * Invariants:
     * - all live nodes are reachable from head via succ()
     *   所有的节点都可以从 head 开始,通过递归 succ() 到达
     * - head != null
     *   head 节点不为 null
     * - (tmp = head).next != tmp || tmp != head
     *   不会出现自连接
     * Non-invariants:
     * - head.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     */
    transient volatile Node<E> head;

    /**
     * A node from which the last node on list (that is, the unique
     * node with node.next == null) can be reached in O(1) time.
     * Invariants:
     * - the last node is always reachable from tail via succ()
     *    最后一个节点总是可以从 tail 递归 succ() 方法到达
     * - tail != null
     *   tail 节点不为 null
     * Non-invariants:
     * - tail.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     * - tail.next may or may not be self-linked.
     */
    private transient volatile Node<E> tail;

    /**
     *  创建一个空的 ConcurrentLinkedQueue 实例
     */
    public ConcurrentLinkedQueue() {
        head = tail = new Node<>();
    }

插入元素

    /**
     *  将元素 e 插入到队列尾部,由于是无界队列,该操作不会被阻塞 && 返回值永远是 true
     */
    @Override
    public boolean offer(E e) {
        // 元素值不允许为 null
        final Node<E> newNode = new Node<>(Objects.requireNonNull(e));
        /**
         * t:tail
         * p:predecessor
         * 1)读取尾节点
         */
        for (Node<E> t = tail, p = t;;) {
            // 读取尾节点的后置节点
            final Node<E> q = p.next;
            // 1)当前尾节点为最后一个节点
            if (q == null) {
                // p is last node
                // 则尝试将新节点链接到其后面
                if (ConcurrentLinkedQueue.NEXT.compareAndSet(p, null, newNode)) {
                    /**
                     *  成功的 CAS 操作使得元素 e 成为队列的节点。
                     */
                    if (p != t) {
                        // 如果其他线程并发修改了 tail 节点,则尝试更新 tail 节点。
                        ConcurrentLinkedQueue.TAIL.weakCompareAndSet(this, t, newNode);
                    }
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            // 2)tail 节点和最后一个节点间隔为 2
            else if (p == q) {
                /**
                 * We have fallen off list.  If tail is unchanged, it will also be off-list,
                 * in which case we need to jump to head, from which all live nodes are always reachable.
                 * Else the new tail is a better bet.
                 * 1)如果尾节点未改变,则读取头节点
                 * 2)否则读取新的尾节点
                 */
                p = t != (t = tail) ? t : head;
                // 3)如果滞后了两个节点,则重新读取 tail 节点
            } else {
                /**
                 *  Check for tail updates after two hops.
                 * 1)tail 节点和最后一个节点间隔为 2,并且尾部节点已经更新,则读取新的尾部节点
                 * 2)否则更新 p 为其后置节点 q
                 */
                p = p != t && t != (t = tail) ? t : q;
            }
        }
    }

拉取元素

    /**
     *  移除并返回队列头部元素,如果队列为空,则返回 null
     * created by ZXD at 8 Dec 2018 T 17:12:16
     * @return
     */
    @Override
    public E poll() {
        restartFromHead: for (;;) {
            // 读取头节点
            for (Node<E> h = head, p = h, q;; p = q) {
                final E item;
                // 1)head 节点元素不为 null,则尝试将其更新为 null,此时的 head 不是傀儡节点。
                if ((item = p.item) != null && p.casItem(item, null)) {
                    /**
                     *  Successful CAS is the linearization point for item to be removed from this queue。
                     *  成功的 CAS 操作说明我们已经移除了队列头部元素
                     */
                    if (p != h) {
                        // 头节点是哨兵节点,则一次移动两个位置,新头节点为数据节点
                        updateHead(h, (q = p.next) != null ? q : p);
                    }
                    // 返回目标元素
                    return item;
                }
                // 后置节点为 null
                else if ((q = p.next) == null) {
                    // 将头结点 h 更新为 p
                    updateHead(h, p);
                    return null;
                }
                else if (p == q) {
                    continue restartFromHead;
                }
            }
        }
    }
原文地址:https://www.cnblogs.com/zhuxudong/p/10088394.html