死信队列:DLX,dead-letter-exchange
利用dlx,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个exchange,这个exchange就是dlx
消息变成死信的原因有:
1.消息被拒绝(basic.reject / basic.nack)并且reQueue=false 2.消息TTL过期 3.队列达到最大长度了
1.声明死信队列,交换机等。
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfig { //死信队里s @Bean("deadLetterExchange") public Exchange deadLetterExchange() { return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build(); } @Bean("deadLetterQueue") public Queue deadLetterQueue() { Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 声明 死信交换机 args.put("x-dead-letter-exchange", "DL_EXCHANGE"); // x-dead-letter-routing-key 声明 死信路由键 args.put("x-dead-letter-routing-key", "KEY_R"); return QueueBuilder.durable("DL_QUEUE").withArguments(args).build(); } @Bean("redirectQueue") public Queue redirectQueue() { return QueueBuilder.durable("REDIRECT_QUEUE").build(); } /** * 死信路由通过 DL_KEY 绑定键绑定到死信队列上. * * @return the binding */ @Bean public Binding deadLetterBinding() { return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null); } /** * 死信路由通过 KEY_R 绑定键绑定到死信队列上. * * @return the binding */ @Bean public Binding redirectBinding() { return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null); } //死信队里e }
当DL_KEY 对应的队列中存在死信时,rabbitMQ就会自动的将这个消息重新发布到设置的exchange上去,进而被路由到KEY_R对应的队列。
。
2.生产者:
import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; /** * Created by Jim on 2018/11/24. */ @Component public class DLQueue { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); MessagePostProcessor messagePostProcessor = message -> { MessageProperties messageProperties = message.getMessageProperties(); // 设置编码 messageProperties.setContentEncoding("utf-8"); // 设置过期时间10*1000毫秒 messageProperties.setExpiration("5000"); return message; }; rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", content, messagePostProcessor); } }
这里设置了5000的过期时间,一旦消息到了过期时间还没被正常消费,就会变成死信。
3.DL_KEY 对应的消费者:
@Component public class DlxMessageReceiver1 { @RabbitHandler @RabbitListener(queues = "DL_QUEUE")//方法级注解 public void process(String msg, Channel channel, Message message){ System.out.println("死信DL_KEY收到 : " + msg ); System.out.println(1/0); } }
为了测试,这里注意不要捕获异常,捕获了异常的话,不会进入死信。
4.KEY_R对应的消费者
@Component public class DlxMessageReceiver { @RabbitHandler @RabbitListener(queues = "REDIRECT_QUEUE") public void process(String msg, Channel channel, Message message)throws Exception { System.out.println("死信KEY_R: " +msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }
在上面2中我们设置了5秒的过期时间,下面我们看看是不是5秒过后就变成死信。
第一次消费失败的时间是:2018-11-24 20:57:11.949
死信DL_KEY : 123 2018-11-24 20:57:11.949 WARN 616 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
最后一次消费失败的时间是:2018-11-24 20:57:13.871
死信DL_KEY : 123 2018-11-24 20:57:13.871 WARN 616 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
死信KEY_R: 123
由结果来看,好像没到5秒就过期了,我也不是很明白,有空再研究。