Java 集合框架(四):PriorityQueue 和 ConcurrentLinkedQueue

Queue

队列是一种支持 FIFO 的数据结构或者容器。Queue 接口下面的实现类包括 Deque,非阻塞队列和阻塞队列。

PriorityQueue

PriorityQueue 是一个基于优先级的无界队列。比如我们的作业系统中,当一个作业完成后,在所有等待调度的作业中选择一个优先级最高的作业来执行,并且可以添加新的作业到优先队列中。

特点:

  • 元素按照自然顺序进行排列或者根据传入的 Comparator 进行排序。
  • 不允许插入 null 或者不可比较的对象(没有实现 Comparable 接口的对象)。
  • 优先级队列的头部元素最小(底层是一个最小堆)。
  • 底层实现是一个数组,会根据元素的数量进行扩容。
  • 线程不安全的

方法分析:

先来看这个优先级队列的构成,我们发现了一个非常重要的问题,就是优先级队列的底层是一个数组。

public class PriorityQueue<E> extends AbstractQueue<E>
    implements java.io.Serializable {
    transient Object[] queue;    //队列容器, 默认是11
    private int size = 0;  //队列长度
    private final Comparator<? super E> comparator;  //队列比较器, 为null使用自然排序
    //....
}

PriorityQueue 通过最小堆来实现,可以用一颗完全二叉树来表示。

通过数组就完全可以表示上面的二叉树。所以 PriorityQueue 的 peek 和 element 方法来获取第一个元素的时间复杂度都是常数级别,而增加和删除的复杂度为 log(N)。

add 和 offer

add 和 offer 都是像数组中插入元素,新加入的元素会放入到数组的最后一个位置,然后根据优先级对最小堆进行调整。

//offer(E e)
public boolean offer(E e) {
    if (e == null)//不允许放入null元素
        throw new NullPointerException();
    modCount++;
    int i = size;
    if (i >= queue.length)
        grow(i + 1);//自动扩容
    size = i + 1;
    if (i == 0)//队列原来为空,这是插入的第一个元素
        queue[0] = e;
    else
        siftUp(i, e);//调整
    return true;
}

element 和 peek

这两个方法的语义完全相同,都是获取队首元素且不删除,所以直接返回数组下标为 0 的元素即可。

//peek()
public E peek() {
    if (size == 0)
        return null;
    return (E) queue[0];//0下标处的那个元素就是最小的那个
}

remove 和 poll

remove 和 poll 方法的作用就是删除队首元素。删除了这个元素后,为了维护最小堆的特性,会进行调整。

使用最后一个元素替换队首的元素,然后使用 siftDown 方法进行调整。siftDown 的作用是从最后当前第一个元素开始,与左右孩子中较小的一个进行比较,知道小于或等于左右孩子中的任意一个为止。

//siftDown()
private void siftDown(int k, E x) {
    int half = size >>> 1;
    while (k < half) {
        //首先找到左右孩子中较小的那个,记录到c里,并用child记录其下标
        int child = (k << 1) + 1;//leftNo = parentNo*2+1
        Object c = queue[child];
        int right = child + 1;
        if (right < size &&
            comparator.compare((E) c, (E) queue[right]) > 0)
            c = queue[child = right];
        if (comparator.compare(x, (E) c) <= 0)
            break;
        queue[k] = c;//然后用c取代原来的值
        k = child;
    }
    queue[k] = x;
}

remove

remove(Object o) 用于删除队列中跟 o 相等的某一个元素(如果有多个,只删除一个)。删除会改变队列结构,所以需要进行调整。如果删除的是最后一个元素,直接删除即可。如果不是最后一个元素,把最后一个元素放入到它的位置再进行 siftDown 即可。

