RabbitMQ 死信队列

一、死信队列

死信,顾名思义就是无法被消费的消息,一般来说 Producer 将消息投递到 broker 或者直接丢到 queue 中,Consumer 从 Queue 中取出消息进行消费,但是某些时候由于特定的原因导致 Queue 中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列

死信队列有其特殊的应用场景,例如用户在商城下单成功并点击去支付的时候,如果在指定的时间内未支付,那么就可以将该下单消息投递到死信队列中,至于后续怎么处理死信队列需要结合具体的应用场景

二、死信的来源

通常死信的来源有下面几种方式

1、消息 TTL (Time To Live) 过期

2、队列达到了最大长度,无法再添加消息到 MQ 中了

3、消息被拒,并且没有重新入队(basic.reject || basic.Nack) && (requeue = false)

三、消息 TTL 过期

1、Consumer01

public class Consumer01 {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String NORMAL_QUEUE = "normal_queue";
    private static final String NORMAL_ROUTING_KEY = "normal";
    private static final String DEAD_EXCHANGE = "dead_exchange";
    private static final String DEAD_ROUTING_KEY = "dead";

    public static void main(String[] args) throws Exception {
        // 自定义工具类获取信道
        Channel channel = RabbitmqUtils.getChannel();

        // 声明正常消息的交换机(类型为 direct)
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 正常队列关联死信交换机(正常队列出现了故障之后,消息就会通过死信交换机传递到死信队列中)
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
        // 正常消息交换机绑定正常消息队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, arguments);

        // 消息成功之后的回调
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody());
            System.out.println(msg);
        };
        // 取消消费者的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消费者时的回调接口");
        };
        // 消费者消费消息
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback);

        System.out.println("Consumer01 开始消费消息");
    }
}

2、Consumer02

public class Consumer02 {
    private static final String DEAD_EXCHANGE = "dead_exchange";
    private static final String DEAD_QUEUE = "dead_queue";
    private static final String DEAD_ROUTING_KEY = "dead";

    public static void main(String[] args) throws Exception {
        // 自定义工具类获取信道对象
        Channel channel = RabbitmqUtils.getChannel();

        // 声明死信交换机(topic 类型)
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 声明死信队列
        channel.queueDeclare(DEAD_QUEUE, true, false, false, null);
        // 死信交换机绑定死信队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY);

        // 消息成功之后的回调
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody());
            System.out.println(msg);
        };
        // 取消消费者的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消费者时的回调接口");
        };
        // 消费者消费消息
        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);
        System.out.println("Consumer02 开始消费消息");
    }
}

3、Producer

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String NORMAL_ROUTING_KEY = "normal";

    public static void main(String[] args) throws Exception {
        // 自定义工具类获取信道
        Channel channel = RabbitmqUtils.getChannel();

        // 声明一个 direct 类型的交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 消息发送 10 s 之后,如果没有消费者进行消费,那么该消息就称为死信,它就会进入死信队列中
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        // 待发送的消息
        String message = "我是一只机智的小毛毛,很可爱,很机智";
        for (int i = 1; i < 11; i++) {
            channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, properties, (message + i).getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("Producer send message successfully...");
    }
}

4、测试过程及结果

启动 Consumer01 将普通交换机、普通队列注册到 RabbitMQ 上,启动 Consumer02 将死信交换机、死信队列注册到 RabbitMQ 上

然后为了演示消息超时之后可以进入死信队列,我们关闭 Consumer01,模拟其接收不到消息,为了不让死信消息被消费者消费掉,我们关闭 Consumer02,然后启动生产者 Producer

10 s 之后普通队列里的消息进入死信队列中

接着启动消费者 Consumer02 消费掉死信队列中的消息

 

四、队列达到最大长度

1、Consumer01

public class Consumer01 {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String NORMAL_QUEUE = "normal_queue";
    private static final String NORMAL_ROUTING_KEY = "normal";
    private static final String DEAD_EXCHANGE = "dead_exchange";
    private static final String DEAD_ROUTING_KEY = "dead";

    public static void main(String[] args) throws Exception {
        // 自定义工具类获取信道
        Channel channel = RabbitmqUtils.getChannel();

        // 声明正常消息的交换机(类型为 direct)
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 正常队列关联死信交换机(正常队列出现了故障之后,消息就会通过死信交换机传递到死信队列中)
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        // 设置正常队列的最大长度
        arguments.put("x-max-length",6);
        channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
        // 正常消息交换机绑定正常消息队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, arguments);

        // 消息成功之后的回调
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody());
            System.out.println(msg);
        };
        // 取消消费者的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消费者时的回调接口");
        };
        // 消费者消费消息
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback);

        System.out.println("Consumer01 开始消费消息");
    }
}

