java.util.concurrent.BlockingQueue指南

2. 概述

在本文中,我们将介绍一个最有用的java.util.concurrent.BlockingQueue来解决并发生产者 - 消费者问题。我们可以看一下BlockingQueue 接口的API以及该接口的方法如何使编写并发程序变得更容易。

在本文的后面,我们将展示一个包含多个生产者线程和多个消费者线程的简单程序的示例。

2. BlockingQueue类型

我们可以区分两种类型的BlockingQueue

  • 无限队列 - 几乎可以无限增长
  • 有界队列 - 定义了最大容量

2.1.无界队列

创建无界队列很简单:

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();
 

blockingQueue的容量将设置为Integer.MAX_VALUE。向无界队列添加元素的所有操作都将永远不会阻塞,因此它可能会增长到非常大的大小。

在使用无界限BlockingQueue设计生产者 - 消费者时,最重要的是消费者应该能够像生产者向队列添加消息一样快地消费消息。否则,内存可能会填满,我们会得到一个OutOfMemory异常。

2.2.有界队列

第二种类型的队列是有界队列。我们可以通过将容量作为参数传递给构造函数来创建这样的队列:

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);
 

这里我们有一个blockQueue,其容量等于10.这意味着当一个消费者试图将一个元素添加到已经满的队列时,取决于用于添加它的方法(offer()add() or put())),它将阻塞,直到插入对象的空间可用。否则,操作将失败。

使用有界队列是设计并发程序的好方法,因为当我们将元素插入到已经完整的队列时,这些操作需要等到消费者赶上并在队列中腾出一些空间。

3. BlockingQueue API

BlockingQueue接口中有两种类型的方法

  负责向队列添加元素的方法

  检索这些元素的方法。

在队列满/空的情况下,来自这两个组的每个方法的行为都不同。

3.1.添加元素

  • add() -如果插入成功则返回true,否则抛出IllegalStateException
  • put() -将指定的元素插入队列,必要时等待空闲插槽
  • offer() -如果插入成功则返回true,否则返回false
  • offer(E e,long timeout,TimeUnit unit) -尝试将元素插入队列并等待指定超时内的可用插槽

3.2.检索元素

  • take() - 等待队列的head元素并将其删除。如果队列为空,则阻塞并等待元素变为可用
  • poll(long timeout,TimeUnit unit) -检索并删除队列的头部,如果需要元素可用,则等待指定的等待时间。超时后返回null

在构建生产者 - 消费者程序时,这些方法是BlockingQueue接口中最重要的构建块

4.多线程生产者 - 消费者示例

让我们创建一个由两部分组成的程序 - Producer和Consumer。

Producer将生成一个0到100的随机数,并将该数字放在BlockingQueue中我们将有4个生产者线程并使用put()方法阻塞调用,直到队列中有可用空间。

需要记住的重要一点是,我们需要阻止我们的消费者线程等待元素无限期地出现在队列中。

从生产者向消费者发出不再需要处理消息信号的好方法是,发送一个名为毒丸的特殊消息。我们需要发送与消费者一样多的毒丸。然后当消费者从队列中获取特殊的毒丸消息时,它将优雅地完成执行。

让我们看一下生产者类:

public class NumbersProducer implements Runnable {
    private BlockingQueue<Integer> numbersQueue;
    private final int poisonPill;
    private final int poisonPillPerProducer;
     
    public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
        this.numbersQueue = numbersQueue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
     
    private void generateNumbers() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
        }
        for (int j = 0; j < poisonPillPerProducer; j++) {
            numbersQueue.put(poisonPill);
        }
     }
}
 

我们的生成器构造函数将BlockingQueue作为参数,用于协调生产者和使用者之间的处理。

我们看到方法generateNumbers()将100个元素放入队列中。它还需要有毒丸消息,以便在执行完成时知道放入队列的消息类型。需要将poisonPillPerProducer(消费者数量)个该消息放入队列中。

每个使用者将使用take()方法BlockingQueue中获取一个元素,它将阻塞,直到队列中有一个元素可以读取。

从队列中取出一个Integer后,它会检查该消息是否为毒丸,如果是,则结束一个线程的执行。否则,它将在标准输出上打印出结果以及当前线程的名称。

这将使我们深入了解消费者的内部运作:

public class NumbersConsumer implements Runnable {
    private BlockingQueue<Integer> queue;
    private final int poisonPill;
     
    public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }
    public void run() {
        try {
            while (true) {
                Integer number = queue.take();
                if (number.equals(poisonPill)) {
                    return;
                }
                System.out.println(Thread.currentThread().getName() + " result: " + number);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
 

需要注意的重要事项是队列的使用。与生成器构造函数中的相同,队列作为参数传递。

我们可以这样做,因为BlockingQueue可以在线程之间共享而无需任何显式同步。

既然我们有生产者和消费者,我们就可以开始我们的计划。我们需要定义队列的容量,并将其设置为100个元素。

我们希望有4个生产者线程,并且消费者线程将等于可用处理器的数量:

int BOUND = 10;
int N_PRODUCERS = 4;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;
 
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
 
for (int i = 1; i < N_PRODUCERS; i++) {
    new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
}
 
for (int j = 0; j < N_CONSUMERS; j++) {
    new Thread(new NumbersConsumer(queue, poisonPill)).start();
}
 
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
 

创建有界BlockingQueue我们正在创造4个生产者和N个消费者。我们将我们的毒丸消息指定为Integer.MAX_VALUE,因为我们的生产者在正常工作条件下永远不会发送这样的值。这里需要注意的最重要的事情是BlockingQueue用于协调它们之间的工作。

当我们运行程序时,4个生产者线程将把随机整数放在BlockingQueue中,消费者将从队列中获取这些元素。每个线程将打印到标准输出线程的名称和结果。

5.结论

本文展示了BlockingQueue的实际用法,并解释了用于从中添加和检索元素的方法。此外,我们还展示了如何使用BlockingQueue构建多线程生产者 - 消费者计划来协调生产者和消费者之间的工作。

原文地址:https://www.cnblogs.com/gc65/p/10630993.html