RabbitMq防止消息丢失

RabbitMq如何防止消息丢失?

之前一直使用RocketMq。由于工作原因项目中用到RabbitMQ,在使用之前还是有必要了解下。所以带着第一个问题查询了些资料。

MQ若想避免消息丢失,当然只能做的尽量。除了各种MQ不同的主备或者集群策略外,总的指导原则就是:

1、生产者->broker 消息不丢失。 2、broker 消息不丢失, 3、broker->Consumer消息不丢失

每种MQ对于上面三个问题都有自己的解决方案。对于rabbitMq如何解决这个问题。落实到代码层面上:

1、生产者到broker消息不丢失(消息持久化到磁盘)

 使用springboot集成,由于经过了一层层封装,所以经过查看源码,发现最后通过channel.basicPublish()向mq发布消息的时候需要设置 messageProperities。

而MessageProperties使用Message中读取的,所以从哪里如何设置MessageProperties成了问题。好在发现SpringBoot增加MessageProcess这一接口,然后可以定制自己信息

(1)message持久化

1  rabbitTemplate.convertAndSend(rabbitMqProperty.getOrderMsg().getExchangeName(), rabbitMqProperty.getOrderMsg().getRouteKeyName(), messages, new MessagePostProcessor() {
2     @Override
3  public Message postProcessMessage(Message message) throws AmqpException {
4         // 消息持久化
5  message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
6  return message;
7  }
8 });

(2)return-callback设置

     设置RabbitMq publisher-returns 属性为true, 即消息没到成功到达queue会触发CallBack回调。  即 生产者的 确认机制

 1   @Bean
 2     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
 3         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
 4         rabbitTemplate.setMandatory(true);
 5         rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
 6         rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
 7             mqSendFailProcesser.retrySendMessage(rabbitTemplate, message, replyCode, replyText, exchange, routingKey);
 8         });
 9         return rabbitTemplate;
10     }

2、broker消息不丢失

      队列Queue和交换机持久化到磁盘

1     @Bean
2     public Queue payNofityQueue() {
3         return  QueueBuilder.durable(payNotifyMqProperty.getQueueName()).build();
4     }
5 
6     @Bean
7     public DirectExchange exchange() {
8         return new DirectExchange(payNotifyMqProperty.getExchangeName(), true, false);
9     }

  

3、broker到消费者消息不丢失(ACK应答)

1 失败响应
2 
3 private void sendNack(Message message, Channel channel) throws IOException {
4     channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
5 }
6 成功响应
7 private void sendAck(Message message, Channel channel) throws IOException {
8     channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
9 }
原文地址:https://www.cnblogs.com/mxmbk/p/9411288.html