分布式事务解决方案4--消息队列MQ(事务最终一致方案)

1、消息队列MQ(事务最终一致方案)介绍

原理、流程与本地消息表类似

不同点:

1) 本地消息表改为MQ

2) 定时任务改为MQ的消费者

架构图

优点: 

不依赖定时任务,基于MQ更高效、更可靠。

适合于公司内的系统 (比如公司内的多个系统,要做一致性处理)

不同公司之间无法基于MQ,本地消息表更合适

2、Rabbit MQ安装

参考:Windown10下Rocket MQ 安装

3、Rocket MQ生产者和消费者配置

@Configuration
public class RocketMQConfig {

    /**
     * 生产者 配置
     * @return
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer producer(){

        DefaultMQProducer producer = new
                DefaultMQProducer("paymentGroup");
        producer.setNamesrvAddr("localhost:9876");
        return producer;
    }

    /**
     * 消费者配置
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown" )
    public DefaultMQPushConsumer consumer(@Qualifier("messageListener") MessageListenerConcurrently messageListener) throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("paymentConsumerGroup");

        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");

        // Subscribe one more more topics to consume.
        consumer.subscribe("payment", "*");

        consumer.registerMessageListener(messageListener);
        return  consumer;
    }
}

  

4、业务实现

 1) 配置RocketMQ

@Configuration
public class RocketMQConfig {

    /**
     * 生产者 配置
     * @return
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer producer(){

        DefaultMQProducer producer = new
                DefaultMQProducer("paymentGroup");
        producer.setNamesrvAddr("localhost:9876");
        return producer;
    }

    /**
     * 消费者配置
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown" )
    public DefaultMQPushConsumer consumer(@Qualifier("messageListener") MessageListenerConcurrently messageListener) throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("paymentConsumerGroup");

        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");

        // Subscribe one more more topics to consume.
        consumer.subscribe("payment", "*");

        consumer.registerMessageListener(messageListener);
        return  consumer;
    }
}

  这里的主题名称为payment

2) 服务层PaymentService

增加支付接口。扣掉余额后,将消息放到消息队列

    /**
     * 支付接口(消息队列) 。扣掉余额后,将消息放到消息队列
     * @param userId 用户Id
     * @param orderId 订单Id
     * @param amount 支付金额
     * @return 0: 成功; 1:用户不存在  2:余额不足
     */
    @Transactional(transactionManager = "tm134", rollbackFor = Exception.class)
    public int paymentMQ(int userId, int orderId, BigDecimal amount) throws Exception {

        //支付操作 扣款
        AccountA accountA = accountAMapper.selectByPrimaryKey(userId);
        if(accountA == null){
            return  1;
        }
        if(accountA.getBalance().compareTo(amount) < 0){
            return 2;
        }

        accountA.setBalance(accountA.getBalance().subtract(amount));
        accountAMapper.updateByPrimaryKey(accountA);

        Message message = new Message();
        message.setTopic("payment");
        message.setKeys(orderId +"");
        message.setBody("订单已支付".getBytes());
        try {
           SendResult result = producer.send(message);
           if(result.getSendStatus() == SendStatus.SEND_OK){
               return  0;
           }else {
               throw  new Exception("消息发送失败");
           }
        } catch (Exception e) {
            e.printStackTrace();
            throw  e;
        }



    }

  

3)、控制层增加支付接口

@RestController
public class PaymentController {

    @Autowired
    private PaymentService paymentService;


    //localhost:8080/paymentMQ?userId=1&orderId=10010&amount=200
    @RequestMapping("paymentMQ")
    public String paymentMQ(int userId, int orderId, BigDecimal amount) throws Exception {
        int result = paymentService.paymentMQ(userId, orderId,amount);
        return  "支付结果:" + result;
    }
}

  

4、消费者

@Component("messageListener")
public class ChangeOrderStatus implements MessageListenerConcurrently {


    @Resource
    OrderMapper orderMapper;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (list == null || list.size() == 0) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        System.out.println("Rocket MQ 消费者接收到消息");
        //默认list最大值为1
        for (MessageExt messageExt : list) {
            String orderId = messageExt.getKeys();
            String msg = new String(messageExt.getBody());
            System.out.println("msg=" + msg);

            Order order = orderMapper.selectByPrimaryKey(Integer.parseInt(orderId));

            if (order == null) {
                //消费者订单查询不到,根据业务是否要再次消费
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            try {
                order.setOrderStatus(1); //已支付
                order.setUpdateTime(new Date());
                order.setUpdateUser(0); //系统更新
                orderMapper.updateByPrimaryKey(order);
            }catch (Exception e){
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }


        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}  

消费者收到消息队列的消息后,更新订单状态为已支付。

5、数据准备

134和129两台数据库

 129数据库的t_order表中,order_status为0 未支付

6、测试

localhost:8080/paymentMQ?userId=1&orderId=10010&amount=200

 可以看到金额扣款200,并且订单状态order_status=1 已支付。

原文地址:https://www.cnblogs.com/linlf03/p/14010865.html