java并发队列

在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的非阻塞高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue。

阻塞队列与非阻塞队

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列.

1.ArrayDeque, (数组双端队列) 
2.PriorityQueue, (优先级队列) 
3.ConcurrentLinkedQueue, (基于链表的并发队列) 
4.DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 
5.ArrayBlockingQueue, (基于数组的并发阻塞队列) 
6.LinkedBlockingQueue, (基于链表的FIFO阻塞队列) 
7.LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列) 
8.PriorityBlockingQueue, (带优先级的无界阻塞队列) 
9.SynchronousQueue (并发同步阻塞队列)

 

四组API

 

方式抛出异常有返回值,不抛异常阻塞 等待超时等待
添加 add() offer() put() offer(,)
移除 remove() poll() take() poll(,)
检测队首元素 element() peek()    

 

一、ConcurrentLinkedQueue

ConcurrentLinkedQueue : 是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。 
    它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。 
    头是最先加入的,尾是最近加入的,该队列不允许null元素。

ConcurrentLinkedQueue重要方法:

add 和offer() :都是加入元素的方法(在ConcurrentLinkedQueue中这俩个方法没有任何区别) 
poll() 和peek() :都是取头元素节点,区别在于前者会删除元素,后者不会。

代码示例:

package queue;

import java.util.concurrent.ConcurrentLinkedQueue;

public class Test01 {
    //阻塞式队列最大好处,能够防止队列容器溢出,防止数据丢失
    public static void main(String[] args) {
        //非阻塞队列 无界
        ConcurrentLinkedQueue<String> concurrentLinkedQueue=new ConcurrentLinkedQueue<String>();
        concurrentLinkedQueue.offer("张三");
        concurrentLinkedQueue.offer("李四");
        concurrentLinkedQueue.offer("王五");
        concurrentLinkedQueue.offer("2020-12-15");
        System.out.println(concurrentLinkedQueue.poll());
        System.out.println(concurrentLinkedQueue.size());
        System.out.println(concurrentLinkedQueue.poll());
        System.out.println(concurrentLinkedQueue.size());
        System.out.println(concurrentLinkedQueue.peek());
        System.out.println(concurrentLinkedQueue.size());
        System.out.println(concurrentLinkedQueue.poll());
        System.out.println(concurrentLinkedQueue.poll());
        System.out.println(concurrentLinkedQueue.poll());
        System.out.println(concurrentLinkedQueue.poll());


    }
}

 二、BlockingQueue

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是: 
1、在队列为空时,获取元素的线程会等待队列变为非空。 
2、当队列满时,存储元素的线程会等待队列可用。 
阻塞队列是线程安全的。 
用途: 
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

 

1、ArrayBlockingQueue

ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组。 
有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。 
ArrayBlockingQueue是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。

重要方法:

add(E e) 
将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则抛出IllegalStateException; 
offer(E e) 
将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false。 
offer(E e, long timeout, TimeUnit unit) 
将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。

 代码示例:

package queue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Test02 {
    public static void main(String[] args) throws InterruptedException {
        //插入队列,如果队列已满,需要等待   获取队列,如果队列获取不到,需要等待
        BlockingQueue<String> arrayBlockingQueue=new ArrayBlockingQueue<String>(3);
        arrayBlockingQueue.offer("张三");
        arrayBlockingQueue.offer("李四",30, TimeUnit.SECONDS);//不等待
        arrayBlockingQueue.offer("王五",30, TimeUnit.SECONDS);//不等待
        arrayBlockingQueue.offer("溜溜",30, TimeUnit.SECONDS);//等待
        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll(10,TimeUnit.SECONDS));//不会等待
        System.out.println(arrayBlockingQueue.poll(10,TimeUnit.SECONDS));//会等待


    }
}

2、LinkedBlockingQueue

LinkedBlockingQueue阻塞队列大小的配置是可选的, 
如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的。 
说是无边界,其实是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。 
和ArrayBlockingQueue一样,LinkedBlockingQueue 也是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。

示例代码:

