并发编程(六)并发队列

一、并发队列的有界和无界

  并发队列:实际上就是在并发场景下使用的队列。

  有/无界概念:有界,就是规定了队列的大小,比如初始值给定位16。无界不是真的无界,是整形的最大值,这个值是达不到的(因为内存不够),所以通常称为无界

有界队列

常见的有界队列

  • ArrayBlockingQueue :基于数组实现的阻塞队列
  • LinkedBlockingQueue :基于链表实现的阻塞队列,该有界队列不设置大小时就是Integer.MAX_VALUE
  • SynchronousQueue :内部容量为零,适用于元素数量少的场景,尤其特别适合做交换数据用,内部使用队列来实现公平性的调度,使用来实现非公平的调度,在Java6时使用CAS代替了原来的锁逻辑

有界队列的共性

  • put 、take 操作都是阻塞的
  • offer、poll 操作非阻塞的
    • offer操作时,若队列满了会返回false,不会阻塞
    • poll操作时,若队列为空会返回null,不会阻塞
  • 并不是在所有场景下,非阻塞都是好的,阻塞代表着不占用CPU,在有些场景也是需要阻塞的,put、take操作存在必有其存在的必然性

ArrayBlockingQueue 与 LinkedBlockingQueue 对比

  • ArrayBlockingQueue 实现简单,表现稳定,添加和删除操作使用同一个锁,通常性能不如后者
  • LinkedBlockingQueue 添加和删除两把锁是分开的,所以竞争会小一些

无界队列

常见的无界队列

  • ConcurrentLinkedQueue :无锁队列,底层使用CAS操作,通常具有较高吞吐量,但是具有读性能的不确定性,弱一致性——不存在如ArrayList等集合类的并发修改异常,通俗的说就是遍历时修改不会抛异常;
  • PriorityBlockingQueue :具有优先级的阻塞队列;
  • DelayedQueue :延时队列,使用场景:
    • 缓存 :清掉缓存中超时的缓存数据
    • 任务超时处理
    • 内部实现其实是采用带时间的优先队列,可重入锁,优化阻塞通知的线程元素leader
  • LinkedTransferQueue:简单的说也是进行线程间数据交换的利器

无界队列的共性

  • put 操作永远都不会阻塞,空间限制来源于系统资源的限制
  • 底层都使用CAS无锁编程 

二、并发队列的阻塞式与非阻塞式的区别

  阻塞式队列

  • 入列(存) :阻塞式队列,如果存放的队列超出队列的总数,是时候会进行等待(阻塞)。当队列达到总数的时候,入列(生产者)会进行阻塞。这时候只有当消费者消费了队列中的队列之后,生产者才可以继续往队列中存放队列。
  • 出列(取) :如果获取队列为空的情况下,这时候也会进行等待(阻塞)。这时候队列中没有队列,消费者无法消费队列,只有生产者往对队列中存放队列之后,消费者才可以进行消费。

  非阻塞队列

  • 入列(存)、出列(取) :均直接返回结果,不会阻塞去等待。

三、非阻塞队列详解

ConcurrentLinkedQueue

  定义:一个基于链接节点的无界线程安全队列。

  特点:

  • 此队列按照 FIFO(先进先出)原则对元素进行排序。
  • 队列的头部 是队列中时间最长的元素,队列的尾部 是队列中时间最短的元素。
  • 当我们获取一个元素时,它会返回队列头部的元素。

  常用方法:

  • add(E e) :将指定元素插入此队列的尾部,返回值为Boolean,源码内部是直接调用的offer()方法。
  • contains(Object o) :如果此队列包含指定元素,则返回 true。
  • isEmpty() :如果此队列不包含任何元素,则返回 true。
  • iterator() :返回在此队列元素上以恰当顺序进行迭代的迭代器,返回值为 Iterator。
  • offer(E e) :将指定元素插入此队列的尾部,返回值为 boolean。
  • peek() 获取但不移除此队列的头;如果此队列为空,则返回 null。
  • poll()获取并移除此队列的头,如果此队列为空,则返回 null。
  • remove(Object o) :从队列中移除指定元素的单个实例(如果存在),返回值为 boolean。
  • size() :返回此队列中的元素数量。
  • toArray()  :返回以恰当顺序包含此队列所有元素的数组。

  举个例子:

