LinkedBlockingQueue

  LinkecBlockQueue链表阻塞队列,从命名可以看出,它是基于链表实现的。同样这也是个先进先出的队列,队头是队列里入队时间最长的元素,队尾则是入队时间最短的。理论上它的吞吐量要超出数组阻塞队列ArrayBlockingQueue。LinkedBlockQueue可以指定容量限制,在没有指定的情况下,默认为Integer.MAX_VALUE

1, 成员变量

 1 //最大容量
 2 private final int capacity;
 3 //当前队列长度
 4 pirvate final AtomicInteger count = new AtomaticInteger();
 5 //头结点,用于标志队列头,其item永远为null,head.next为第一个入队节点
 6 transient Node<E> head;
 7 //尾结点
 8 private transient Node<E> last;
 9 //取值锁,用于take/poll方法
10 private final ReentrantLock takeLock = new ReentrantLock();
11 //表示队列非空的对象监视器
12 private final Condition notEmpty = takeLock.newCondition();
13 //存值锁,用于put、offer等方法。
14 private final ReentrantLock putLock = new ReentrantLock();
15 //表示队列非满的对象监视器
16 private final Condition notFull = putLock.newCondition();

  与ArrayBlockingQueue相比,LinkedBlockingQueue的重入锁被分成了两份,分别对应应存值和取值。这种实现方法被称为双锁队列算法,这样做的好处在于,读写操作的lock操作是由两个锁来控制,互不干涉,因此可以同时进行读操作和写操作,这也是LinkedBlockQueue吞吐量超过ArrayBlockingQueue的原因。但是,使用两个锁要比一个锁复杂很多,需要考虑各种死锁的情况。      

2, signalNotEmpty()和signalNotFull()

  notEmpty/notFull分别对应非空和非满锁的条件监视器,signalNotEmpty()和signalNotFull()方法分别负责唤醒对应的入队、出队线程:

 1 private void signalNotEmpty() {
 2     final ReentrantLock takeLock = this.takeLock;
 3     takeLock.lock();
 4     try {
 5         notEmpty.signal();
 6     } finally {
 7         takeLock.unlock();
 8     }
 9 }
10 private void signalNotFull() {
11     final ReentrantLock putLock = this.putLock;
12     putLock.lock();
13     try {
14         notFull.signal();
15     } finally {
16         putLoct.unlock();
17     }
18 }

       LinkedBlockingQueue使用双锁算法来实现,在需要唤醒重入锁的时候,重入锁与监视器可能不是对应的,以put(E)方法为例,存值方法在执行完成后,如果队列内有值存在,那么需要对notEmpty进行唤醒,但是put方法明显使用putLock进行加锁的,而notEmpty则是用来监视takeLock,所以需要封装signal方法,以便调用。

3, add、put、offer方法

  LinkedBlockingQueue提供的存值方法中,并未实现add方法,该方法由父类AbstractQueue实现,父类的add方法直接调用了offer方法,并在offer返回false时抛出异常。

put方法为入队方法,如果队列已满,那么阻塞当前线程,直到notFull被唤醒:

 1 public void put(E e) throws InerruptedException {
 2     if(e == null) throw new NulPointerException();
 3     int c = -1;
 4     Node<E> node = new Node<E>(e);
 5     final ReentrantLock putLock = this.putLock;
 6     final AtomicInteger count = this.count;
 7     putLock.lockInterruptibly();
 8     try {
 9         //队列已满情况下,阻塞当前线程
10         while(count.get() == capacity) {
11             notFull.await();
12         }
13         enqueue(node);
14         c = count.getAndIncrement();
15         //c+1小于容量,说明可以通知等待队列非满状态的对象监视器
16         if(c + 1 < capacity) {
17             notFull.signal();
18         }
19     } finally {
20         putLock.unlock();
21     }
22     //c == 0意味着最新的count是由0变化为1,需要通知等待队列非空状态的线程
23     if(c == 0) {
24         signalNotEmpty();
25     }
26 }

  offer(E)方法,尝试入队一次,如果失败,返回false,offer和put的唯一区别在于offer会判断count是否达到容量,也就是判断队列是否已满,队满则不再执行入队操作。offer(E,long,TimeUnit)与put非常相似,会不停的尝试入队,区别在于使用了awaitNanos(long)方法而不是await()。

4, remove、take、poll方法

remove用于移除指定对象

 1 public boolean remove(Object o) {
 2     if(o == null) return false;
 3     fullyLock();
 4     try {
 5         for(Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {
 6             if(o.equals(p.item)) {
 7                 unlink(p, tail);
 8                 return true;
 9             }
10         }
11         return false;
12     } finally {
13         fullyUnlock();
14     }
15 }
16 void fullyLock() {
17     putLock.lock();
18     takeLock.lock();
19 }
20 void fullyUnlock() {
21     takeLock.unlock();
22     putLock.unlock();
23 }

  fullLock和fullyUnlock方法会对putLock和takeLock统一上锁或者解锁,因为LinkedBlockingQueue是一个双向链表,remove可能会同时影响到入队和出队操作。移除指定节点的原理是从头开始遍历,通过equals来确定是否一致。

       take方法会不停地尝试从队头出队

 1 public E take() throws InterruptedException {
 2     E x;
 3     int c = -1;
 4     final AtomicInteger count = this.count;
 5     final ReentrantLock takeLock = this.takeLock;
 6     takeLock.lockInterruptibly();
 7     try {
 8         //队列已空的情况下,阻塞当前线程
 9         while(count.get() == 0) {
10             notEmpty.await();
11         }
12         x = dequeue();
13         c = count.getAndDecrement();
14         if(c > 1) {
15             notEmpty.signal();
16         }
17     } finally {
18         takeLock.unlock();
19     }
20     //意味着刚刚从队满状态脱离需要唤醒等待notFull状态的线程
21     if(c == capacity) {
22         signalNotFull();
23     }
24     return x;
25 }

poll()方法用于从队尾出队,与take()的区别是他只会尝试一次,失败返回null。poll(long, TimeUnit)有超时的poll,则会不停的请求出队。

 1 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 2     E x = null;
 3     int c = -1;
 4     long nanos = unit.toNanos(timeout);
 5     final AtomicInteger count = this.count;
 6     final ReentrantLock takeLock = this.takeLock;
 7     takeLock.lockInterruptibly();
 8     try {
 9         while (count.get() == 0) {
10             if(nanos <= 0) {
11                 return null;
12             }
13             nanos = notEmpty.awaitNanos(nanos);
14         }
15         x = dequene();
16         c = count.getAndDecrement();
17         if(c > 1) {
18             notEmpty.signal();
19         }
20     } finally {
21         takeLock.unlock();
22     }
23     if(c == capacity) {
24         signalNotFull();
25     }
26     return x;
27 }

       Poll(long, TimeUnit)的实现和take()基本一致,不同之处在于notEmpty对象监视器的await方法换成了带超时的awaitNanos方法,这将使poll(long, TimeUnit)阻塞给定时间,直到出队成功或者抛出中断异常。

与ArrayBlockingQueue相比,LinkedBlockingQueue的实现更为复杂,但深究其实现,遵循以下原则:

       入队后,如果队列未满,那么唤醒下一入队线程,如果队列原本是空队列,那么唤醒出队线程。

       出队后,如果队列未空,那么唤醒下一出队线程,如果队列原本是满队列,那么唤醒入队线程。

原文地址:https://www.cnblogs.com/guanghe/p/13488640.html