package queue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Test03 {
    public static void main(String[] args) {
        BlockingQueue<String>  lbq  =new LinkedBlockingQueue<String>(3);
        lbq.add("张三");
        lbq.add("李四");
        lbq.add("王五");
        //lbq.add("溜溜");//Exception in thread "main" java.lang.IllegalStateException: Queue full
        lbq.offer("溜溜");
    }
}

3、PriorityBlockingQueue

riorityBlockingQueue是一个没有边界的队列,它的排序规则和 java.util.PriorityQueue一样。需要注意,PriorityBlockingQueue中允许插入null对象。 
所有插入PriorityBlockingQueue的对象必须实现 java.lang.Comparable接口,队列优先级的排序规则就是按照我们对这个接口的实现来定义的。 
另外,我们可以从PriorityBlockingQueue获得一个迭代器Iterator,但这个迭代器并不保证按照优先级顺序进行迭代。

4、SynchronousQueue

SynchronousQueue队列内部仅允许容纳一个元素。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费。

三、使用BlockingQueue模拟生产者与消费者

代码示例

生产者:

public class ProducerThread implements Runnable {
    private BlockingQueue queue;
    private volatile boolean flag = true;
    private static AtomicInteger count = new AtomicInteger();

    public ProducerThread(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            System.out.println("生产线程启动...");
            while (flag) {
                System.out.println("正在生产数据....");
                String data = count.incrementAndGet()+"";
                // 将数据存入队列中
                boolean offer = queue.offer(data, 2, TimeUnit.SECONDS);
                if (offer) {
                    System.out.println("生产者,存入" + data + "到队列中,成功.");
                } else {
                    System.out.println("生产者,存入" + data + "到队列中,失败.");
                }
                Thread.sleep(1000);
            }
        } catch (Exception e) {

        } finally {
            System.out.println("生产者退出线程");
        }

    }
    public void stopThread() {
        this.flag = false;
    }
}

消费者:

class ConsumerThread implements Runnable {
    private BlockingQueue<String> queue;
    private volatile boolean flag = true;

    public ConsumerThread(BlockingQueue<String> queue) {
        this.queue = queue;

    }

    @Override
    public void run() {
        System.out.println("消费线程启动...");
        try {
            while (flag) {
                System.out.println("消费者,正在从队列中获取数据..");
                String data = queue.poll(2, TimeUnit.SECONDS);
                if (data != null) {
                    System.out.println("消费者,拿到队列中的数据data:" + data);
                    Thread.sleep(1000);
                } else {
                    System.out.println("消费者,超过2秒未获取到数据..");
                    flag = false;
                }


            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("消费者退出线程...");
        }

    }

}

运行:

public class ProducerAndConsumer {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
        ProducerThread producerThread1 = new ProducerThread(queue);
        ProducerThread producerThread2 = new ProducerThread(queue);
        ConsumerThread consumerThread1 = new ConsumerThread(queue);
        Thread t1 = new Thread(producerThread1);
        Thread t2 = new Thread(producerThread2);
        Thread c1 = new Thread(consumerThread1);
        t1.start();
        t2.start();
        c1.start();

        // 执行2s后,生产者不再生产
        Thread.sleep(2* 1000);
        producerThread1.stopThread();
        producerThread2.stopThread();

    }
}

运行结果:

BlockingQueue API
BlockingQueue 接口的所有方法可以分为两大类:负责向队列添加元素的方法和检索这些
元素的方法。在队列满/空的情况下,来自这两个组的每个方法的行为都不同。
添加元素
方法说明
add() 如果插入成功则返回 true,否则抛出 IllegalStateException 异常
put() 将指定的元素插入队列,如果队列满了,那么会阻塞直到有空间插入
offer() 如果插入成功则返回 true,否则返回 false
offer(E e, long timeout, TimeUnit
unit)
尝试将元素插入队列,如果队列已满,那么会阻塞直到有空间插入
检索元素
方法说明
take() 获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用
poll(long timeout, TimeUnit unit) 检索并删除队列的头部,如有必要,等待指定的等待时间以使元素可用,如果时,则返回 null
在构建生产者 - 消费者程序时,这些方法是 BlockingQueue 接口中最重要的构建块。

原文地址:https://www.cnblogs.com/zouhong/p/14136150.html