一、RabbitMQ消息可靠性投递
1、什么是消息的可靠性投递
保证消息百分百发送到消息队列中去
- 保证mq节点成功接受消息
- 消息发送端需要接受到mq服务端接受到消息的确认应答
- 完善的消息补偿机制,发送失败的消息可以再感知并⼆次处理
2、RabbitMQ消息投递路径
生产者-->交换机-->队列-->消费者
通过两个的点控制消息的可靠性投递
- 生产者到交换机
- 通过confirmCallback
- 交换机到队列
- 通过returnCallback
3、建议
开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是⾮常重要的消息真心不建议用消息确认机制
二、confirmCallback实战
1、生产者到交换机
通过confirmCallback
生产者投递消息后,如果Broker收到消息后,会给生产者⼀个ACK。生产者通过ACK,可以确认这条消息是否正常发送到Broker,这种方式是消息可靠性投递的核心
2、开启confirmCallback
#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调 spring.rabbitmq.publisher-confirms=true #新版, NONE值是禁⽤发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法 spring.rabbitmq.publisher-confirm-type=correlated
3、开发实战
本文示例承接上文:https://www.cnblogs.com/jwen1994/p/14367946.html
@Test void testConfirmCallback() { template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * @param correlationData 配置 * @param ack 交换机是否收到消息, true是成功, false是失败 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm====correlationData=" + correlationData); System.out.println("confirm====ack=" + ack); System.out.println("confirm=====cause=" + cause); //根据ACK状态做对应的消息更新操作 TODO } }); template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单来啦1"); }
正常情况下的输出:
confirm====correlationData=null confirm====ack=true confirm=====cause=null
模拟异常:修改投递的交换机名称
template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME + "11111", "order.new", "新订单来啦1");
confirm====correlationData=null confirm====ack=false confirm=====cause=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'order_exchange111' in vhost '/dev', class-id=60, method-id=40)
三、returnCallback实战
1、交换机到队列
通过returnCallback,消息从交换器发送到对应队列失败时触发
两种模式
- 交换机到队列不成功,则丢弃消息(默认)
- 交换机到队列不成功,返回给消息生产者,触发returnCallback
2、开启returnCallback配置
#新版 spring.rabbitmq.publisher-returns=true
3、修改交换机投递到队列失败的策略
#为true,则交换机处理消息到路由失败,则会返回给生产者 spring.rabbitmq.template.mandatory=true
4、开发实战
/** * 交换机到队列可靠性投递测试 */ @Test void testReturnCallback() { template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { int code = returned.getReplyCode(); System.out.println("code=" + code); System.out.println("returned=" + returned.toString()); } }); //template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new","新订单ReturnsCallback"); //模拟异常,投递一个没有绑定关系的路由key template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "xdclass.order.new", "新订单ReturnsCallback"); }
模拟异常后,控制台输出如下:
code=312 returned=ReturnedMessage [message=(Body:'新订单ReturnsCallback' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=order_exchange, routingKey=xdclass.order.new]
四、RabbitMQ消息确认机制ACK
消费者从broker中监听消息,需要确保消息被合理处理
1、RabbitMQ的ACK介绍
- 消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ, RabbitMQ收到反馈后才将此消息从队列中删除
- 消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈, RabbitMQ会认为这个消息没有正常消费,会将消息重新放⼊队列中
- 只有当消费者正确发送ACK反馈, RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
- 消息的ACK确认机制默认是打开的,消息如未被进⾏ACK的消息确认机制,这条消息被锁定Unacked
2、确认方式
- 自动确认(默认)
- 手动确认 manual
#开启⼿动确认消息,如果消息重新入队,进⾏重试
spring.rabbitmq.listener.simple.acknowledge-mode=manual
3、代码实战
package net.xdclass.xdclasssp.mq; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "order_queue") public class OrderMQListener { @RabbitHandler public void messageHandler(String body, Message message, Channel channel) throws Exception { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag=" + msgTag); System.out.println("message=" + message.toString()); System.out.println("body=" + body); //复杂业务逻辑 //告诉broker,消息已经被确认 channel.basicAck(msgTag, false); //告诉broker,消息拒绝确认 //channel.basicNack(msgTag,false,true); //channel.basicReject(msgTag,true); } }
deliveryTag介绍
- 表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
basicNack介绍
- basicReject⼀次只能拒绝接收⼀个消息,可以设置是否requeue。
basicReject介绍
- basicNack方法可以⽀持⼀次0个或多个消息的拒收,可以设置是否requeue。
人工审核异常消息
- 设置重试阈值,超过后确认消费成功,记录消息,人工处理