SpringBoot分布式事务RocketMQ

原文链接:https://blog.csdn.net/Timeguys/article/details/107949660

一、使用:

一、引入依赖:

  1.  
    <dependency>
  2.  
    <groupId>org.apache.rocketmq</groupId>
  3.  
    <artifactId>rocketmq-spring-boot-starter</artifactId>
  4.  
    <version>2.0.3</version>
  5.  
    </dependency>

二、举例:生产者创建订单---->生产者发送消息----->MQ服务接受消息----->消费者监听消息并减库存

【生产者】:

application.yml

  1.  
    rocketmq:
  2.  
    name-server: 192.168.85.128:9876 # rocketMQ地址
  3.  
    producer:
  4.  
    group: producer-group-test # 生产者的组名需要和消费者监听consumerGroup一致

业务代码:

  1.  
    @Service
  2.  
    public class OrderServiceImpl extends ServiceImpl<OrderMapper, TbOrder> implements OrderService {
  3.  
     
  4.  
    @Resource
  5.  
    private RocketMQTemplate rocketMQTemplate;
  6.  
     
  7.  
    @Override
  8.  
    public void create() {
  9.  
    //创建订单--> 发送消息 --> 消息发送成功后调用本地事务提交 -->
  10.  
    TbOrder order = new TbOrder();
  11.  
    order.setCount(10);
  12.  
    order.setMoney(BigDecimal.valueOf(10));
  13.  
    order.setProductId(1L);
  14.  
    order.setStatus(1);
  15.  
    order.setUserId(1L);
  16.  
    sendMsg(order);
  17.  
    }
  18.  
    @Override
  19.  
    public void sendMsg(TbOrder order){
  20.  
    /**
  21.  
    * String txProducerGroup, 生产者分组
  22.  
    * String destination, topic
  23.  
    * Message<?> message, 消息
  24.  
    * Object arg 消息参数
  25.  
    */
  26.  
    Message<String> build = MessageBuilder.withPayload(JSONObject.toJSONString(order)).build();
  27.  
    rocketMQTemplate.sendMessageInTransaction("tx-producer-group","txmsg-topic",build , null);
  28.  
    }
  29.  
    }

创建  ProducerTxmsgListener 并实现 RocketMQLocalTransactionListener:

  1.  
    @Component
  2.  
    // txProducerGroup 的值和发送事务消息指定的 txProducerGroup 相同
  3.  
    @RocketMQTransactionListener(txProducerGroup = "txmsg-producer-group")
  4.  
    public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {
  5.  
    @Resource
  6.  
    private OrderService orderService;
  7.  
     
  8.  
    /**
  9.  
    * @Description: 执行本地事务提交
  10.  
    */
  11.  
    @Override
  12.  
    @Transactional
  13.  
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
  14.  
    try {
  15.  
    TbOrder tbOrder = JSONObject.parseObject(message.getPayload().toString(), TbOrder.class);
  16.  
    System.out.println(tbOrder);
  17.  
    orderService.save(tbOrder);
  18.  
    return RocketMQLocalTransactionState.COMMIT; //变更消息状态为:可消费
  19.  
    }catch (Exception e){
  20.  
    return RocketMQLocalTransactionState.ROLLBACK; //本地事务执行异常,将消息遗弃
  21.  
    }
  22.  
    }
  23.  
     
  24.  
    /**
  25.  
    * @Description: 检查本地事务是否执行成功
  26.  
    */
  27.  
    @Override
  28.  
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
  29.  
     
  30.  
    TbOrder tbOrder = JSONObject.parseObject(message.getPayload().toString(), TbOrder.class);
  31.  
    TbOrder order = orderService.getById(tbOrder.getId());
  32.  
    // 不为null 则表示执行成功
  33.  
    if (order != null){
  34.  
    return RocketMQLocalTransactionState.COMMIT; //变更消息状态为:可消费
  35.  
    }
  36.  
    // 执行本地事务发生问题或还没执行完成, UNKNOWN 表示会继续回查
  37.  
    return RocketMQLocalTransactionState.UNKNOWN;
  38.  
    }
  39.  
    }

【消费者】:

application.yml

  1.  
    rocketmq:
  2.  
    name-server: 127.0.0.1:9876 # rocketMQ地址
  3.  
    producer:
  4.  
    group: producer-test-group # 生产者的组名需要和消费者监听consumerGroup一致

创建MyListener 并实现 RocketMQListener 接口:

  1.  
    // topic 对应生产者发消息是的topic
  2.  
    @RocketMQMessageListener(topic = "test-topic" , consumerGroup = "consumer-group")
  3.  
    public class MyListener implements RocketMQListener<String> {
  4.  
     
  5.  
    @Override
  6.  
    public void onMessage(String message) {
  7.  
    //执行 减库存业务 如果发生异常,则消息会隔段时间再次消费
  8.  
    System.out.println(message);
  9.  
    }
  10.  
    }

原理图:

原文地址:https://www.cnblogs.com/fswhq/p/13853532.html