一、概述
Work queues 该模式下有一个生产者(Producer)、两个消费者(Consumer01、Consumer02),多个消费者一起处理生产者发送过来的消息,消息分发给消费者一般有两种方式,分别是轮询分发(公平分发)、预期值分发(不公平分发)
轮询分发是默认的分发方式,对应的是 prefetchCount = 0,它采用的是公平分发的方式,消息分发与消费者的消费速度没有任何关系,如果 prefetchCount != 0,那么它就采用不公平的分发方式,即先按照预期值分发,分发完成之后再按照消费者的消费速度进行消息分发
二、轮询分发(默认值)
默认情况下,工作队列(Work queues)采用的是轮询分发的策略,与具体的消费者处理速度快慢没有任何关系
1、工具类
public class RabbitmqUtils {
private static final String HOST_ADDRESS = "192.168.59.130";
private static final String USER_NAME = "admin";
private static final String PASSWORD = "admin123";
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST_ADDRESS);
factory.setUsername(USER_NAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
2、Producer
public class Producer {
private static final String QUEUE_NAME = "WorkQueuesDemo";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "有意思的消息--->";
for (int i = 1; i < 11; i++) {
channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes(StandardCharsets.UTF_8));
}
System.out.println("Producer send message successfully");
}
}
3、Consumer01
public class Consumer01 {
private static final String QUEUE_NAME = "WorkQueuesDemo";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
// 休眠 10 s
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 参数一、deliveryTag:消息应答标记
// 参数二、multiple:(false、只应答接收到的那个消息 true、应答所有传递过来的消息)
// 处理完逻辑之后应答 ack
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(message);
};
// prefetchCount 的默认值为 0
// 不设置 basicQos 与 channel.basicQos(0) 的效果是等价的,即采用轮询分发的策略
channel.basicQos(0);
// 设置手动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
System.out.println(consumerTag);
});
}
}
4、Consumer02
public class Consumer02 {
private static final String QUEUE_NAME = "WorkQueuesDemo";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
// 参数一、deliveryTag:消息应答标记
// 参数二、multiple:(false、只应答接收到的那个消息 true、应答所有传递过来的消息)
// 处理完逻辑之后应答 ack
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(message);
};
// 设置 prefetchCount
channel.basicQos(0);
// 设置手动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
System.out.println(consumerTag);
});
}
}
5、测试过程及结果
首先启动 Cousumer01、Consumer02,然后设置 prefetchCount = 0,接着启动 Producer 发送消息,切换到 RabbitMQ 控制台
6、Consumer01、Consumer02 的消费情况
三、不公平分发(预期值分发1)
代码和上面的基本相同,只是将 Consumer01、Consumer02 的 prefetchCount 设置为 1 即可,消息的分发会取决于消费者的处理速度,性能好的消费者将分发到更多的消息
1、原理图
2、RabbitMQ 控制台
3、Consumer01、Consumer02 的消费情况
Consumer01、Consumer02 先按照 prefetchCount = 1(预期值)进行分发,然后剩下的消息按照消费者的性能进行分配
四、不公平分发(预期值分发2)
代码和上面的基本相同,只是将 Consumer01 的 prefetchCount 设置为 2、Consumer02 的 prefetchCount 设置为 4 即可
1、原理图
2、RabbitMQ 控制台
3、Consumer01、Consumer02 的消费情况
Consumer01 设置了 prefetchCount = 2,Consumer02 设置了 prefetchCount = 4,所以 10 个消息分发两个给 Consumer01,分发 4 个给 Consumer02,剩下的 4 个消息的分发取决于消费者的处理速度(目前测试)