ArrayBlockingQueue源码解析

ArrayBlockingQueue

ArrayBlockingQueue是一个基于数组存储实现的有界阻塞队列,新增、获取、移除元素由内部持有的重入锁控制。

重入锁支持公平和非公平模式,默认使用非公平模式。

如果新增移除是阻塞的,那么新增时如果队列满了,会在notFull上等待,移除元素时如果队列空了,会在notEmpty上等待。

ArrayBlockingQueue提供了读指针takeIndex和写指针putIndex控制着队列元素的新增和移除。当移除元素之后takeIndex向前移动,当新增元素之后putIndex向后移动,当指针移动到数组尾部,指针置为0,继续移动。

count = items.length队列满,count = 0队列空。

关键属性

// 底层存储元素数组
final Object[] items;
// 当前移除元素指针
int takeIndex;
// 当前添加元素位置
int putIndex;
// 总元素个数
int count;
// 并发控制锁
final ReentrantLock lock;
// 非空条件,阻塞获取元素线程
private final Condition notEmpty;
// 非满条件,阻塞添加元素线程
private final Condition notFull;

构造函数

/**
 * 指定队列容量,使用非公平锁
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

/**
 * 指定队列容量,fair=false
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    // 初始化重入锁及等待条件
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

添加元素

ArrayBlockingQueue提供了addofferput方法用于新增元素。add在队列满时抛出异常;

offer在队列满时立即返回;put队列满时在notFull上等待

/**
 * offer遇到队列满时立即返回false
 */
public boolean offer(E e) {
    checkNotNull(e);
    // 添加元素时上锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 队列满,立即返回
        if (count == items.length)
            return false;
        // 元素入队,插入元素后新增元素指针(putIndex)向前移动一位,唤醒非空条件上等待的线程
        // 当putIndex=items.length,将putIndex设为0,即数组将会形成一个环形
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
/**
 * add遇到队列满时立即抛出异常
 */
public boolean add(E e) {
    return super.add(e);
}

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

/**
 * 元素入队,将新增元素放入putIndex位,唤醒在非空条件上等到的线程
 * 如果已经到数组末尾,将putIndex置为0
 */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}
/**
 * put新增元素
 * 1.如果队列已满,线程进入非满条件等待
 * 2.队列未满,元素入队,并唤醒在非空上等待的元素
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

移除元素

/**
 * poll移除元素
 * 当队列空时,立即返回null
 * 队列不空时,移除takenIndex位置的元素
 * 1、如果takeIndex=items.length时,takeIndex置为0
 * 2、唤醒在非空上等待的线程
 */
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 队列空,返回null,窦泽移除takeIndex位置元素
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}
// 移除takeIndex位置元素
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    // 获取takeIndex位置的元素,并置空
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    // 已经到了数组尾部,takeIndex置为0,从头开始
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 唤醒在非满上等待的线程
    notFull.signal();
    return x;
}
/**
 * take移除元素
 * 如果队列空,线程在非空条件上等待
 * 如果队列不空,移除takeIndex位置的元素,并唤醒在非满条件上等待的线程
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 队列空,当前线程在非空条件上等待
        while (count == 0)
            notEmpty.await();
        // 队列不空,移除takeIndex位置上的元素
        return dequeue();
    } finally {
        lock.unlock();
    }
}

移除指定元素

/**
 * 移除指定元素
 * 从takeIndex开始遍历队列,如果存在相等的元素,移除元素
 */
public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            // 从takeIndex开始遍历队列,如果存在相等的元素,移除元素
            do {
                // 存在相等的元素,移除,返回true
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}

/**
 * 移除指定位置的元素
 * 1、如果移除的元素是队尾,直接置空takeIndex位置元素
 * 2、如果移除的元素不是队尾,从removeIndex开始将后一个元素向前移动,直到putIndex
 */
void removeAt(final int removeIndex) {
    // assert lock.getHoldCount() == 1;
    // assert items[removeIndex] != null;
    // assert removeIndex >= 0 && removeIndex < items.length;
    final Object[] items = this.items;
    // 移除的是队尾元素,直接将takeIndex位置置空,takeIndex前移一位
    if (removeIndex == takeIndex) {
        // removing front item; just advance
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } 
    // 移除的不是队尾元素,则从removeIndex开始将后一个元素向前移动,直到putIndex
    else {
        // an "interior" remove

        // slide over all others up through putIndex.
        final int putIndex = this.putIndex;
        // 从removeIndex开始遍历
        for (int i = removeIndex;;) {
            // 后一个元素
            int next = i + 1;
            // 移动到数组尾部,调到数组头部
            if (next == items.length)
                next = 0;
            // 还没到putIndex位置,将元素前移一位
            if (next != putIndex) {
                items[i] = items[next];
                i = next;
            } 
            // 到了putIndex,所有元素已经前移完成,putIndex设置为i,即putIndex也前移一位,因为前一位元素已经被移除
            else {
                items[i] = null;
                this.putIndex = i;
                break;
            }
        }
       	// 总元素-1
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    // 唤醒非满条件上等待的线程
    notFull.signal();
}

获取队首元素

/**
 * peek只获取队首元素,不会移除元素
 */ 
public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 返回takeIndex位置的元素
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

final E itemAt(int i) {
    return (E) items[i];
}

清空队列

/**
 * 清空队列
 * 1、遍历置空数组元素位
 * 2、唤醒在非满条件上等待的线程
 */
public void clear() {
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int k = count;
        if (k > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            // 遍历置空数组元素位
            do {
                items[i] = null;
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
            takeIndex = putIndex;
            count = 0;
            if (itrs != null)
                itrs.queueIsEmpty();
            // 唤醒在非满条件上等待的线程
            for (; k > 0 && lock.hasWaiters(notFull); k--)
                notFull.signal();
        }
    } finally {
        lock.unlock();
    }
}
原文地址:https://www.cnblogs.com/QullLee/p/12319161.html