深入理解java:2.3.5. 并发编程concurrent包 之容器BlockingQueue(阻塞队列)

1. 什么是阻塞队列?

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。

这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。

当队列满时,存储元素的线程会等待队列可用。

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。

阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列提供了四种处理方法:

add        增加一个元索                     如果队列已满,则抛出一个IIIegaISlabEepeplian异常
remove   移除并返回队列头部的元素    如果队列为空,则抛出一个NoSuchElementException异常
element  返回队列头部的元素             如果队列为空,则抛出一个NoSuchElementException异常
offer       添加一个元素并返回true       如果队列已满,则返回false
poll         移除并返问队列头部的元素    如果队列为空,则返回null
peek       返回队列头部的元素             如果队列为空,则返回null
put         添加一个元素                      如果队列满,则阻塞
take        移除并返回队列头部的元素     如果队列为空,则阻塞

remove、element、offer 、poll、peek 其实是属于Queue接口。

方法处理方式抛出异常返回特殊值一直阻塞超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

2. Java里的阻塞队列

JDK7提供了7个阻塞队列。分别是

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列,其构造函数必须带一个int参数来指明其大小。
  • LinkedBlockingQueue :若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制。若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。
  • PriorityBlockingQueue :一个支持优先级排序无界阻塞队列,其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列,本身不存在容量
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列,当前存在一个正在等待获取的消费者线程,即立刻移交之;否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素。。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

上点源码 就更明白了。以ArrayBlockingQueue类为例: 

先看阻塞的方法,这ReentrantLock的使用方式就能说明这个类是线程安全类
public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)//队列已满,返回false
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0)//队列为空,返回false
                return false;
            Else{
                x = extract();
                return true;
           }
        } finally {
            lock.unlock();
        }
    }
 
 
再看阻塞的方法,这里面涉及到Condition类,简要提一下, 
await方法指:造成当前线程在接到信号或被中断之前一直处于等待状态。 
signal方法指:唤醒一个等待线程。
用一个ReentrantLock ,两个Condition。实现了阻塞算法。

private final Condition notFull;
private final Condition notEmpty;
public ArrayBlockingQueue(int capacity, boolean fair) {
//省略其他代码
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)//如果队列已满,等待notFull这个条件,这时当前线程被阻塞
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); //唤醒受notFull阻塞的当前线程
                throw ie;
            }
            insert(e);//insert方法里notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)//如果队列为空,等待notEmpty这个条件,这时当前线程被阻塞
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal();//唤醒受notEmpty阻塞的当前线程
                throw ie;
            }
            return extract();//extract方法里notFull.signal();
        } finally {
            lock.unlock();
        }
    }
 
 
LinkedBlockingQueue相对于ArrayBlockingQueue还有不同的是,
有两个ReentrantLock,且队列现有元素的大小由一个AtomicInteger对象标示。 
有两个Condition很好理解,在ArrayBlockingQueue也是这样做的。但是为什么需要两个ReentrantLock呢?
 
入队操作其实操作的只有队尾引用last,并且没有牵涉到head。
而出队操作其实只针对head,和last没有关系。
那么就 是说入队和出队的操作完全不需要公用一把锁,所以就设计了两个锁,这样就实现了多个不同任务的线程入队的同时可以进行出队的操作,
锁分段技术提高性能,如ConcurrentHashMap性能优于HashTable。
 
另一方面由于两个操作所 共同使用的count是AtomicInteger类型的,所以完全不用考虑计数器递增递减的问题。
原文地址:https://www.cnblogs.com/my376908915/p/6759941.html