第八章 事务型消息

一。关于消息队列的事物问题

spring的@Transactional标签只有当整个方法执行完成后才commit,这样如果因为网络问题即使整个方法执行成功,方法中消息队列发送成功,但是commit时失败了,减库存的rocketmq无法回滚。

解决方法1:spring @Transactional提供在事务提交成功后再执行某些方法的能力

在创建好订单入库后,最后执行异步更新库存

    // 在最近的一个@Transactional提交成功后才会执行
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
        	@Override
        	public void afterCommit() {
        	  // 4.异步同步库存
                  boolean mqResult = itemService.asyncDecreaseStock(itemId, amount);
            // 发送失败可能是没收到返回的确认消息,实际已经同步成功了   if(!mqResult) { itemService.increaseStock(itemId, amount); //throw new BusinessException(EmBusinessError.SEND_ROKETMQ_FAIL);   } } });

  

方法1的问题是,当异步消息发送失败后就没办法回滚了,失败就永远丢失了该消息,但是订单已经创建造成超卖,所以考虑事务型rocketmq

解决方法2:transaction rocketmq

发送事务型消息,二次提交的状态,broker收到的消息是prepare状态不可被消费端感知

发送后首先执行本地的executeLocalTransaction方法,创建订单,只有创建成功向brocker发送commit消息,brocker中的消息状态变为可执行,可以被消费端消费

本地方法执行失败则回滚撤回消息,若长时间为返回commit或rollback消息,则执行checkLocalTransaction,检查下单是否成功

问题 : 本地方法执行成功,但是commit状态消息发送失败如何处理,

需要增加库存日志流水,来记录创建订单的状态,从而在checkLocalTransaction时可以根据这个状态来判断是发送成功还是回滚消息

刚创建订单时插入一条初始状态的log,订单创建成功状态改为成功,异常状态改为失败

@Component
public class MQProducer {
	Log log = LogFactory.getLog(getClass());
	@Value("${mq.nameserver.addr}")
	private String nameServer;
	
	@Value("${mq.topicname}")
	private String topicName;
	TransactionMQProducer transactionMQProducer;
	@Autowired
	OrderService orderService;
	
	@Autowired
      private StockLogDOMapper stockLogDOMapper;
	
	@PostConstruct
	public void init() throws MQClientException {
		producer = new DefaultMQProducer("producer");
		producer.setNamesrvAddr(nameServer);
		producer.start();
		
		transactionMQProducer = new TransactionMQProducer("transaction_producer_group");
		transactionMQProducer.setNamesrvAddr(nameServer);
		transactionMQProducer.start();
		transactionMQProducer.setTransactionListener(new TransactionListener() {
			// 发送后实际先执行的本地方法
			@Override
			public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
				Integer userId = (Integer) ((Map)arg).get("userId");
				Integer itemId = (Integer) ((Map)arg).get("itemId");
				Integer promoId = (Integer) ((Map)arg).get("promoId");
				Integer amount = (Integer) ((Map)arg).get("amount");
				String stockLogId = (String)((Map)arg).get("stockLogId");
				try {
					// 创建订单
					orderService.createOrder(userId, itemId, promoId, amount, stockLogId);
				} catch (BusinessException e) {
					// 库存流水状态更新成失败,防止rollback消息发送失败
					e.printStackTrace();
					StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
					stockLogDO.setStatus(3);
					stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO);
					// broker中的prepare消息状态转为rollback等于撤回没发
					return LocalTransactionState.ROLLBACK_MESSAGE;
				}
				// broker中的prepare消息状态变为commit可执行的
				return LocalTransactionState.COMMIT_MESSAGE;
			}
			/** 当消息中间件发现长时间没有以上两种状态返回,比如死机或createOrder长时间不返回,prepare消息状态默认是unknown状态
			* 调用检查方法看库存扣减和下单是否成功
			*/
			@Override
			public LocalTransactionState checkLocalTransaction(MessageExt msg) {
				// 根据是否扣减库存成功,来判断返回COMMIT,ROLLBACK还是UNKNOWN状态
				String jsonString = new String(msg.getBody());
				Map<String,Object> map = JSON.parseObject(jsonString, Map.class);
				Integer itemId = (Integer) map.get("itemId");
				Integer amount = (Integer) map.get("amount");
				String stockLogId = (String) map.get("stockLogId");
				
				StockLogDO stockLogDO= stockLogDOMapper.selectByPrimaryKey(stockLogId);
				if(stockLogDO == null) {
					return LocalTransactionState.UNKNOW;
				}
				if(stockLogDO.getStatus().intValue() == 1) {
					return LocalTransactionState.UNKNOW;
				} else if(stockLogDO.getStatus().intValue() == 2) {
					return LocalTransactionState.COMMIT_MESSAGE;
				}
				return LocalTransactionState.ROLLBACK_MESSAGE;
			}
		});
	}
	
	public boolean transactionAsyncReduceStock(Integer userId,Integer itemId,Integer promoId,Integer amount, String stockLogId) {
		Map<String,Object> bodyMap = new HashMap<String,Object>();
		bodyMap.put("itemId", itemId);
		bodyMap.put("amount", amount);
		bodyMap.put("stockLogId", stockLogId);
		Message msg = new Message(topicName,"increase", 
				JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8")));
		Map<String,Object> argMap = new HashMap<String,Object>();
		argMap.put("itemId", itemId);
		argMap.put("amount", amount);
		argMap.put("userId", userId);
		argMap.put("promoId", promoId);
		argMap.put("stockLogId", stockLogId);
		
		TransactionSendResult result = null;
		try {
			// 发送事务型消息,二次提交的状态,broker收到的消息是prepare状态不可被消费端感知
			// 回调执行listener中的executeLocalTransaction这个方法成功,消息状态变为可执行的,再被执行
			result = transactionMQProducer.sendMessageInTransaction(msg, argMap);
		} catch (MQClientException e) {
			e.printStackTrace();
			return false;
		}
		if(result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
			return true;
		} else {
			return false;
		}
	}
}

  

PS:

问题一:local事务执行成功,消息发送失败,还能一致性吗

答:只要local事务成功了,stocklog必定是2这个成功状态,这样即便消息发送失败,由于之前有prepare消息,消息中间件没有收到明确的commit或者rollback就会触发check回查,这个时候stocklog是2就会补发commit

问题2:返回commit后消费端执行失败,怎么保证一致性

答:消费方消费完消息会反回给消息中间件consumer success 这个时候broker才会去删除消息 如果消费者程序异常 则消息中间件不会收到消费成功 每隔一段时间会重试

rocketmq的内部机制很大程度上确保了这一步的操作会成功

问题3:redis不可用时,如何操作?

使用数据库数据来扣减,可是如何确定异步同步消息已经都消费了,否则实际数据库库存会比正常多。

一般只能少卖不能多卖。程序block,运维来恢复

问题4:超时释放的问题,creatOrder方法卡住了一直没有返回

出现大面积假死,redis已经被减了,但是订单没有成功,后台需要释放并回滚当超过十五分钟,将redis加上去

二。售罄问题

当redis减库存时如果返回0,则增加售罄标识,下单刚开始时就需要判断是否有这个标识,否则返回库存不足

原文地址:https://www.cnblogs.com/t96fxi/p/12093945.html