//remove(Object o)
public boolean remove(Object o) {
    //通过遍历数组的方式找到第一个满足o.equals(queue[i])元素的下标
    int i = indexOf(o);
    if (i == -1)
        return false;
    int s = --size;
    if (s == i) //情况1
        queue[i] = null;
    else {
        E moved = (E) queue[s];
        queue[s] = null;
        siftDown(i, moved);//情况2
        ......
    }
    return true;
}

ConcurrentLinkedQueue

ConcurrentLinkedQueue 是 Doug Lea 为我们准备的一个并发容器,是一个线程安全的队列。

特点:

  • 线程安全的并发容器
  • 底层基于 CAS 实现。
  • 底层的数据接口是链表。

方法分析:

Node 类的源码,里面包含的属性一个是值,另一个是指向下一个节点的引用。

private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
        .......
}

ConcurrentLinkedQueue 包含了一个头指针,一个尾指针。

在队列进行入队,出队的时候免不了对节点进行操作,在处理器执行集能够支持 CMPXCHG 指令后,在 java 源码涉及到并发处理都会使用 CAS 操作。下面列举了几个针对 Node 的 CAS 操作。

//更改Node中的数据域item   
boolean casItem(E cmp, E val) {
    return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//更改Node中的指针域next
void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
}
//更改Node中的指针域next
boolean casNext(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

UNSAFE 是虚拟机底层提供的方法,我们知道 CAS 是由它实现的即可。

offer

先假设我们要插入两个元素:

1. ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
2. queue.offer(1);
3. queue.offer(2);

先来看 offer 方法的源码。

public boolean offer(E e) {
    //e为null则抛出空指针异常
1    checkNotNull(e);
   
   //构造Node节点构造函数内部调用unsafe.putObject,后面统一讲
2    final Node<E> newNode = new Node<E>(e);

     
    //从尾节点插入
3    for (Node<E> t = tail, p = t;;) {

4        Node<E> q = p.next;

        //如果q=null说明p是尾节点则插入
5        if (q == null) {
            
6            //cas插入(1)
7            if (p.casNext(null, newNode)) {
                //cas成功说明新增节点已经被放入链表,然后设置当前尾节点(包含head,1,3,5.。。个节点为尾节点)
8                if (p != t) // hop two nodes at a time
9                    casTail(t, newNode);  // Failure is OK.
10                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
11        else if (p == q)//(2)
            //多线程操作时候,由于poll时候会把老的head变为自引用,然后head的next变为新head,所以这里需要
            //重新找新的head,因为新的head后面的节点才是激活的节点
12            p = (t != (t = tail)) ? t : head;
        else
            // 寻找尾节点(3)
13            p = (p != t && t != (t = tail)) ? t : q;
    }
}
  1. 第一行代码对传入的元素进行 null 判断。

  2. 讲 e 包装成一个 node 节点。

  3. 通过 for 循环进行 CAS 操作,这个 for 循环只有初始化条件,没有结束条件,这很符合 CAS 的套路(在循环体中 CAS 成功会直接 return 返回,失败就在 for 循环中不断重试直至成功)。这里 t 被初始化为 tail,p 被初始化为 t,即 tail。

  4. 如果 p 的下一个节点为 null,则 p 就是当前的尾节点,使用 casNext 将我们新建的 node 设置成尾节点 p 的 next 节点。如果 casNext 的操作失败则在循环中重试。

  5. 此时 p == t,直接返回,队列中插入了第一个元素。

  6. 此时的队尾尾 node1,而 tail 节点依然指向了 node0。

下面我们继续插入第二个元素。

  1. 插入第二个元素走到第四行时,q 就不是 null 了,而是 node1。
  2. 第五行为 false。
  3. 第十一行为 false。
  4. 此时代码走到了第十三行。这里说明我们在插入元素的时候,tail 可能并不是真正的队尾节点,所以第十三行的作用是找到真正的队尾节点,然后将 p 的引用指向它。
  5. 第十三行代码在单线程的环境中执行时,p == t,所以 p 被赋值为 q,也就是 node1。
  6. 再次循环。通过 casNext 将 p 的 next 设置为新增的 node。
  7. 走到第八行,这时 p!=t,会通过 casTail 将当前节点 node 设置为队尾节点。
  8. 我们发现第九行的注释里面写 CAS failed 也可以,原因是我们通过 p 的下一个节点是否为 null 来判断后面的逻辑,如果 第九行失败,下面插入的元素多进行几步第十三行的操作就可以了。

我们回头来看寻找尾节点的逻辑,p = (p != t && t != (t = tail)) ? t : q,这段代码永远不会将 p 赋值为 t,因为在单线程中 p 一直等于 t。我们来看看多线程环境下的执行情况。

offer->pull->offer

  1. 线程 A 读取了变量 t,t 指向队尾。
  2. 线程 B 刚好在这个时候 offer 了一个 node 之后,tail 发生了变化。此时 p != t。
  3. 此时 t != tail, 最后将 tail 赋值为 t。这时 t 就指向了最新的队尾节点。然后就可以执行 offer 操作了。

第十一行代码等我们学习完 poll 的源码之后再来看。

poll

public E poll() {
    restartFromHead:
    1. for (;;) {
    2.    for (Node<E> h = head, p = h, q;;) {
    3.        E item = p.item;

    4.        if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
    5.            if (p != h) // hop two nodes at a time
    6.                updateHead(h, ((q = p.next) != null) ? q : p);
    7.            return item;
            }
    8.        else if ((q = p.next) == null) {
    9.            updateHead(h, p);
    10.            return null;
            }
    11.        else if (p == q)
    12.            continue restartFromHead;
            else
    13.            p = q;
        }
    }
}

首先假设加入完两个元素后的队列状态如下,tail 没有更新。

  1. 我们还是把 p 作为要删除的真正的头节点,h 指向的并不一定是头节点。
  2. 由于 p=h=head,此时第四行代码的 item!=null 为 true,接下来通过 casItem 将 node1 的数据值设置为 null。
  3. 如果 CAS 失败则继续循环。
  4. 进入第五行时,p 和 h 都指向 node1,因此为 false。直接返回刚才的值。

运行之后的结果。

继续 poll。

  1. 此时 h 和 p 指向的节点的数据值为 null,要重新定位头节点(找到数据值不为 null 的节点)。
  2. 走到第八行代码,q 指向了 node2。然后走到第十三行代码,这时 p 和 q 同时指向了 node2。

进行下一次循环。

  1. 第四行将 p 的 item 设置为 null。
  2. 因为 p 指向了 node2,而 h 还是 node1,因此第五行为 ture。
  3. 将 head 指向 node3,同时通过 h.lazySetNext 将 node1 的 next 指向他自己。

再来看多线程情况下需要注意的点:

else if (p == q)
    continue restartFromHead;

上一个判断 q = p.next 就是说 q 是 p 的下一个节点,那么什么时候 q 会等于 p 那?只有 p 指向的节点被 lazySetHead 了之后。即 A 在判断 p==q 时,线程 B 已经 poll 完,并且将 p 指向节点变为了 lazySetHead 的节点。所以使用 continue restartFromHead 来保证拿到最新的 head。

offer 方法最后的补充

对于 offer 方法的第十一行代码,我们来做一个补充。假设队列的初识状态如下。

在 offer 方法的执行过程中,当 p 指向第一个节点时,此时第一个节点恰巧被 poll 了,这个节点变为一个哨兵节点。

这里会重新寻找 head 节点。

更新机制

对于 offer 和 poll 方法,我们发现 tail 和 head 是延迟更新的。源码中的注视为 hop two node at a time。如果让 tail 永远作为队列的队尾节点,实现的代码量会更少,而且逻辑更易懂。但是会有新能损耗。如果能够减少 CAS 的更新操作,无疑可以提升效率。

原文地址:https://www.cnblogs.com/paulwang92115/p/12184784.html