Rocketmq<二>springboot集成rocktmq

 前提:对于sringboot来说 集成任何框架,无非就是三个步骤:1、添加pom依赖 , 2、修改配置文件 , 3、启动类添加注解和配置 。

一、pom依赖、配置文件

pom依赖:

 <!-- SpringBoot集成RocketMQ  https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>

配置文件:

server:
  port: 1004
spring:
  application:
    name: lcn-user

eureka: client: service
-url: defaultZone: http://localhost:7900/eureka/
rocketmq: name
-server: 192.168.10.17:9876 producer: group: ${spring.application.name} send-message-timeout: 3000 retry-times-when-send-failed: 3 mq: user: topic: tpk02 group: name: lcn-pay tag: tag02 key: key02

二、消息发送、消费消息

1、发送同步消息:

/**
 * @author D-L
 * @version 1.0.0
 * @ClassName UserService.java
 * @Description 消息发送
 * @createTime 2021-06-22 13:48:00
 */
@Service
@Slf4j
public class UserService {

    @Value("${mq.user.topic}")
    private String topic;
    @Value("${mq.user.tag}")
    private String tag;
    @Value("${mq.user.key}")
    private String key;
    @Value("${mq.user.group.name}")
    private String group;


    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送同步消息
     * @return
     */
    public String sendMsg01(){
        threadPoolTaskExecutor.submit(new Runnable() {
            @Override
            public void run() {
                Message message = MessageBuilder.withPayload("发送同步消息").build();
                String destination = String.format("%s:%s",topic ,tag);
                SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
                if(sendResult.getSendStatus().equals(SendStatus.SEND_OK)){
                    log.info("消息发送成功【SendStatus】 :{}" ,sendResult);
                }else {
                    log.info("消息发送失败【SendStatus】 :{}" ,sendResult);
                }
            }
        });
        return "ok";
    }

    /**
     * 发送异步消息
     * @return
     */
    public String sendMsg02(){
        threadPoolTaskExecutor.submit(new Runnable() {
            @Override
            public void run() {
                Message message = MessageBuilder.withPayload("发送异步消息").build();
                String destination = String.format("%s:%s",topic ,tag);
                rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
                    /**
                     * 发送成功回调函数
                     * @param sendResult
                     */
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        log.info("消息发送成功   【发送结果】 :{}" ,sendResult);
                    }
                    /**
                     * 发送失败回调函数
                     * @param e
                     */
                    @Override
                    public void onException(Throwable e) {
                        log.info("异常:{} ,消息:{}" ,e ,message);
                        //todo 可以对异常消息进行其他操作,重新发送或者存入db
                    }
                });
            }
        });
        return "ok";
    }

    /**
     * 发送单项消息
     * @return
     */
    public String sendOneWayMsg(){
        log.info("发送单项消息------------------------------------");
        Message<String> message = MessageBuilder.withPayload("发送单项消息").build();
        String destination = String.format("%s:%s", topic, tag);
        rocketMQTemplate.sendOneWay(destination , message);
        return "ok";
    }


    /**
     * 发送顺序消息
     * @return
     */
    public String sendOrderlyMsg(){
        log.info("发送顺序消息------------------------------------");
        for (int i = 0; i < 100; i++) {
            int defaultTopicQueueNums = rocketMQTemplate.getProducer().getDefaultTopicQueueNums();
            Message<String> message = MessageBuilder.withPayload("发送顺序消息" + i + "   【队列数取模】:" + String.valueOf(i%defaultTopicQueueNums)).build();
            String destination = String.format("%s:%s", topic, tag);
            //根据i对topic中queue的数量取模后的值   放入对应的队列中  只能保证对应
            SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, message, String.valueOf(i%defaultTopicQueueNums));
            if(sendResult.getSendStatus().equals(SendStatus.SEND_OK)){
                log.info("发送顺序消息成功 【消息内容】 :{}" ,sendResult);
            }else {
                log.info("发送顺序消息失败--------------------");
            }
        }
        return "ok";
    }


    /**
     * 发送事务消息
     * @param msgStr
     * @return
     */
    public String sendTransactionMessage(String msgStr){
        log.info("【发送消息】-------------");
        Future<TransactionSendResult> submit = threadPoolTaskExecutor.submit(new Callable<TransactionSendResult>() {
            @Override
            public TransactionSendResult call() {
                Message message = MessageBuilder.withPayload(msgStr).build();
                TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(group, topic, message, tag);
                log.info("【发送状态】:{}", result.getLocalTransactionState());
                return result;
            }
        });
        LocalTransactionState localTransactionState = null;
        try {
            localTransactionState = submit.get().getLocalTransactionState();
        } catch (Exception e) {
            log.error("获取消息发送状态失败------------------");
        }
        return localTransactionState.toString();
    }

    /**
     *
     * @return
     */
    public String updateUserInfo(){
        log.info("操作用户信息------------------------------------");
        try {
//            int count = 1/0;
        }catch (Exception e) {
            log.info("操作用户信息失败---------------");
            return "fail";
        }
        return "ok";
    }
}

