分布式事务——幂等设计(rocketmq案例)

  幂等指的就是执行多次和执行一次的效果相同,主要是为了防止数据重复消费。MQ中为了保证消息的可靠性,生产者发送消息失败(例如网络超时)会触发 "重试机制",它不是生产者重试而是MQ自动触发的重试机制, 而这种情况下消费者就会收到两条消息,比如明明只需要扣一次款, 可是消费者却执行了2次。为了解决幂等问题,每一个消息应该有一个全局的唯一的标识,当处理过这条消息后,就把这个标识保存到数据库或者redis中,在处理消息前前判断这个标识记录为空就好了。像activemq中msgId就是唯一的,我们可以直接拿这个id来判断,但是rocketmq重试机制不一样,它重发会产生一个新的id,但是它提供了setKeys()这个api,我们可以给key设置一个唯一的流水编号来加以判断。(重试机制是不存在并发问题的,它是间隔一段时间自动促发的)。

1. 导入依赖( 生产者和消费者的依赖都一样)

    <parent> 
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.7.RELEASE</version>
        <relativePath/> 
    </parent>
    <!-- springcloud
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Camden.SR6</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
     -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- webmvc -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 集成lombok 框架(get/set) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- RocketMq -->
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>3.2.6</version>
        </dependency>
        <!-- 热加载 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- jackson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.30</version>
        </dependency>
    </dependencies>
  
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build> 
pom.xml

2. 生产者配置参数和配置文件

#该应用是否启用生产者
#rocketmq.producer.isOnOff=on
#发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
rocketmq.producer.groupName=mqtest
#mq的nameserver地址
rocketmq.producer.namesrvAddr=192.168.5.7:9876
#消息最大长度 默认1024*4(4M)
rocketmq.producer.maxMessageSize=4096
#发送消息超时时间,默认3000
rocketmq.producer.sendMsgTimeout=3000
#发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=3
rocketmq.properties
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
/**
 * 生产者配置
 */
@PropertySource("classpath:rocketmq.properties")
@Configuration
public class MQProducerConfiguration {
    public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);
    /**
     * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
     */
    @Value("${rocketmq.producer.groupName}")
    private String groupName;
    /** 服务器地址  */
    @Value("${rocketmq.producer.namesrvAddr}")
    private String namesrvAddr;
    /**
     * 消息最大大小,默认4M
     */
    @Value("${rocketmq.producer.maxMessageSize}")
    private Integer maxMessageSize ;
    /**
     * 消息发送超时时间,默认3秒
     */
    @Value("${rocketmq.producer.sendMsgTimeout}")
    private Integer sendMsgTimeout;
    /**
     * 消息发送失败重试次数,默认2次
     */
    @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
    private Integer retryTimesWhenSendFailed;

    @Bean
    public DefaultMQProducer getRocketMQProducer() {
        DefaultMQProducer producer;
        producer = new DefaultMQProducer(this.groupName);
        producer.setNamesrvAddr(this.namesrvAddr);
        //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
        //producer.setInstanceName(instanceName);
        producer.setMaxMessageSize(this.maxMessageSize);
        producer.setSendMsgTimeout(this.sendMsgTimeout);
        //如果发送消息失败,设置重试次数,默认为2次
        producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
        try {
            producer.start();
            LOGGER.info(String.format("rocketmq producer start "));
        } catch (MQClientException e) {
            LOGGER.error(String.format("producer is error {}", e.getMessage(),e));
        }
        return producer;
    }
}
MQProducerConfiguration

3. 生产者发送消息

import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import lombok.extern.slf4j.Slf4j;
@RestController
@Slf4j
public class TestController {

    /**使用RocketMq的生产者*/
    @Autowired
    private DefaultMQProducer defaultMQProducer;
    
