ArrayBlockingQuque摘要

ArrayBlockingQuque

优势

  1. 线程同步,线程安全
  2. 对应空或满时,takeput操作将阻塞
  3. 内部是一个数组,每个元素不会产生额外的处理对象,如Node

基于什么

  • ReentrantLock
  • Condition

解析

构造函数

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
  • 默认创建一个不公平的锁,有利于吞吐量的提高
  • 基于锁创建两个条件,notEmpty otFull,操作这两个条件的前提必须是已经获取到锁,condition.await()会释放锁进行等待直到被唤醒或中断

非阻塞添加元素 offer(X)

添加元素offer(),offer其实是非阻塞的,添加失败直接放回false

  public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

/**
 * Inserts element at current put position
 *  advances, and signals.
 * Call only when holding lock.
 */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

offer方法锁定代码块,因此enqueue的操作是安全的,在enque结束后,调用

notEmpty.signal()

唤醒notEmpty.await()状态中的线程,让它退出阻塞,尝试获取锁

阻塞添加元素put

 /**
 * Inserts the specified element at the tail of this
 *  queue, waiting
 * for space to become available if the queue is full.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

可以发现,如果count == items.length = true,会一直调用notFull.await()方法,释放锁,且当前线程阻塞等待,直到条件不成立,并且此处await是没有指定超时时间的,意味着它需要被其他线程唤醒

阻塞取走元素take

 public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

take方法先获取到锁,锁定代码块,可以看见

 while (count == 0)
      notEmpty.await();

它的作用是如果没有可take的元素就await,await时释放锁,当前线程陷入睡眠,当offer方法被调用,notEpty.signal()被调用后,take方法的线程从沉睡中醒来,尝试重新获取锁,此时,count == 0 = false,程序进行dequeue

 /**
 * Extracts element at current take position
 * advances, and signals.
 * Call only when holding lock.
 */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

在获取到元素后,调用notFull.signal(),唤醒阻塞与notFull.await的线程,告诉它可以往队列里面放数据了

原文地址:https://www.cnblogs.com/windliu/p/8365677.html