RocketMq(五、事务消息)

生产者对ID为12的用户进行修改操作(年龄增加一岁),并把该用户信息作为事务消息发送给MQ,保证本地事务的执行结果与消息的投递结果保持一致。

事务原理

https://www.cnblogs.com/huangying2124/p/11702761.html 

可以看看这位大牛写的博客,这里就不具体介绍原理,只贴代码。

test类

package com.wk.test.rocketmqTest.transaction;

import com.wk.entity.User;
import com.wk.service.UserService;
import org.apache.rocketmq.client.exception.MQClientException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Spring Boot test that drives the transactional-message demo end to end:
 * it loads one user through {@link UserService} and hands it to
 * {@link Producer#exeutorTransaction(User)}.
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class MQTest {

    /** Id of the user record that the demo loads and sends. */
    private static final int TARGET_USER_ID = 12;

    @Autowired
    private Producer producer;
    @Autowired
    private UserService userService;

    /**
     * Looks up the user with id {@code 12} and sends it as a transactional message.
     *
     * @throws MQClientException declared for RocketMQ client failures
     */
    @Test
    public void ss() throws MQClientException {
        producer.exeutorTransaction(userService.findUserById(TARGET_USER_ID));
    }
}

生产者

package com.wk.test.rocketmqTest.transaction;

import com.alibaba.fastjson.JSONObject;
import com.wk.entity.User;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Sends a {@link User} to RocketMQ as a transactional message.
 *
 * <p>The local transaction itself is carried out by the injected
 * {@link TransactionListenerImpl}; this class only configures the producer,
 * builds the message and triggers the transactional send.
 */
@Component
public class Producer {

    @Autowired
    private TransactionListenerImpl transactionListener;

    /**
     * Serializes {@code user} to JSON and sends it to topic {@code TransactionTopic}
     * (tag {@code transaction}, key {@code key}) through a transactional producer.
     *
     * <p>A fresh producer and thread pool are created on every call, so both are
     * released in a {@code finally} block once the send has returned. The message
     * body is encoded explicitly as UTF-8, which is the charset the listener uses
     * to decode it.
     *
     * <p>{@link MQClientException} is caught and printed, not propagated.
     *
     * @param user the user to send; its {@code desc} field is overwritten before sending
     */
    public void exeutorTransaction(User user){

        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");
        producer.setNamesrvAddr("10.32.16.179:9876");
        // Thread pool handed to the producer via setExecutorService.
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("transaction_thread");
                return thread;
            }
        });
        producer.setSendMsgTimeout(10000);
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        try {
            producer.start();
            user.setDesc("年龄加一");
            // Encode with an explicit charset instead of the platform default so the
            // consumer side (which decodes as UTF-8) never sees mangled text.
            byte[] body = JSONObject.toJSONString(user).getBytes(StandardCharsets.UTF_8);
            Message message = new Message("TransactionTopic","transaction","key", body);
            // Extra argument forwarded to the transaction listener together with the message.
            Map<String, Object> map = new HashMap<>();
            map.put("1", "测试参数1");
            map.put("2","测试参数2");
            SendResult sendResult = producer.sendMessageInTransaction(message,map);
            System.out.println(sendResult);
        } catch (MQClientException e) {
            e.printStackTrace();
        } finally {
            // Release the per-call producer and its thread pool; without this every
            // invocation leaks a started client and a pool of non-daemon threads.
            // NOTE(review): after shutdown this instance can no longer answer broker
            // check-backs; fine for this demo where the listener resolves to
            // COMMIT/ROLLBACK directly — confirm before reusing in production.
            producer.shutdown();
            executorService.shutdown();
        }

    }
}

事务监听

package com.wk.test.rocketmqTest.transaction;

import com.alibaba.fastjson.JSONObject;
import com.wk.entity.User;
import com.wk.service.UserService;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Transaction listener for the transactional-message demo.
 *
 * <p>{@link #executeLocalTransaction(Message, Object)} runs the local database
 * update for the user carried in the message body, and
 * {@link #checkLocalTransaction(MessageExt)} answers broker check-backs from the
 * outcome recorded by that method.
 */
@Component
public class TransactionListenerImpl implements TransactionListener {

    /**
     * Outcome of local transactions that are still running ({@code UNKNOW}) or that
     * failed ({@code ROLLBACK_MESSAGE}), keyed by transaction id. Successful
     * transactions are removed again, so an id with no entry is treated as committed.
     */
    private final ConcurrentHashMap<String, LocalTransactionState> localTrans = new ConcurrentHashMap<>();

    @Autowired
    private UserService userService;

    /**
     * Decodes the message body (UTF-8 JSON) into a {@link User} and applies
     * {@code userService.updateUser}.
     *
     * @param message the transactional message whose body carries the user as JSON
     * @param o       the extra argument supplied by the sender; cast to a map and printed
     * @return {@code COMMIT_MESSAGE} when the update completes without an exception,
     *         {@code ROLLBACK_MESSAGE} when anything in the method throws
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        String transactionId = message == null ? null : message.getTransactionId();
        // Mark the transaction as in flight so a check-back that arrives before the
        // update finishes is not answered with a premature commit.
        remember(transactionId, LocalTransactionState.UNKNOW);
        try {
            String body = new String(message.getBody(),"UTF-8");
            User user = JSONObject.parseObject(body,User.class);
            @SuppressWarnings("unchecked") // only printed; the sender passes a Map<String, Object>
            Map<String, Object> map = (Map<String, Object>) o;
            System.out.println(map);
            userService.updateUser(user);
            // Success: drop the entry; a missing entry is read as "committed".
            if (transactionId != null) {
                localTrans.remove(transactionId);
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            e.printStackTrace();
            // Keep the failure on record so a later check-back also rolls back.
            remember(transactionId, LocalTransactionState.ROLLBACK_MESSAGE);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    /**
     * Answers a broker check-back for a transaction whose final state it did not receive.
     *
     * @param messageExt the message being checked
     * @return the state recorded for the message's transaction id, or
     *         {@code COMMIT_MESSAGE} when nothing is recorded for it
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        String transactionId = messageExt == null ? null : messageExt.getTransactionId();
        if (transactionId != null) {
            LocalTransactionState recorded = localTrans.get(transactionId);
            if (recorded != null) {
                return recorded;
            }
        }
        // No record: either the local transaction succeeded or this instance never
        // saw it. NOTE(review): a production listener should verify against the
        // database here instead of assuming success.
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    /** Stores {@code state} for {@code transactionId}; ignores a missing id (the map rejects null keys). */
    private void remember(String transactionId, LocalTransactionState state) {
        if (transactionId != null) {
            localTrans.put(transactionId, state);
        }
    }
}

git源码地址

https://github.com/wangkang2/springboot

原文地址:https://www.cnblogs.com/Unlimited-Blade-Works/p/12447540.html