    @RequestMapping("/send")
    public void send(){
        String msg = "幂等";
        log.info("开始发送消息:"+msg);
        
        try {
            // arg0主题名称    arg1分组    arg2内容
            Message sendMsg = new Message("DemoTopic","wulei",(msg).getBytes());
            // 注意: activemq的msgId是唯一的,但是rocketmq的不是,所以幂等不能用id来判断,我们可以通过setKeys来解决,一般都是业务id,这里用随机数代替。
            sendMsg.setKeys(UUID.randomUUID().toString());
            SendResult sendResult = defaultMQProducer.send(sendMsg);
            //默认3秒超时
            log.info("消息发送响应信息:"+sendResult.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
TestController

4. 消费者配置参数和配置文件

##该应用是否启用消费者
#rocketmq.consumer.isOnOff=on
#发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
rocketmq.consumer.groupName=mqtest
#mq的nameserver地址
rocketmq.consumer.namesrvAddr=192.168.5.7:9876
#该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
rocketmq.consumer.topics=DemoTopic~*;
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64
#设置一次消费消息的条数,默认为1条
rocketmq.consumer.consumeMessageBatchMaxSize=1
rocketmq.properties
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.wulei.listener.MQConsumeMsgListenerProcessor;
/**
 * 消费者配置
 */
@PropertySource("classpath:rocketmq.properties")
@Configuration
public class MQConsumerConfiguration {
    public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
    // 地址
    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;
    // 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
    @Value("${rocketmq.consumer.groupName}")
    private String groupName;
    // 该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags)
    @Value("${rocketmq.consumer.topics}")
    private String topics;
    
    @Value("${rocketmq.consumer.consumeThreadMin}")
    private int consumeThreadMin;
    @Value("${rocketmq.consumer.consumeThreadMax}")
    private int consumeThreadMax;
    @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
    private int consumeMessageBatchMaxSize;
    
    @Autowired
    private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
    
    @Bean
    public DefaultMQPushConsumer getRocketMQConsumer(){
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.registerMessageListener(mqMessageListenerProcessor);
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        /**
         * 设置消费模型,集群还是广播,默认为集群
         */
        //consumer.setMessageModel(MessageModel.CLUSTERING);
        /**
         * 设置一次消费消息的条数,默认为1条
         */
        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
        try {
            /**
             * 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3
             */
            String[] topicTagsArr = topics.split(";");
            for (String topicTags : topicTagsArr) {
                String[] topicTag = topicTags.split("~");
                consumer.subscribe(topicTag[0],topicTag[1]);
            }
            consumer.start();
            LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);
        }catch (MQClientException e){
            LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);
        }
        return consumer;
    }
}
MQConsumerConfiguration

5. 消费者监听消息

import java.util.HashMap;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently{
    private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
   
    // 假装这是一个redis
    private HashMap<String, String> myredis = new HashMap<String, String>();
    
    /**
     *  默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息<br/>
     *  不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//        if(CollectionUtils.isEmpty(msgs)){
//            logger.info("接受到的消息为空,不处理,直接返回成功");
//            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//        }
//        

        //for(MessageExt messageExt : msgs) { }
        MessageExt messageExt = msgs.get(0);
        String keys = messageExt.getKeys();// 自定义的唯一key
        String msgId = null;                // 消息id(不是唯一的)
        String msgContext = null;            // 消息内容
        int reconsume = 0;                   // 重试次数
        
        
        if(messageExt.getTopic().equals("DemoTopic") && messageExt.getTags().equals("wulei")){
            if(myredis.get(keys)==null) { 
                //logger.info("接受到的消息为:"+messageExt.toString());
                  msgId = messageExt.getMsgId();
                  msgContext = new String(messageExt.getBody());
                  reconsume = messageExt.getReconsumeTimes();
                  try {
                      int i = 1/0;
                      System.out.println("消费成功: id:"+msgId+"  msg"+msgContext+"   次数"+reconsume);
                      myredis.put(messageExt.getKeys(), msgContext);
                      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    // 重试3次就不在重试了,直接返回消费成功状态,并触发人工补偿机制。
                    if(reconsume==2) {
                        myredis.put(messageExt.getKeys(), msgContext);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }else {
                        // 一般消费者这边尽量不要抛异常,它失败就会触发重试机制。如果非要抛异常可以在try{}catch{}里面return ConsumeConcurrentlyStatus.RECONSUME_LATER(表示失败让他重试)
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            }else {
                // 已经消费过就不要再重试了,直接返回成功。
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        }else {
            // 不存在不要再重试了,直接返回成功。
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
}
MQConsumeMsgListenerProcessor
原文地址:https://www.cnblogs.com/wlwl/p/10155889.html