RabbitMQ 消息确认

参考:https://blog.csdn.net/qq_29914837/article/details/93376741

消息确认方式:

1、消息发送确认

a、通过AMQP(高级消息队列协议)事务确认

(1)txSelect用于将当前channel设置成transaction模式,通过调用tx.select方法开启事务模式。
(2)txCommit用于提交事务。当开启了事务模式后,只有当一个消息被所有的镜像队列保存完毕后,RabbitMQ才会调用tx.commit-ok返回给客户端。
(3)txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了

channel.txSelect();
 //ConfirmConfig.exchangeName(交换机名称)
 //ConfirmConfig.routingKey(路由键)
 //message (消息内容)
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message));
channel.txCommit();

b、由rabbitmq实现的confirm跟return机制

生产者通过confirm机制确认消息是否送达。producer端消息确认分两步,一时确认是否到达交换机,二是确认是否到达队列。

confirm实现原理
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。

前提:需要开启publisher-confirms: true
对于ConfirmCallback来说:
如果消息没有到exchange,则confirm回调,ack=false
如果消息到达exchange,则confirm回调,ack=true

对于ReturnCallback来说:
exchange到queue成功,则不回调return
exchange到queue失败,则回调return(需设置mandatory=true,否则不会回调,消息就丢了)
比如路由不到队列时触发回调

2、消息接收确认

acknowledge机制说明:
a、通过 ACK机制(消息确认机制)确认消息是否被正确接收,每个 Message 都要被确认(acknowledged),可以手动去 ACK 或自动 ACK
b、默认情况下,一条消息被消费者正确消费就会从队列中移除
c、自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端处理逻辑抛出异常,也就是消费端没有成功处理这条消息,那么就相当于丢失了消息
d、如果消息已经被处理,但后续代码抛出异常,使用 Spring 进行管理的话消费端业务逻辑会进行回滚,这也同样造成了实际意义的消息丢失
e、如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作
f、如果某个服务忘记确认 ACK 了,则 RabbitMQ 不会再发送此消息数据给它,只要程序还在运行,没确认的消息就一直是 Unacked 状态,无法被 RabbitMQ 重新投递。
g、ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟

acknowledge的三种模式
a、none:默认情况下消息消费者是NONE模式,默认所有消息消费成功,会不断的向消费者推送消息。因为rabbitMq认为所有消息都被消费成功,所以队列中不在存有消息,消息存在丢失的危险
b、auto:在自动确认模式下,消息发送后即被认为成功投递,不管消费者端是否成功处理本次投递
c、mandal:消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功

注:使用下面手动签收消息的方式需要将Acknowledge设置为manual
channel.basicAck(deliveryTag, multiple);
deliveryTag:deliveryTag(唯一标识 ID):当一个消费者向RabbitMQ注册后,会建立起一个Channel,RabbitMQ会用basic.deliver方法向消费者推送消息,这个方法携带了一个delivery tag,它代表了RabbitMQ向该Channel投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag的范围仅限于Channel
multiple:false表示ack当前消息;true表示将一次性ack所有小于deliveryTag的消息。

channel.basicNack(deliveryTag, multiple, requeue);
requeue:false表示丢弃消息,true表示将消息重新入队

channel.basicReject(deliveryTag, requeue);
requeue:false表示丢弃消息,true表示将消息重新入队
basicNack()与basicReject()的区别在于basicReject方法只处理一条消息。

—转载请注明出处
原文地址:https://www.cnblogs.com/landiss/p/14575798.html