Kafka提升——接收-处理-发送事务

接收-处理-发送事务

在消费者接收到数据后,对数据进行处理,然后进行发送到队列。

例如:
用户注册成功后,获得注册优惠券。

当用户注册成功后,需要向用户表插入数据。同时需要向优惠券表插入新的优惠券信息。若在单体应用中,事务的实现非常容易实现,但是在分布式的服务中,事务的实现就需要进行研究了。

分布式的事务暂时不表,此处就讨论消息队列如何完成这里的实现。

假设,目前有用户服务A,优惠券服务B,通过消息队列进行消息传递。

当用户注册时,首先向A发送消息,完成用户的注册表的插入。然后向B发送信息,通知插入优惠券信息。(loger 代替数据库插入,肯定满足事务,不需要多考虑)

	@KafkaListener(topics = "registry", containerFactory = "stringKafkaListenerContainerFactory2")
	@KafkaListener(topics = "youhuiquanhuidiao", containerFactory = "stringKafkaListenerContainerFactory2")
	@Transactional
	public void receiveStringRegistry(String message,
									  @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
		if("youhuiquanhuidiao".equals(topic)){
			logger.info("优惠券给他了已经!!");
		}else {
				logger.info("收到用户信息:"+message);
				logger.info("用户信息插入");
				logger.info("用户信息插入完成");
				transactionalTemplate.send("youhuiquan","新注册用户,给他个优惠券!");
		}
	}
	

B收到消息后,对优惠券信息进行插入,插入完成后,发送信息回调再做另外处理。

	
	@KafkaListener(topics = "youhuiquan", containerFactory = "stringKafkaListenerContainerFactory2")
	@Transactional
	public void receiveStringYouhuiquan(String message,
										@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
		try{
			logger.info("收到信息:"+message);

			logger.info("优惠券插入");

			logger.info("优惠券插入完成");

			int i = 1/0;

			transactionalTemplate.send("youhuiquanhuidiao","优惠券给他了已经!!");
		}catch (Exception e){
			transactionalTemplate.send("youhuiquanhuidiao","发生错误了,优惠券没给成!");
		}
	}

controller

	@RequestMapping("/testSendMsg4")
	@ResponseBody
	@Transactional
	//@KafkaListener(topics = "topic1", containerFactory = "stringKafkaListenerContainerFactory2")
	public String testSendMsg5(){ //事务发送
		Message message = new Message();
		message.setId(1);
		message.setMsg("我是小明,我来注册");
		message.setSendTime(new Date());
		logger.info("发送消息(事务发送) ----->>>>>  message = {}", gson.toJson(message));
		transactionalTemplate.send("registry", gson.toJson(message)).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
			@Override
			public void onFailure(Throwable throwable) {
				logger.error("发生错误,消息发送失败!");
			}

			@Override
			public void onSuccess(SendResult<String, String> stringStringSendResult) {
				logger.info("消息发送成功!");
			}
		});
		//int i = 1/0;
		return "testSendMsg5";
	}

现在 我们把业务系统想的复杂一些,用户注册完成后还需要进行其他方面的操作,也就是需要向其他服务发出信息。此时若一个发生错误,其他都不在发送(发送错误的处理先不表,可以看下后续的分布式事务的处理。)

可以看到,添加事务后,发生错误后全都不在进行发送。

原文地址:https://www.cnblogs.com/luckyhui28/p/12518679.html