RabbitMQ 消息可靠性

一、RabbitMQ消息可靠性投递

1、什么是消息的可靠性投递

保证消息百分百发送到消息队列中去

  • 保证mq节点成功接受消息
  • 消息发送端需要接受到mq服务端接受到消息的确认应答
  • 完善的消息补偿机制,发送失败的消息可以再感知并⼆次处理

2、RabbitMQ消息投递路径

生产者-->交换机-->队列-->消费者

通过两个的点控制消息的可靠性投递

  • 生产者到交换机
    • 通过confirmCallback
  • 交换机到队列
    • 通过returnCallback

3、建议

开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是⾮常重要的消息真心不建议用消息确认机制

二、confirmCallback实战

1、生产者到交换机

通过confirmCallback

生产者投递消息后,如果Broker收到消息后,会给生产者⼀个ACK。生产者通过ACK,可以确认这条消息是否正常发送到Broker,这种方式是消息可靠性投递的核心

2、开启confirmCallback

#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
#新版, NONE值是禁⽤发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type=correlated

3、开发实战

本文示例承接上文:https://www.cnblogs.com/jwen1994/p/14367946.html

@Test
void testConfirmCallback() {
    template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        /**
         * @param correlationData 配置
         * @param ack 交换机是否收到消息, true是成功, false是失败
         * @param cause 失败的原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("confirm====correlationData=" + correlationData);
            System.out.println("confirm====ack=" + ack);
            System.out.println("confirm=====cause=" + cause);
            //根据ACK状态做对应的消息更新操作 TODO
        }
    });
    template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单来啦1");
}

正常情况下的输出:

confirm====correlationData=null
confirm====ack=true
confirm=====cause=null

模拟异常:修改投递的交换机名称

template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME + "11111", "order.new", "新订单来啦1");

confirm====correlationData=null
confirm====ack=false
confirm=====cause=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'order_exchange111' in vhost '/dev', class-id=60, method-id=40)

三、returnCallback实战

1、交换机到队列

通过returnCallback,消息从交换器发送到对应队列失败时触发

两种模式

  • 交换机到队列不成功,则丢弃消息(默认)
  • 交换机到队列不成功,返回给消息生产者,触发returnCallback

2、开启returnCallback配置

#新版
spring.rabbitmq.publisher-returns=true

3、修改交换机投递到队列失败的策略

#为true,则交换机处理消息到路由失败,则会返回给生产者
spring.rabbitmq.template.mandatory=true

4、开发实战

/**
 * 交换机到队列可靠性投递测试
 */
@Test
void testReturnCallback() {
    template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returned) {
            int code = returned.getReplyCode();
            System.out.println("code=" + code);
            System.out.println("returned=" + returned.toString());
        }
    });

    //template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new","新订单ReturnsCallback");
    //模拟异常,投递一个没有绑定关系的路由key
    template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "xdclass.order.new", "新订单ReturnsCallback");
}

模拟异常后,控制台输出如下:

code=312
returned=ReturnedMessage [message=(Body:'新订单ReturnsCallback' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=order_exchange, routingKey=xdclass.order.new]

四、RabbitMQ消息确认机制ACK

消费者从broker中监听消息,需要确保消息被合理处理

1、RabbitMQACK介绍

  • 消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQRabbitMQ收到反馈后才将此消息从队列中删除
  • 消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈, RabbitMQ会认为这个消息没有正常消费,会将消息重新放⼊队列中
  • 只有当消费者正确发送ACK反馈, RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
  • 消息的ACK确认机制默认是打开的,消息如未被进⾏ACK的消息确认机制,这条消息被锁定Unacked

2、确认方式

  • 自动确认(默认)
  • 手动确认 manual
#开启⼿动确认消息,如果消息重新入队,进⾏重试
spring.rabbitmq.listener.simple.acknowledge-mode=manual

3、代码实战

package net.xdclass.xdclasssp.mq;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {

    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws Exception {
        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag=" + msgTag);
        System.out.println("message=" + message.toString());
        System.out.println("body=" + body);

        //复杂业务逻辑

        //告诉broker,消息已经被确认
        channel.basicAck(msgTag, false);

        //告诉broker,消息拒绝确认
        //channel.basicNack(msgTag,false,true);

        //channel.basicReject(msgTag,true);
    }
}

deliveryTag介绍

  • 表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加

basicNack介绍

  • basicReject⼀次只能拒绝接收⼀个消息,可以设置是否requeue

basicReject介绍

  • basicNack方法可以⽀持⼀次0个或多个消息的拒收,可以设置是否requeue

人工审核异常消息

  • 设置重试阈值,超过后确认消费成功,记录消息,人工处理
原文地址:https://www.cnblogs.com/jwen1994/p/14371571.html