阻塞队列

概念

阻塞队列(BlockingQueue):支持2个附加操作的队列。阻塞队列常用于生产者和消费者的场景,生产者是往队列中添加元素的线程,消费者是从队列中获取元素的线程。

附加操作

1)队列为空时,获取元素的线程会等待队列变为非空

2)队列为满时,存储元素的线程会等待队列可用。

 

种类

ArrayBlockingQueue:由数组结构组成的 有界 阻塞队列

LinkedBlockingQueue:由链表结构组成的 有界(但默认大小为Integer.MAX_VALUE)阻塞队列

PriorityBlockingQueue:支持优先级排序的 无界 阻塞队列

DelayQueue:使用优先级队列实现的 延迟 无界 阻塞队列

SynchronousQueue不存储元素 的阻塞队列,也即单个元素的队列

LinkedTransferQueue:由链表结构组成的 无界 阻塞队列

LinkedBlockingDeque:由链表结构组成的 双向 阻塞队列

 

方法

方法抛出异常返回特殊值一直阻塞超时退出
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek() 不可用 不可用

插入

add

1)合法的add:add元素数量与规定元素数量一致

import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 ​
 public class BlockingQueueDemo {
 ​
     public static void main(String[] args) {
         // 构造方法传参是规定其长度
         BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
         System.out.println(blockingQueue.add("1"));
         System.out.println(blockingQueue.add("2"));
         System.out.println(blockingQueue.add("3"));
         // true true true
     }
 ​
 }

2)异常( IllegalStateException )

当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue full")异常

 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 ​
 public class BlockingQueueDemo {
 ​
     public static void main(String[] args) {
         // 构造方法传参是规定其长度
         BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
         System.out.println(blockingQueue.add("1"));
         System.out.println(blockingQueue.add("2"));
         System.out.println(blockingQueue.add("3"));
 ​
         System.out.println(blockingQueue.add("4"));
         // java.lang.IllegalStateException: Queue full
     }
 ​
 }

offer

1)如果队列已满,再插入元素,则返回false,而不会抛出异常

public static void main(String[] args) {
     // 构造方法传参是规定其长度
     BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
     System.out.println(blockingQueue.offer("1"));
     System.out.println(blockingQueue.offer("2"));
     System.out.println(blockingQueue.offer("3"));
     // true true true
 ​
     System.out.println(blockingQueue.offer("4"));
     // false
 }

2)设置超时时间

private static void offer() throws InterruptedException {
     BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
     // 设置超时时间为2s,TimeUnit是时间颗粒度转换,TimeUnit.SECOND代表秒
     System.out.println(blockingQueue.offer("1", 2L, TimeUnit.SECONDS));
     System.out.println(blockingQueue.offer("2", 2L, TimeUnit.SECONDS));
     System.out.println(blockingQueue.offer("3", 2L, TimeUnit.SECONDS));
     // true true true
     System.out.println(blockingQueue.offer("4", 2L, TimeUnit.SECONDS));
     // false
 }

put

如果队列已满,再插入元素,就会阻塞,直至可以添加进去为止

 public static void main(String[] args) {
     BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
     try {
         blockingQueue.put("1");
         blockingQueue.put("2");
         blockingQueue.put("3");
         blockingQueue.put("4");
     } catch (InterruptedException e) {
         e.printStackTrace();
     }
 }

 

移除

remove

合法的remove

 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 ​
 public class BlockingQueueDemo {
 ​
     public static void main(String[] args) {
         // 构造方法传参是规定其长度
         BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
         blockingQueue.add("1");
         blockingQueue.add("2");
         blockingQueue.add("3");
 ​
         System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.remove());
         // 1 2 3
         
         System.out.println(blockingQueue.remove());
     }
 ​
 }

异常(NoSuchElementException)

当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常

 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 ​
 public class BlockingQueueDemo {
 ​
     public static void main(String[] args) {
         // 构造方法传参是规定其长度
         BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
         blockingQueue.add("1");
         blockingQueue.add("2");
         blockingQueue.add("3");
 ​
         System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.remove());
         
         System.out.println(blockingQueue.remove());
         // java.util.NoSuchElementException
     }
 ​
 }

poll

当队列为空时,从队列里获取元素时直接返回null,不抛出异常

 public static void main(String[] args) {
     BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
     blockingQueue.offer("1");
     blockingQueue.offer("2");
     blockingQueue.offer("3");
 ​
     System.out.println(blockingQueue.poll());
     System.out.println(blockingQueue.poll());
     System.out.println(blockingQueue.poll());
     // 1 2 3
     
     System.out.println(blockingQueue.poll());
     // null
 }

take

当队列为空时,从队列里获取元素时阻塞,直至其中存在元素

public static void main(String[] args) throws Exception {
     BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
     blockingQueue.put("1");
     blockingQueue.put("2");
     blockingQueue.put("3");
 ​
     System.out.println(blockingQueue.take());
     System.out.println(blockingQueue.take());
     System.out.println(blockingQueue.take());
     System.out.println(blockingQueue.take());
 }

检查

element

如果队列为空,则抛异常NoSuchElementException

import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 ​
 public class BlockingQueueDemo {
 ​
     public static void main(String[] args) {
         // 构造方法传参是规定其长度
         BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
         blockingQueue.add("1");
         blockingQueue.add("2");
         blockingQueue.add("3");
 ​
         System.out.println(blockingQueue.element());
         // 1
     }
 ​
 }

peek

如果队列为空,则返回null

 public static void main(String[] args) {
     BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
     blockingQueue.offer("1");
     blockingQueue.offer("2");
     blockingQueue.offer("3");
 ​
     System.out.println(blockingQueue.peek());
 }

 

SynchronousQueue

SynchronousQueue(同步队列)模拟生产消费

import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 ​
 public class SynchronousQueueDemo {
 ​
     public static void main(String[] args) {
         // 同步队列不存储元素
         BlockingQueue queue = new SynchronousQueue();
 ​
         // 生产者线程
         new Thread(() -> {
             try {
                 System.out.println(Thread.currentThread().getName() + "	 put 1");
                 queue.put("1");
 ​
                 System.out.println(Thread.currentThread().getName() + "	 put 2");
                 queue.put("2");
 ​
                 System.out.println(Thread.currentThread().getName() + "	 put 3");
                 queue.put("3");
 ​
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }, "provider").start();
 ​
         // 消费者线程
         new Thread(() -> {
             try {
                 // 每次消费前先睡眠2秒钟
                 TimeUnit.SECONDS.sleep(2);
                 System.out.println(Thread.currentThread().getName() + "	" + queue.take());
 ​
                 TimeUnit.SECONDS.sleep(2);
                 System.out.println(Thread.currentThread().getName() + "	" + queue.take());
 ​
                 TimeUnit.SECONDS.sleep(2);
                 System.out.println(Thread.currentThread().getName() + "	" + queue.take());
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }, "consumer").start();
         
         // 程序执行结果:
         // provider     put 1
         // consumer    1
         // provider     put 2
         // consumer    2
         // provider     put 3
         // consumer    3
     }
 }

 

原文地址:https://www.cnblogs.com/kzyuan/p/14453088.html