LinkedBlockingQueue 源码分析

LinkedBlockingQueue

LinkedBlockingQueue 是基于链表实现的,可以选择有界或无界的阻塞队列。
队列的元素按照 FIFO 的顺序访问,新增元素添加到队列尾部,移除元素从队列头部开始。
队列头部通过 takeLock 进行并发控制,队列尾部通过 putLock 进行并发控制,
该队列最多可以有两个线程同时操作,其吞吐量要高于 ArrayBlockQueue。

创建实例

    /**
     *  单向链表节点
     */
    static class Node<E> {
        /**
         *  存储的元素
         */
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** 阻塞队列的容量,如果不指定,则为 Integer.MAX_VALUE */
    private final int capacity;

    /** 当前的元素总数 */
    private final AtomicInteger count = new AtomicInteger();

    /**
     *  链表头结点
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     *  链表尾节点
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** 执行读取操作时必须持有的锁 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 队列为空时执行的 take 操作,当前线程将在此条件阻塞等待  */
    private final Condition notEmpty = takeLock.newCondition();

    /** 执行写入操作时必须持有的锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 队列已满时执行的 put 操作,当前线程将在此条件阻塞等待 */
    private final Condition notFull = putLock.newCondition();

    /**
     *  创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue 实例
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     *  创建一个容量为 capacity 的 LinkedBlockingQueue 实例
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        this.capacity = capacity;
        last = head = new Node<>(null);
    }

写入元素

  • 在队列尾部插入元素,如果队列已满,则阻塞等待有可用空间之后,再尝试插入
    /**
     *  在队列尾部插入元素,如果队列已满,则阻塞等待有可用空间之后,再尝试插入。
     */
    @Override
    public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        // 创建新节点
        final Node<E> node = new Node<>(e);
        // 读取写锁
        final ReentrantLock putLock = this.putLock;
        // 读取元素计数值
        final AtomicInteger count = this.count;
        // 可中断地锁定
        putLock.lockInterruptibly();
        try {
            // 如果当前队列已满
            while (count.get() == capacity) {
                // 则在非满条件上阻塞等待,线程被唤醒后进行重试
                notFull.await();
            }
            // 加入队列尾部
            enqueue(node);
            // 递增元素总数
            c = count.getAndIncrement();
            // 如果添加元素之后还有可用空间
            if (c + 1 < capacity) {
                // 唤醒在非满条件上阻塞等待的线程
                notFull.signal();
            }
        } finally {
            // 释放写锁
            putLock.unlock();
        }
        // 如果是添加的第一个元素
        if (c == 0) {
            // 唤醒在非空条件上阻塞等待的线程来读取元素
            signalNotEmpty();
        }
    }

    /**
     *  将节点插入队列尾部
     */
    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

    /**
     * Signals a waiting take. Called only from put/offer
     * 唤醒一个等待读取元素的线程
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
  • 如果队列有可用空间,则将元素添加到队列尾部,并返回 true;如果队列已满,则立即返回 false,元素被丢弃。
    /**
     *  如果队列有可用空间,则将元素添加到队列尾部,并返回 true;
     *  如果队列已满,则立即返回 false,元素被丢弃。
     */
    @Override
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        final AtomicInteger count = this.count;
        // 队列已满
        if (count.get() == capacity) {
            // 返回 false
            return false;
        }
        int c = -1;
        // 新建节点
        final Node<E> node = new Node<>(e);
        // 读取写锁
        final ReentrantLock putLock = this.putLock;
        // 获取锁
        putLock.lock();
        try {
            // 进行二次判断
            if (count.get() < capacity) {
                // 将节点加入到队列尾部
                enqueue(node);
                // 递增元素总个数
                c = count.getAndIncrement();
                if (c + 1 < capacity) {
                    notFull.signal();
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return c >= 0;
    }
  • 在指定的超时时间内,尝试将元素插入到队列尾部,插入成功返回 true
    /**
     *  在指定的超时时间内,尝试将元素插入到队列尾部,插入成功返回 true
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        // 转换为纳秒
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 队列已满
            while (count.get() == capacity) {
                // 已经超时则直接返回 false
                if (nanos <= 0L) {
                    return false;
                }
                // 最多阻塞等待指定的超时时间后再次尝试添加元素
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity) {
                notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return true;
    }

读取元素

  • 如果队列为空,则阻塞等待有可用元素,否则移除并获取队列头部元素
    /**
     *  如果队列为空,则阻塞等待有可用元素,否则移除并获取队列头部元素
     * created by ZXD at 6 Dec 2018 T 20:20:15
     * @return
     * @throws InterruptedException
     */
    @Override
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 队列为空
            while (count.get() == 0) {
                // 在非空条件上阻塞等待,线程被唤醒后再次尝试读取
                notEmpty.await();
            }
            // 移除队列头部元素
            x = dequeue();
            // 递减总数
            c = count.getAndDecrement();
            // 如果还有元素可用
            if (c > 1) {
                // 唤醒在非空条件上阻塞等待的线程
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        // 如果有可用空间
        if (c == capacity) {
            // 唤醒在非满条件上阻塞等待的线程来插入元素
            signalNotFull();
        }
        return x;
    }

    /**
     *  移除并返回头部节点元素
     */
    private E dequeue() {
        // 读取头节点
        final Node<E> h = head;
        // 读取后继节点
        final Node<E> first = h.next;
        // 清除旧头结点
        h.next = h; // help GC
        // 写入新头节点
        head = first;
        // 读取元素值
        final E x = first.item;
        // 清除头结点的元素值,它只作为一个标记节点
        first.item = null;
        // 返回元素值
        return x;
    }


  • 如果队列为空,则直接返回 null,否则尝试移除并返回头部元素
    /**
     *  如果队列为空,则直接返回 null,否则尝试移除并返回头部元素
     * created by ZXD at 6 Dec 2018 T 20:23:51
     * @return
     */
    @Override
    public E poll() {
        final AtomicInteger count = this.count;
        // 队列为空时直接返回 null
        if (count.get() == 0) {
            return null;
        }
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // 有元素可用
            if (count.get() > 0) {
                // 移除并返回头部元素
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1) {
                    notEmpty.signal();
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }
  • 尝试在指定的超时时间内移除并返回头部元素,如果已经超时,则返回 null
    /**
     *  尝试在指定的超时时间内移除并返回头部元素,如果已经超时,则返回 null
     * created by ZXD at 6 Dec 2018 T 20:27:27
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 队列为空
            while (count.get() == 0) {
                // 已经超时则直接返回 null
                if (nanos <= 0L) {
                    return null;
                }
                // 在非空条件上阻塞等待指定的纳秒数,被唤醒后再次进行读取
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }
原文地址:https://www.cnblogs.com/zhuxudong/p/10079026.html