RabbitMQ 之 Work queues

一、概述

Work queues 该模式下有一个生产者(Producer)、两个消费者(Consumer01、Consumer02),多个消费者一起处理生产者发送过来的消息,消息分发给消费者一般有两种方式,分别是轮询分发(公平分发)、预期值分发(不公平分发)

轮询分发是默认的分发方式,对应的是 prefetchCount = 0,它采用的是公平分发的方式,消息分发与消费者的消费速度没有任何关系,如果 prefetchCount != 0,那么它就采用不公平的分发方式,即先按照预期值分发,分发完成之后再按照消费者的消费速度进行消息分发

二、轮询分发(默认值)

默认情况下,工作队列(Work queues)采用的是轮询分发的策略,与具体的消费者处理速度快慢没有任何关系

1、工具类

public class RabbitmqUtils {
    private static final String HOST_ADDRESS = "192.168.59.130";
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "admin123";

    public static Channel getChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_ADDRESS);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

2、Producer

public class Producer {
    private static final String QUEUE_NAME = "WorkQueuesDemo";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        String message = "有意思的消息--->";
        for (int i = 1; i < 11; i++) {
            channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("Producer send message successfully");
    }
}

3、Consumer01

public class Consumer01 {
    private static final String QUEUE_NAME = "WorkQueuesDemo";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());

            // 休眠 10 s
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 参数一、deliveryTag:消息应答标记
            // 参数二、multiple:(false、只应答接收到的那个消息 true、应答所有传递过来的消息)
            // 处理完逻辑之后应答 ack
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            System.out.println(message);
        };
        // prefetchCount 的默认值为 0
        // 不设置 basicQos 与 channel.basicQos(0) 的效果是等价的,即采用轮询分发的策略
        channel.basicQos(0);
        // 设置手动应答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
            System.out.println(consumerTag);
        });
    }
}

4、Consumer02

public class Consumer02 {
    private static final String QUEUE_NAME = "WorkQueuesDemo";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            // 参数一、deliveryTag:消息应答标记
            // 参数二、multiple:(false、只应答接收到的那个消息 true、应答所有传递过来的消息)
            // 处理完逻辑之后应答 ack
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            System.out.println(message);
        };

        // 设置 prefetchCount
        channel.basicQos(0);

        // 设置手动应答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
            System.out.println(consumerTag);
        });
    }
}

5、测试过程及结果

首先启动 Cousumer01、Consumer02,然后设置 prefetchCount = 0,接着启动 Producer 发送消息,切换到 RabbitMQ 控制台

6、Consumer01、Consumer02 的消费情况

三、不公平分发(预期值分发1)

代码和上面的基本相同,只是将 Consumer01、Consumer02 的 prefetchCount 设置为 1 即可,消息的分发会取决于消费者的处理速度,性能好的消费者将分发到更多的消息

1、原理图

2、RabbitMQ 控制台

3、Consumer01、Consumer02 的消费情况

Consumer01、Consumer02 先按照 prefetchCount = 1(预期值)进行分发,然后剩下的消息按照消费者的性能进行分配

四、不公平分发(预期值分发2)

代码和上面的基本相同,只是将 Consumer01 的 prefetchCount 设置为 2、Consumer02 的 prefetchCount 设置为 4 即可

1、原理图

2、RabbitMQ 控制台

3、Consumer01、Consumer02 的消费情况 

Consumer01 设置了 prefetchCount = 2,Consumer02 设置了 prefetchCount = 4,所以 10 个消息分发两个给 Consumer01,分发 4 个给 Consumer02,剩下的 4 个消息的分发取决于消费者的处理速度(目前测试)

 

原文地址:https://www.cnblogs.com/xiaomaomao/p/15530171.html