Java之集合(十六)ArrayBlockingQueue

  转载请注明源出处:http://www.cnblogs.com/lighten/p/7427763.html

1.前言

  JDK5是一个重要的更新版本,其提供了大量的并发类。之前的介绍都是一些util下早期的集合类,本章开始介绍JDK5提供的并发包中所给出的在多线程下,线程安全的集合类。首先介绍的是Queue队列,之前就介绍了2个:ArrayDeque和PriorityQueue。队列算是天然需要线程安全的,因为其特性适用场景大多数都是并发操作,进行排队。所以JDK5并发包中提供了大量不同特性的线程安全的队列。本章从ArrayBlockingQueue开始介绍。

2.ArrayBlockingQueue

  ArrayBlockingQueue的数据结构和ArrayDeque十分相似,但是多了一些并发所需要的字段,比如锁。默认使用的所是非公平锁。

  初始化了容量和锁的相关设置。

  offer方法可以队列是不允许出现空的对象。使用的方法也是加锁来确保线程安全,如果队列满了就会返回false,入队列失败。

  入队列也是放入目前要放的位置,再找到下一个要放的位置,整个都是通过count计数来判断队列是否满了,也不用担心覆盖了数据,因为入队列的时候判断了当前数量是否等于容量。最后发出队列非空信号。

  poll方法会取出队列的数据,如果count是0,自然是null。否则进行出队列操作。

  取出当前下标的内容,并置为null,找到下一个要取出的位置。数量减一,迭代器存在也会处理取出迭代器。然后发出一个队列非满信号。

  之前所讲的非空和非满信号还是有所作用的。

  放入一个对象时,如果队列满了但是还是想放进去,就需要等待队列有空位。队列在移除一个对象时就会发出一个非满信号。上面的放入方法就是用到了这个信号。如果队列满了其会等待非满信号,直到超时。如果不希望超时,就是想放进去,可以使用put方法,不过其可能会抛出中断异常。

  poll方法一样,如果队列为空自然取不到,就一直等待一个非空的信号。不希望超时可以使用take方法。

  值得一提的是该类的迭代器,所有的迭代器共享数据,队列改变会影响所有的迭代器。为了保证正确,增加了许多复杂的操作,但是由于循环数组和一些内部移除会导致迭代器丢失它们的位置,或显示一些它们不应该显示的元素。为了避免这个情况,当队列有一个或多个迭代器的时候,其通过以下手段保持状态:

  1.跟踪循环的次数。即 takeIndex为0的次数。

  2.每当删除一个内部元素时,通过回调通知所有迭代器(因此其他元素也可以移动)。

  以上JDK注解难以理解,看代码。queue中有一个字段是itrs,其存放了目前所创建的所有迭代器。

  每个迭代器都是一个节点,节点是虚引用。

  迭代器创建的时候会加入到这个itrs中。doSomeSweeping是用来清理无用的迭代器。在迭代器创建和detach的时候会触发。sweeper字段就是记录上次扫描到的位置。如果为null,就从链表头开始扫描,有就从其下一个开始扫描。如果找到了一个被回收了或者是耗尽的迭代器,就清理掉它,重新建立链表的连接,继续找下一个。这就完成了对无效迭代器的清理了。

  之前我们都看到,在出队列的时候,调用了itrs的remove方法,下面是该方法做的具体事情:

  队列中数量为0的时候,队列就是空的,会将所有迭代器进行清理并移除。否者如果takeIndex的下标是0,意味着队列从尾中取完了,又回到头部获取,此时的操作就是上面JDK注解1条件,实质上跟踪的是队列的循环次数,计数器cycles自增1,然后对所有的迭代器进行清理,清除耗尽了的迭代器。

  下面仔细研究一下ArrayBlockingQueue的迭代器实现,看看其是共享队列数据的含义是什么。还是从迭代器的构造函数看起:

  count等于0的时候,创建的这个迭代器是个无用的迭代器,可以直接移除,进入detach模式。否则就把当前队列的读取位置给迭代器当做下一个元素,cursor存储下个元素的位置。

  hashNext()先判断nextItem有没有值了,没有值的时候触发noNext函数,这个函数做的就是调整修正队列。这里先提一下并发中迭代器会发生什么问题。ArrayBlockingQueue的实现是一个循环数组,使用takeIndex和putIndex来控制元素的出入队列。这样就产生了一个问题,迭代器在创建的时候,其位置已经确定,但是队列可能在不断的出入队列,这样迭代器会受到严重影响,可能造成队列实际上入出循环了数组一圈,而迭代器记录的是上一圈的情况,只有下标,这样遍历就会造成很大的问题。所以才需要上面所说的2点来保证,第一个就是每个迭代器记录当前其循环的圈数,第二个就是队列元素出队列时影响所有的迭代器。这个修正队列就是在当前迭代器遍历完成之后,比较一下圈数来判断具体情况,圈数和队列读取下标和迭代器读取下标来判断是否要废弃该迭代器。

  next方法也会在迭代器没有被废弃的时候比较一下当前队列的情况。之后就是更新上次的下标和游标已经下一个值。

  其它的方法就不再描述,现在给一些测试用例,来印证上面所描述的内容,更为直观的表现其迭代器的使用。由于有锁实际上也是顺序操作,所以例子在单线程中模拟。

    @Test
	public void test() {
		ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
		queue.offer(1);
		queue.offer(2);	// 放入2个元素
		Iterator<Integer> it1 = queue.iterator();
		Iterator<Integer> it2 = queue.iterator();
		Iterator<Integer> it3 = queue.iterator();
		Iterator<Integer> it4 = queue.iterator();
		Iterator<Integer> it5 = queue.iterator();
		while(it1.hasNext()) {
			System.out.print(it1.next()+",");
		}
		System.out.println();
		queue.poll();				// 清除第一个元素,再放入数据
		queue.offer(3);				// 填充一个元素,测试可以追上的情况,队列中有2,3
		while(it2.hasNext()) {
			System.out.print(it2.next()+",");
		}
		System.out.println();
		queue.poll();				// 再移除一个元素,此时1,2元素都移除了
		// 迭代器无法获取元素2,但是队列只相差一圈
		while(it3.hasNext()){
			System.out.print(it3.next()+",");
		}
		System.out.println();
		queue.offer(4);		// 队列中有3,4
		queue.poll();
		while(it4.hasNext()) {
			System.out.print(it4.next()+",");
		}
		System.out.println();
		queue.poll();
		queue.offer(5);
		while(it5.hasNext()) {
			System.out.print(it5.next()+",");
		}
	}

  这5个迭代器都是在队列满的时候生成的,之后第一个迭代器正常遍历,第2个迭代器是在队列出了一个元素,又入了一个元素的时候,此时还是出现了所有的元素,队列依旧是满的。第3个迭代器是又出了一个元素,队列中只剩下3,迭代器依旧没影响,还是输出了1,3。第4个迭代器放入一个元素,又移除一个元素,此时应该还剩4,输出的也是1,4。问题在第5个迭代器,移除了一个元素,再添加一个元素,此时应该是从新回到位置2次了,所以5只输出了一个1。这个就判断迭代器失效了。再放入一个元素测试是一样的。

  可能会有疑问,迭代器5先是12,然后34,最后填到5的时候才废弃,不是超了两圈?实际上,如果队列中有1,2 全部取出队列,再填3,这样一圈迭代器就只能输出1了就被废弃了。

  上面只是个人的研究,实际上队列使用迭代器的需求也不算大。使用到的时候就需要好好研究一下了。

原文地址:https://www.cnblogs.com/lighten/p/7427763.html