RabbitMQ死信队列

死信队列是什么

死信,Dead Letter,一种消息机制,当消费者去消费队列中的消息时,如果队列中的消息出现了以下的情况:

  • 消费端执行nack或者reject时,设置requeue=false;
  • 消息在队列中的时间超过设置的TTL(Time To Live)时间;
  • 队列中消息的数量超过设置的最大数量;

那么这些消息就可以被称之为死信消息,在配置了死信队列的情况下,死信消息会进入死信队列,如果没有配置死信队列,这些死信消息会被丢弃。

理解死信队列

死信队列并不仅仅只是一个queue,还包含死信交换机(Dead Letter Exchange),关于死信队列和死信交换机要说明几点:

死信交换机可以是fanout、direct、topic等类型,和普通交换机并无不同;

死信交换机要绑定要业务队列上才会生效;

给死信交换机绑定的队列称之为死信队列,其实就是普通的队列,没有任何特殊之处;

并不是整个项目只能设置一个死信交换机和死信队列,可以根据业务需要设置多个或者单个死信交换机使用不同的routing-key;

代码示例

配置文件

spring:
  rabbitmq:
    addresses: 127.0.0.1:5672
    username: lzm
    password: lzm
    virtual-host: test
    listener:
      simple:
        acknowledge-mode: manual  # 手动ack
        default-requeue-rejected: false # 设置为false,requeue或reject

创建交换机和队列以及绑定

 /**
 * 死信交换机
 */
@Bean
public DirectExchange dlxExchange(){
	return new DirectExchange(dlxExchangeName);
}

/**
 * 死信队列
 */
@Bean
public Queue dlxQueue(){
	return new Queue(dlxQueueName);
}

/**
 * 死信队列绑定死信交换机
 */
@Bean
public Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){
	return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);
}

/**
 * 业务队列
 */
@Bean
public Queue queue(){
	Map<String,Object> params = new HashMap<>();
	params.put("x-dead-letter-exchange",dlxExchangeName);//声明当前队列绑定的死信交换机
	params.put("x-dead-letter-routing-key",dlxRoutingKey);//声明当前队列的死信路由键
	params.put("x-message-ttl",10000);//设置队列消息的超时时间,单位毫秒,超过时间进入死信队列
	params.put("x-max-length", 10);//生命队列的最大长度,超过长度的消息进入死信队列
	return QueueBuilder.durable(queueName).withArguments(params).build();
}

/**
 * 业务交换机
 */
@Bean
public FanoutExchange fanoutExchange(){
	return new FanoutExchange(exchangeName,true,false);
}

/**
 * 业务队列和业务交换机的绑定
 */
@Bean
public Binding binding(Queue queue, FanoutExchange fanoutExchange){
	return  BindingBuilder.bind(queue).to(fanoutExchange);
}

注意创建业务队列的部分,设置业务队列的超时时间是10s,队列中消息最大数量为10。

上面代码中,业务交换机为fanout类型的交换机,死信交换机为Direct类型的交换机。

生产者

public void send(){
	for (int i = 0; i < 5; i++) {
		CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
		rabbitTemplate.convertAndSend(exchangeName,"","消息==>"+i,message -> {
			message.getMessageProperties().setExpiration(3000+"");//发送消息时设置消息的超时时间
			return message;
		},correlationData);
	}
}

注意:

队列中消息的超时时间可以是在创建队列时设置,表示对队列中所有的消息生效,也可以在发送消息时设置,两者相比取最小值作为TTL的值。

先不启动消费者,此时启动生产者并向其中发送消息,刚发送完消息时如下所示:

三秒后消息自动进入死信队列中

这也就验证了上述所说的,当消息在队列中的时间超过TTL的时间时,消息会自动进入死信队列。针对这一特性,可以给消息设置过期时间后发送到某个队列,从而来进行延迟消费

注意看上图的红框中的内容:

Lim:表示设置了队列中消息数量x-max-length参数

DLX:表示设置了死信交换机x-dead-letter-exchange参数

DLK:表示设置了死信路由键x-dead-letter-routing-key参数,不设置该值时,消息在进入死信队列后,路由键保持原来的不变,设置了该值,消息的路由键就变为新设置的值。

下面我们启动消费者,并且模拟在某些情况下执行nack操作,先看消费者代码

@RabbitHandler
@RabbitListener(queues = {"${platform.queue-name}"},concurrency = "1")
public void msgConsumer(String msg, Channel channel, Message message) throws IOException {
	try {
		if(msg.indexOf("5")>-1){
			throw new RuntimeException("抛出异常");
		}
		log.info("消息{}消费成功",msg);
		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
	} catch (Exception e) {
		log.error("接收消息过程中出现异常,执行nack");
		//第三个参数为true表示异常消息重新返回队列,会导致一直在刷新消息,且返回的消息处于队列头部,影响后续消息的处理
		channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
		log.error("消息{}异常",message.getMessageProperties().getHeaders());
	}
}

当消息中包含5时抛出异常,执行nack,其他消息都执行ack,生产者发送0-9共10条消息,执行结果如下:

同时查看死信队列中的数据,确实只有1条

并且消息的交换机以及路由键都是我们在代码中设置好的值

同时消息的headers中也会将进入死信队列的原因以及次数等进行说明

也就是说在执行nack,同时设置requeue=false时,消息会自动进入死信队列

最后我们再测试一下最大数量的问题,前面我们设置队列中最大数量是10,此时关闭消费者,同时删除队列的TTL,然后发送20条数据到业务队列中

可以看到业务队列和死信队列各有10条数据,也就是说队列中的消息数量超过设置的最大数量时,消息会进入死信队列

总结

死信交换机和死信队列都只是普通的交换机和队列,只不过被用来处理死信消息,而死信消息的产生是由于TTL过期或者队列中的消息数超过最大消息数,再或者时消费端reject或者nack消息时设置了requeue=false,消息变为死信后,由死信交换机路由到死信队列,再由专门的消费者消费死信队列中的消息。

死信队列更多的是用来保证消息的可靠性,主要用于比较重要的队列,用以确保未被正确消费的消息不会丢失,其实也可以不用死信队列,在消费端出现异常时,可以将消息从当前队列ack掉,再将其发送到其他队列,然后再单独处理其他队列,这都是可以的。

本节测试代码参考码云.

原文地址:https://www.cnblogs.com/ybyn/p/13691078.html