RocketMQ 的事务消息

rocketMQ解决分布式事务的思路:1、a事务成功和mq收到消息保持一致。2、保证这条消息一定会被消费,从而完成b事务。时效性可能差了点,但是能达到最终的一致,优点是不会阻塞。

其中第二步保证消息一定会被消费可以看之前的博文,消费端用集群模式可以做到这一点。

下面来看怎样使得 事务a成功 和 mq收到消息 保持一致:

sendMessageInTransaction中,往header中放PROPERTY_TRANSACTION_PREPARED属性,值为true,请求来到了broker这边的SendMessageProcessor.sendMessage方法中,判断刚才的值为true的话,调用getTransactionalMessageService().prepareMessage,把message原来的topic和queueId隐藏起来,替换为topic: RMQ_SYS_TRANS_HALF_TOPIC queueId : 0,然后存入commitLog中,这个topic是没人订阅的所以不会被消费,发送成功后,执行a事务(这里执行a事务是在transactionListener.executeLocalTransaction方法中,transactionListener还有一个方法是checkLocalTransaction,这个方法是查看事务是否完成,逻辑都是需要用户自己写,两者之间要一致)。

正常情况:如果事务a成功,会发送endTransaction的消息,broker收到之后会把RMQ_SYS_TRANS_HALF_TOPIC 中的消息取出来放到事务消息真实对应的topic中,被b消费。

异常情况:

1、half请求broker没收到,broker将不会存储half消息,producer也不会执行下面的事务。

2、half请求broker收到,但是producer没有成功收到broker的响应,那么producer暂时不会执行自己的事务,broker会有回查机制,发现producer没有执行事务,就不会把half消息放到真实topic中。

3、成功发送half消息,但是提交事务或者回滚事务的消息broker没有收到,还是会被回查。

回查的代码逻辑:broker的处理方式是间隔一定时间扫描half消息,然后发出请求向producer回查(间隔1分钟,最多15次),逻辑是BrokerController.start--->startProcessorByHa--->transactionalMessageCheckService.start()--->waitForRunning--->onWaitEnd--->getTransactionalMessageService().check--->listener.resolveHalfMsg--->sendCheckMessage给consumer,consumer收到CHECK_TRANSACTION_STATE消息,checkTransactionState--->producer.checkTransactionState--->transactionListener.checkLocalTransaction(如上文所说,检查本地a事务的情况)--->processTransactionState--->endTransactionOneway(再次提交事务) 

原文地址:https://www.cnblogs.com/chuliang/p/13141913.html