ArrayBlockingQueue 和LinkedBlockQueue

ArrayBlockingQueue

ArrayBlockingQueue是Java多线程常用的线程安全的一个集合,基于数组实现,继承自AbstractQueue,实现了BlockingQueue和Serializable接口。
//先看看器内部的成员变量:

private static final long serialVersionUID = -817911632652898426L;//实现了序列化接口

/** 基于数组的实现,内部持有一个Object数组 */
final Object[] items;

/** 数据读取指针 */
int takeIndex;

/** 数据插入指针 */
int putIndex;

/** 当前队列中元素的总数 */
int count;

/** 采用了ReentrantLock 的实现 */
final ReentrantLock lock;

/** 标识当前队列中有可读元素 */
private final Condition notEmpty;

/** 标识当前队列可写入 */
private final Condition notFull;

//可以看到,ArrayBlockingQueue内部维护了一个takeIndex指针和一个putIndex指针,分别用于读取和写入;一个notEmpty和一个notFull,分别用于保证写入和读取的线程安全,唤醒读取和写入线程
//再看看构造函数
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];//初始化数组
    lock = new ReentrantLock(fair);//初始化ReentrantLock,并标识是否为公平锁
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

//然后来看看ArrayBlockingQueue的offer方法

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            //如果队列满,则添加失败。offer方法不会阻塞,put方法会阻塞
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
//首先做空值检查,如果为空,抛出空值异常。然后使用了ReentrantLock ,来保证offer的线程安全性。下面来看看真正的添加方法enqueue:
private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}
//可以看到,ArrayBlockingQueue内部维护了一个putIndex 指针,该指针指向当前队列可以插入的位置,直接将当前的Object对象插入到inputIndex位置,然后让inputIndex自增,如果队列已满,则指向第一个元素。最后元素总数加一,并唤醒读线程
//最后我们来看读取take方法:
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();//take方法是阻塞的,poll方法不会阻塞,直接返回。
        return dequeue();
    } finally {
        lock.unlock();
    }
}
//可以看到,那么take方法将被阻塞。下面看看出对方法dequeue:
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;//如果取到最后一个元素,takeIndex 指向第一个元素
    count--;//元素总数减一
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();//唤醒写入线程
    return x;
}
以上便是ArrayBlockingQueue的基本方法,内部锁的实现是ReentrantLock ,维护了take和put两个指针;入队和出对方法也都挺简单的,需要注意的是,take和put方法是阻塞的,offer、add、poll等方法是非阻塞的

LinkedBlockingQueue

LinkedBlockingQueue基于链表实现,继承了AbstractQueue,实现了序列化接口Serializable和BlockingQueue接口
 //首先看看内部成员变量:

private final int capacity;

/** count用来记录内部元素的总数 */
private final AtomicInteger count = new AtomicInteger();

/** Node节点的头指针*/
transient Node<E> head;

/** 尾指针*/
private transient Node<E> last;

/** 读锁 */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** 写锁 */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
可以看到,与ArrayBlockQueue不同,元素总数使用了原子类AtomicInteger ,内部多维护了两把锁,读锁和写锁。其实现相对更加复杂
//下面看看其构造方法
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();//容量不能小于0
    this.capacity = capacity;
    last = head = new Node<E>(null);//初始化头尾指针
}
//下面是offer方法
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();//不接受空值
    final AtomicInteger count = this.count;
    if (count.get() == capacity)//如果当前元素总数等于其容量大小,直接返回false
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}
//我们可以看到,LingkedBlockQueue是不接受空值的。offer是非阻塞的。入队之后,如果队列没有满,唤醒其他入队线程,并且唤醒出队线程。
//继续看入队方法enqueue
private void enqueue(Node<E> node) {
    last = last.next = node;
}//可以看到入队方法相当简单,就是把尾节点的下一个节点直接指向新加入的节点,然后将新加入的节点作为尾节点

//然后看看take方法:
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();//take方法是阻塞的
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}//也挺简单的,就是先判断是否可以出队,不能则等待,否则出队,然后唤醒其他出队线程,并唤醒入队线程
//最后是出队方法:
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
} // 直接将一个元素取出,然后首位元素置空
总结,从实现来看,相比ArrayBlockQueue,LinkedBlockQueue的加锁方法相对更加复杂,但是其入队和出队方法更加简单;和ArrayBlockQueue一样,take、put方法阻塞,offer、add、poll方法不会阻塞
原文地址:https://www.cnblogs.com/canmeng-cn/p/8711182.html