SpringCloudAlibaba

前言

RocketMQ提供了事务消息去解决程序异常回滚但消息已发出的问题,如服务A插入一条数据后服务B需要对用户数据进行修改,而服务A发出消息后程序发生异常导致数据插入回滚,而服务B监听到消息又对数据进行了修改,导致数据出现问题


环境

Spring Cloud Hoxton.SR9 + Spring Cloud Alibaba 2.2.6.RELEASE + RocketMQ 4.7.0


分布式事务消息流程

流程图

在这里插入图片描述

流程解析

  • 第1步:生产者向MQ Server发送半消息(特殊消息,会被存储到MQ Server且标记为暂时不能投递),消费者不会接收到这条消息
  • 第2 3步:当半消息发送成功后生产者就去执行本地事务
  • 第4步:生产者根据本地事务的执行状态向MQ Server发送二次确认请求,如果MQ Server收到的是commit就将半消息标记为可投递,消费者即可消费到该消息,如果接收到是rollback就将这条半消息删除
  • 第5步:如果第四步的二次确认没有能够成功发送到MQ Server,经过一段时间后,MQ Server会向生产者发送回查消息去获取本地事务的执行状态
  • 第6步:生产者检查本地事务执行状态
  • 第7步:生产者根据本地事务的执行结果告诉MQ Server应该commit还是rollback,如果是commit则像消费者投递消息,如果是rollback则丢弃消息

注:
1234步是一种二次确认的机制,生产者把消息发送到MQ,MQ做了标记不让去消费这条消息,生产者去执行本地事务,完成后根据执行状态去投递或丢弃消息
567步是MQ没有收到二次确认做的容错处理


事务消息三种状态

  • Commit:提交事务消息,消费者可以消费此消息
  • Rollback:回滚事务消息,broker会删除该消息,消费者不能消费
  • UNKNOWN: broker需要回查确认该消息的状态

具体实现

实现代码

问题场景:内容中心插入一条数据后用户中心需要对用户数据进行修改,而内容中心发出消息后程序发生异常导致数据插入回滚,而用户中心监听到消息又对数据进行了修改导致数据的不一致,下面将用RocketMQ的分布式事务消息验证下该场景的处理方式


内容中心

  • 表结构
CREATE TABLE `test` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `title` varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci COMMENT='插入数据测试表'

CREATE TABLE `rocketmq_transaction_log` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `transaction_Id` varchar(45) COLLATE utf8_unicode_ci NOT NULL COMMENT '事务id',
  `log` varchar(45) COLLATE utf8_unicode_ci NOT NULL COMMENT '日志',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci COMMENT='RocketMQ事务日志表'
  • TestRocketController.java
@PostMapping("test1")
public Test test1() {
    return testService.insertTest();
}
  • TestService.java
import com.alibaba.csp.sentinel.annotation.SentinelResource;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;

@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class TestService {

    private final TestMapper testMapper;
    private final RocketMQTemplate rocketMQTemplate;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    public Test insertTest() {
        Test test = Test.builder()
                .title("世事短如春梦,春梦了无痕,譬如春梦,黄粱未熟蕉鹿走")
                .build();

        /**
         * 发送半消息 对应步骤一
         * 参数1:Topic
         * 参数2:消息体
         *      可设置header,可用作参数传递
         * 参数2:arg 可用作参数传递
         */
        rocketMQTemplate.sendMessageInTransaction(
                "add-test",
                MessageBuilder.withPayload(test)
                              .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
                              .build(),
                 test
        );

        return test;
    }

    /**
     * 插入数据且记录事务日志
     */
    @Transactional(rollbackFor = Exception.class)
    public void insertTestDataWithRocketMqLog(Test test, String transactionId) {
        this.insertTestData(test);

        rocketmqTransactionLogMapper.insertSelective(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .log("插入了一条Test数据...")
                        .build()
        );
    }

    /**
     * 插入测试数据
     * @param test
     */
    @Transactional(rollbackFor = Exception.class)
    public void insertTestData(Test test) {
        testMapper.insertSelective(test);
    }
}
  • TestMapper.java
public interface TestMapper extends Mapper<Test> {
}
  • RocketmqTransactionLogMapper.java
public interface RocketmqTransactionLogMapper extends Mapper<RocketmqTransactionLog> {
}
  • Test.java
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
@Table(name = "test")
public class Test {

    /**
     * id
     */
    @Id
    @GeneratedValue(generator = "JDBC")
    private Integer id;

    /**
     * 标题
     */
    private String title;

}
  • RocketmqTransactionLog.java
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
@Table(name = "rocketmq_transaction_log")
public class RocketmqTransactionLog {
    /**
     * id
     */
    @Id
    @GeneratedValue(generator = "JDBC")
    private Integer id;

    /**
     * 事务id
     */
    @Column(name = "transaction_Id")
    private String transactionId;

    /**
     * 日志
     */
    private String log;
}
  • AddTestTransactionListener.java
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import java.util.Objects;

/**
 * 事务监听
 */
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddTestTransactionListener implements RocketMQLocalTransactionListener {

    private final TestService testService;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * 执行本地事务,对应步骤三
     * @param message
     * @param o
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {

        MessageHeaders headers = message.getHeaders();

        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        try {
            testService.insertTestDataWithRocketMqLog((Test) o, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 本地事务回查,对应步骤六
     * @param message
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        MessageHeaders headers = message.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        // 根据记录的事务回查
        RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .build()
        );

        // 本地事务执行成功
        if (Objects.nonNull(transactionLog)) {
            return RocketMQLocalTransactionState.COMMIT;
        }

        // 本地事务执行失败
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

用户中心

  • TestRocketConsumer.java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "add-test")
public class TestRocketConsumer implements RocketMQListener<Test> {
    @Override
    public void onMessage(Test test) {
        // TODO 业务处理
        try {
            log.info("监听到主题为'add-test'的消息:" + new ObjectMapper().writeValueAsString(test));
            log.info("可以开始处理业务啦啦啦");
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}

测试

  • 如图所示,在执行数据插入后还未向MQ Server发送本地事务的执行状态时,模拟服务宕机,将服务kill

在这里插入图片描述

  • Kill内容中心进程

在这里插入图片描述

  • 此时未向MQ Server发送本地事务的执行状态,MQ Server中的消息不会投递到用户中心,用户中心未收到消息不会进行后续的业务处理,如下所示,重启应用后进入本地事务回查

在这里插入图片描述

  • 本地事务回查后用户中心正常监听到消息进行业务处理

在这里插入图片描述

  • 至此,已完成RocketMQ分布式事务消息的实现

项目源码


- End -
白嫖有风险
点赞加收藏
以上为本篇文章的主要内容,希望大家多提意见,如果喜欢记得点个推荐哦
作者:Maggieq8324
本文版权归作者和博客园共有,欢迎转载,转载时保留原作者和文章地址即可。
原文地址:https://www.cnblogs.com/maggieq8324/p/15350898.html