2、Consumer02 代码不变

3、Producer 

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String NORMAL_ROUTING_KEY = "normal";

    public static void main(String[] args) throws Exception {
        // 自定义工具类获取信道
        Channel channel = RabbitmqUtils.getChannel();

        // 声明一个 direct 类型的交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 待发送的消息
        String message = "我是一只机智的小毛毛,很可爱,很机智";
        for (int i = 1; i < 11; i++) {
            channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("Producer send message successfully...");
    }
}

4、测试过程及结果

删除掉原先的正常交换机、正常队列、死信交换机、死信队列,然后按照上面的方式启动 Consumer01、Consumer02 重新注册正常交换机、正常队列、死信交换机、死信队列,接着关闭 Consumer01、Consumer02,最后启动 Producer 发送消息(如果 Consumer01 是一直打开的情况下,正常队列的消息就不会堆积到 6 条)

启动 Consumer01、Consumer02,发现 Consumer01 消费了 6 条消息,Consumer02 消费了四条消息

 

五、消息被拒

1、Consumer01

public class Consumer01 {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String NORMAL_QUEUE = "normal_queue";
    private static final String NORMAL_ROUTING_KEY = "normal";
    private static final String DEAD_EXCHANGE = "dead_exchange";
    private static final String DEAD_ROUTING_KEY = "dead";

    public static void main(String[] args) throws Exception {
        // 自定义工具类获取信道
        Channel channel = RabbitmqUtils.getChannel();

        // 声明正常消息的交换机(类型为 direct)
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 正常队列关联死信交换机(正常队列出现了故障之后,消息就会通过死信交换机传递到死信队列中)
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
        // 正常消息交换机绑定正常消息队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, arguments);

        // 消息成功之后的回调
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody());
            if (msg.contains("很机智4")) {
                System.out.println("Consumer01 接收到消息" + msg + "并拒绝签收该消息");
                //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("Consumer01 接收到消息" + msg);
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 取消消费者的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消费者时的回调接口");
        };
        // 消费者消费消息(一定要开启手动应答,如果你开启了自动应答,根本不存在拒绝消息的情况)
        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);

        System.out.println("Consumer01 开始消费消息");
    }
}

2、Consumer02

public class Consumer02 {
    private static final String DEAD_EXCHANGE = "dead_exchange";
    private static final String DEAD_QUEUE = "dead_queue";
    private static final String DEAD_ROUTING_KEY = "dead";

    public static void main(String[] args) throws Exception {
        // 自定义工具类获取信道对象
        Channel channel = RabbitmqUtils.getChannel();

        // 声明死信交换机(topic 类型)
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 声明死信队列
        channel.queueDeclare(DEAD_QUEUE, true, false, false, null);
        // 死信交换机绑定死信队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY);

        // 消息成功之后的回调
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody());
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            System.out.println(msg);
        };
        // 取消消费者的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消费者时的回调接口");
        };
        // 消费者消费消息
        channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);
        System.out.println("Consumer02 开始消费消息");
    }
}

3、Producer

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String NORMAL_ROUTING_KEY = "normal";

    public static void main(String[] args) throws Exception {
        // 自定义工具类获取信道
        Channel channel = RabbitmqUtils.getChannel();

        // 声明一个 direct 类型的交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 待发送的消息
        String message = "我是一只机智的小毛毛,很可爱,很机智";
        for (int i = 1; i < 11; i++) {
            channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("Producer send message successfully...");
    }
}

4、测试过程及结果

删除掉原先的正常交换机、正常队列、死信交换机、死信队列,然后重新启动 Consumer01、Consumer02 注册正常交换机、正常队列、死信交换机、死信队列,接着关闭 Consumer02,启动 Producer 发送消息

这里有几点需要注意一下

1、因为只有被拒绝的消息才能进入死信队列中,所以 Consumer01 不能关闭,为了能看到死信队列里的消息,不让它被消费掉,所以需要关闭 Consumer02

2、Consumer01 一定要开启手动确认,因为自动确认的场景下根本不存在消息被拒绝的情况

打开死信队列查看被拒绝的消息

启动 Consumer02 消费死信消息

 

原文地址:https://www.cnblogs.com/xiaomaomao/p/15546803.html