RabbitMQ TTL、死信队列

TTL概念

TTL是Time To Live的缩写,也就是生存时间。
RabbitMQ支持消息的过期时间,在消息发送时可以进行指定。
RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除。

这与 Redis 中的过期时间概念类似。我们应该合理使用 TTL 技术,可以有效的处理过期垃圾消息,从而降低服务器的负载,最大化的发挥服务器的性能。

TTL是Time To Live的缩写,也就是生存时间。RabbitMQ支持消息的过期时间,在消息发送时可以进行指定。
RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除。

这与 Redis 中的过期时间概念类似。我们应该合理使用 TTL 技术,可以有效的处理过期垃圾消息,从而降低服务器的负载,最大化的发挥服务器的性能。

RabbitMQ allows you to set TTL (time to live) for both messages and queues. This can be done using optional queue arguments or policies (the latter option is recommended). Message TTL can be enforced for a single queue, a group of queues or applied for individual messages.

RabbitMQ允许您为消息和队列设置TTL(生存时间)。 这可以使用可选的队列参数或策略来完成(建议使用后一个选项)。 可以对单个队列,一组队列强制执行消息TTL,也可以为单个消息应用消息TTL。

​ ——摘自 RabbitMQ 官方文档

1.消息的 TTL

我们在生产端发送消息的时候可以在 properties 中指定 expiration属性来对消息过期时间进行设置,单位为毫秒(ms)。

     /**
         * deliverMode 设置为 2 的时候代表持久化消息
         * expiration 意思是设置消息的有效期,超过10秒没有被消费者接收后会被自动删除
         * headers 自定义的一些属性
         * */
        //5. 发送
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("myhead1", "111");
        headers.put("myhead2", "222");

        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .expiration("100000")
                .headers(headers)
                .build();
        String msg = "test message";
        channel.basicPublish("", queueName, properties, msg.getBytes());

我们也可以后台管理页面中进入 Exchange 发送消息指定expiration

2.队列的 TTL

队列属性:x-message-ttl 
可以控制被publish到queue中的message 被丢弃前能够存活的时间

我们也可以在后台管理界面中新增一个 queue,创建时可以设置 ttl,对于队列中超过该时间的消息将会被移除。

//Java代码:message在该queue的存过时间最大为60秒
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

死信队列

死信队列:没有被及时消费的消息存放的队列

消息没有被及时消费的原因:

  • a.消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false

  • b.TTL(time-to-live) 消息超时未消费

  • c.达到最大队列长度

1.DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性
2.当这个队列中有死信时,RabbitMQ就会自动将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列
3.可以监听这个队列的消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。

实现死信队列步骤

  • 首先需要设置死信队列的 exchange 和 queue,然后进行绑定:

    Exchange: dlx.exchange
    Queue: dlx.queue
    RoutingKey: # 代表接收所有路由 key
  • 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在普通队列加上一个参数即可: arguments.put("x-dead-letter-exchange",' dlx.exchange' )
  • 这样消息在过期、requeue失败、 队列在达到最大长度时,消息就可以直接路由到死信队列!

创建生产者

package com.dwz.rabbitmq.dlx;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_dlx_exchange";
        String routingkey = "dlx.save";
        
        for(int i = 0; i < 1; i++) {
            String msg = "Hello rabbitmq dlc-"+i+" message!";
            
            Map<String, Object> headers = new HashMap<>();
            headers.put("num", i);
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .contentEncoding("utf-8")
                    .expiration("10000")
                    .headers(headers)
                    .build();
            channel.basicPublish(exchangeName, routingkey, properties, msg.getBytes());
        }
        
        channel.close();
        connection.close();
    }
}

创建消费者

package com.dwz.rabbitmq.dlx;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_dlx_exchange";
        String routingkey = "dlx.#";
        String queueName = "test_dlx_queue";
        
        //这是一个普通的交换机、队列和路由
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "dlx.exchange");
        channel.queueDeclare(queueName, true, false, false, arguments);
        channel.queueBind(queueName, exchangeName, routingkey);
        
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare("dlx.queue", true, false, false, null);
        channel.queueBind("dlx.queue", "dlx.exchange", "#");
        
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                System.out.println("消费端:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        
        //手工签收必须关闭autoAck设置为false
        channel.basicConsume(queueName, false, consumer);
    }
}

测试步骤

1.先启动consumer,在管控平台上看到test_dlx_exchange、test_dlx_queue和dlx.exchange、dlx.queue都创建完成然后停止consumer。

2.启动producer,10秒钟前发现消息在普通队列test_dlx_queue中,10秒钟后发现消息出现在死信队列dlx.queue中。

总结

DLX也是一个正常的 Exchange,和一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的DLX Exchange 上去,进而被路由到另一个队列。可以监听这个队列中消息做相应的处理。

相关文章

RabbitMQ 消费端限流、TTL、死信队列

RabbitMQ Queue中Arguments属性参数过期队列,过期消息,超时队列的声明

原文地址:https://www.cnblogs.com/zheaven/p/11833521.html