RabbitMQ_2、工作队列

工作队列(竞争消费者模式)

官方案例

工作队列_消息发送

/**
 * @PackageName : com.rzk
 * @FileName : Send
 * @Description : 工作队列-轮询-消息生产者
 * @Author : rzk
 * @CreateTime : 23/6/2021 上午12:21
 * @Version : 1.0.0
 */
public class Send {

    //定义队列名称
    private final static String QUEUE_NAME = "work_rr";

    public static void main(String[] argv) throws Exception {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("*");
        factory.setUsername("");
        factory.setVirtualHost("/");
        factory.setPassword("");
        factory.setPort(5672);

        try (
                //连接工厂创建连接
                Connection connection = factory.newConnection();
                //创建信道
                Channel channel = connection.createChannel()) {
            /**
             * 绑定队列
             * 声明队列
             *  第一个参数 queue :队列名称
             *  第二个参数 durable :是否持久化
             *  第三个参数 Exclusive :排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明的连接可见,并在连接断开时自动删除。
             * 这里需要注意三点:
             *      1 .排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他他队
             *      2 . ”百次“,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的;排他队列这个与昔通队歹怀同。
             *      3 .即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个容户端发送读取消息的应用场景。
             * 第四个参数 Auto-delete :自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 20; i++) {
                String message = " Send  "+i;
                //队列消息的生产者:发送消息
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'" + i);
            }
        }
    }
}

工作队列_消息接收

新建两个消息消费去接收

1

/**
 * @PackageName : com.rzk.simple.recv
 * @FileName : Recv
 * @Description : 工作队列-轮询-消息接收
 * @Author : rzk
 * @CreateTime : 23/6/2021 上午12:22
 * @Version : 1.0.0
 */
public class Recv01 {
    private final static String QUEUE_NAME = "work_rr";

    public static void main(String[] argv) throws Exception {
        //创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("*");
        factory.setUsername("yeb");
        factory.setVirtualHost("/yeb");
        factory.setPassword("yeb");
        factory.setPort(5672);
        //连接工厂创建连接
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //绑定队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            /**
             * 手动确认
             * multiple: 是否确认多条
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false );
        };
        /**
         * 监听队列消费消息
         * autoAck:自动应答
         * 当消费者收到该消息,会返回通知消息队列 我消费者已经收到消息了
         */
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}

2

/**
 * @PackageName : com.rzk.simple.recv
 * @FileName : Recv
 * @Description : 工作队列-轮询-消息接收
 * @Author : rzk
 * @CreateTime : 23/6/2021 上午12:22
 * @Version : 1.0.0
 */
public class Recv02 {
    private final static String QUEUE_NAME = "work_rr";

    public static void main(String[] argv) throws Exception {
        //创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("*");
        factory.setUsername("yeb");
        factory.setVirtualHost("/yeb");
        factory.setPassword("yeb");
        factory.setPort(5672);
        //连接工厂创建连接
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //绑定队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            //模拟消费耗时
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            /**
             * 手动确认
             * multiple: 是否确认多条或单条数据
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false );
        };
        //监听队列消费消息
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

监听队列消费消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
在 Boolean autoAck = false的情况下,如果消费者1宕机了,消息队列没有收到消费者发送回的应答,就会将这个消息发送给下一个消费者处理。直到消费者处理完这个消息,并向消息队列发送了一个消息应答,告诉消息队列此时这个消息已经处理完成,消息队列才会将这个消息从内存中删除。

工作队列的优点:解决简单队列 当生产者生产消息大与消费者消费能力时,加多几个消费者,让消费者的消费能力大于等于生产者生产能力,这样就能减少多余的消息堆积在队列里面

公平模式

需要把这个代码放在接收消息类里面

        //限制消费者每次只能接受一条,处理完才能接受下一条消息
        channel.basicQos(1);
原文地址:https://www.cnblogs.com/rzkwz/p/14925181.html