2、事务消息(本地事务执行、消息回查):

/**
 * @author D-L
 * @version 1.0.0
 * @ClassName SyncProducerListener.java
 * @Description 事务消息
 * @createTime 2021-06-22 14:51:00
 */
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "lcn-user01")
public class SyncProducerListener implements RocketMQLocalTransactionListener {
    private ConcurrentHashMap<Integer, RocketMQLocalTransactionState> map=new ConcurrentHashMap<>();


    @Autowired
    private UserService userService;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        map.put(message.hashCode(),RocketMQLocalTransactionState.UNKNOWN);

        String result = userService.updateUserInfo();

        if(!result.equalsIgnoreCase("OK")){
            System.out.println("本地事务出错,回滚事务消息--------");
            map.put(message.hashCode(),RocketMQLocalTransactionState.ROLLBACK);
        }else {
            map.put(message.hashCode(),RocketMQLocalTransactionState.COMMIT);
        }
        log.info(map.get(message.hashCode()).toString());
        return map.get(message.hashCode());
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        System.out.println("没有获得消息ack  -----  进行消息回查   消息的Tag:" + message);
        return map.get(message.hashCode());
    }
}

3、消息消费:

/**
 * @author D-L
 * @version 1.0.0
 * @ClassName SyncProducerListener.java
 * @Description 消息接收者
 * @createTime 2021-06-22 22:51:00
 */

@Component
@RocketMQMessageListener(topic = "${mq.user.topic}",consumerGroup = "${mq.user.group.name}", selectorExpression = "*" ,
        //消费模式:广播
        messageModel = MessageModel.BROADCASTING,
        //消费模式:集群消费模式(默认情况)
//        messageModel = MessageModel.CLUSTERING,


        //同时接收异步传递的消息
        consumeMode = ConsumeMode.CONCURRENTLY
        //有序接收异步传递的消息
//         consumeMode = ConsumeMode.ORDERLY
)
public class PaymentListener implements RocketMQListener<MessageExt> {
    private static final Logger log = LoggerFactory.getLogger(PaymentListener.class);

    @Override
    public void onMessage(MessageExt messageExt) {

        log.info("开始接收到消息---------------------------------------");
        //1.解析消息内容
        try {
            String body = new String(messageExt.getBody(),"UTF-8");
//            User user = JSON.parseObject(body, User.class);
//            log.info("消息内容-------------:" + user);
            log.info("消息内容-------------:" + body);
        } catch (UnsupportedEncodingException e) {
            log.error("接收到消息失败 ,{}" ,e);
        }
        log.info("消息消费完成-----------------------------------------");
    }
}
原文地址:https://www.cnblogs.com/dongl961230/p/14923364.html