public class ConcurrentLinkedQueueTest {
    public static void main(String[] args) {
        /**
         * 无参构造方法
         * ConcurrentLinkedQueue(): 创建一个最初为空的 ConcurrentLinkedQueue
         */
        ConcurrentLinkedQueue<Integer> concurrentLinkedQueue = new ConcurrentLinkedQueue();
        /**
         * 1、add(E e) :将指定元素插入此队列的尾部,返回值为Boolean。
         */
        System.out.println("=====1、从队列尾部添加5=====");
        Boolean addBoolean = concurrentLinkedQueue.add(5);
        System.out.println("是否添加到队列尾部成功: " + addBoolean);
        System.out.println();

        /**
         * 2、contains(Object o) :如果此队列包含指定元素,则返回 true。
         */
        System.out.println("=====2、查看队列中是否包含5=====");
        Boolean containsBoolean = concurrentLinkedQueue.contains(5);
        System.out.println("是否包含5:" + containsBoolean);
        System.out.println();

        /**
         * 3、isEmpty() :如果此队列不包含任何元素,则返回 true
         */
        System.out.println("=====3、判断队列是否为空=====");
        Boolean isEmptyBoolean = concurrentLinkedQueue.isEmpty();
        System.out.println("concurrentLinkedQueue是否为空:" + isEmptyBoolean);
        System.out.println();

        /**
         * 4、iterator() :返回在此队列元素上以恰当顺序进行迭代的迭代器,返回值为 Iterator<E>    。
         */
        concurrentLinkedQueue.add(6);
        concurrentLinkedQueue.add(7);
        concurrentLinkedQueue.add(8);
        System.out.println("=====4、迭代队列元素=====");
        Iterator<Integer> iterator = concurrentLinkedQueue.iterator();
        System.out.println("iterator的结果:");
        while (iterator.hasNext()) {
            System.out.print(iterator.next() + " ");
        }
        System.out.println();
        System.out.println();

        /**
         * 5、offer(E e) :将指定元素插入此队列的尾部,返回值为boolean。
         */
        System.out.println("=====5、offer()方法插入元素=====");
        Boolean offerBoolean = concurrentLinkedQueue.offer(9);
        System.out.println("是否插入队列尾部成功:" + offerBoolean);
        System.out.println();

        /**
         * 6、peek() :获取但不移除此队列的头;如果此队列为空,则返回 null。
         */
        System.out.println("=====6、peek()方法获取元素【元素不出队列】=====");
        Integer peekResult = concurrentLinkedQueue.peek();
        System.out.println("队列的第一个信息:" + peekResult);
        System.out.println();

        /**
         * 7、poll() :获取并移除此队列的头,如果此队列为空,则返回 null。
         */
        System.out.println("=====7、poll()方法获取元素【元素出列】=====");
        Integer pollResult = concurrentLinkedQueue.poll();
        System.out.println("队列的第一个信息【此时元素已经移出队列】:" + pollResult);
        System.out.println();

        /**
         * 8、remove(Object o) :从队列中移除指定元素的单个实例(如果存在),返回值为Boolean。
         */
        System.out.println("=====8、remove()方法移除队列中的元素=====");
        Boolean removeBoolean = concurrentLinkedQueue.remove(9);
        System.out.println("是否移除9成功?" + removeBoolean);
        System.out.println();

        /**
         * 9、size():返回此队列中的元素数量
         */
        System.out.println("=====9、输出队列中的元素数量=====");
        Integer size = concurrentLinkedQueue.size();
        System.out.println("队列的元素数量:" + size);
    }
}

四、阻塞队列详解

  阻塞队列分类:

  • DelayQueue :基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞
  • SynchronousQueue :实际上不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移出队列【用作配对】。
  • ArrayBlockingQueue :基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列元素最优先能够访问。
  • LinkedBlockingQueue :基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。
  • PriorityBlockingQueue :PriorityBlockingQueue会按照元素的优先级对元素进行排序,按照优先级顺序出队【优先级队列】,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志)。

常用方法:

  • add(E e) :将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常。
  • remove() :移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常。
  • offer(E e) :将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false
  • offer(E o, long timeout, TimeUnit unit) :可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
  • put(Object obj) :把obj加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续。
  • poll() :移除并获取队首元素,若成功,则返回队首元素;否则返回null
  • poll(long time) :取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。
  • poll(long timeout, TimeUnit unit) :从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则超时还没有数据可取,返回失败。
  • take() :取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入。
  • drainTo() :一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
  • peek() :获取队首元素,若成功,则返回队首元素;否则返回null。

  举个例子:

/**
 * 阻塞队列测试【以数组阻塞队列为例(公平)】
 */
public class BlockQueueTest {
    private int queueSize = 10;//数组长的为10
    private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize, false);//数组阻塞队列【公平队列】

    public static void main(String[] args) {
        BlockQueueTest test = new BlockQueueTest();
        Producer producer = test.new Producer();//生产者
        Consumer consumer = test.new Consumer();//消费者

        producer.start();
        consumer.start();
    }

    /**
     * 消费者线程
     */
    public class Consumer extends Thread {
        @Override
        public void run() {
            consume();
        }

        //消费方法
        private void consume() {
            int i = 5;
            //一直循环去队列里取元素
            while (i > 0) {
                try {
                    Integer take = queue.take();
                    System.out.println("从队列取走元素:" + take + " 队列剩余" + queue.size() + "个元素");
                    i--;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 生产者线程
     */
    public class Producer extends Thread {

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            int i = 5;
            //一直循环去队列里插入元素
            while (i > 0) {
                try {
                    boolean offer = queue.offer(i);
                    System.out.println("向队列插入一个元素成功? " + offer + " 队列剩余空间:" + (queueSize - queue.size()));
                    i--;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

参考文章:

原文地址:https://www.cnblogs.com/riches/p/13817207.html