【SSM】整合RocketMQ

前言

     RocketMQ是一个由阿里巴巴开源的消息中间件,脱胎于阿里内部使用的MetaQ,本文主要是写个小例子演示一下消息从生产到消费的过程。

RocketMQ下载和安装

    下载地址 http://rocketmq.apache.org/release_notes/release-notes-4.5.0/

     安装和配置 https://blog.csdn.net/u010391342/article/details/82150062

     注意一下,启动Broker的时候用此命令避免出现找不到topic的情况 mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

RocketMQ整合

文件改动如下

@Service
public class DeptServiceImpl implements DeptService {
    @Autowired
    DeptDOMapper deptDOMapper;

    @Autowired
    Producer producer;
    @Override
    public List<DeptInfo> listDeptInfo(String name) {      producer.sendMessage(String.valueOf(RandomUtils.nextLong(0,100000)),"查询部门列表,参数:"+name);
            List<DeptDO> deptDOList = deptDOMapper.listDept(name);
            List<DeptInfo> deptInfos = new ArrayList<>();
            deptDOList.stream().forEach(x -> {
                DeptInfo info = new DeptInfo();
                info.setName(x.getName());
                info.setId(x.getId());
                deptInfos.add(info);
            });

    }
}
DeptServiceImpl.java
@Component
public class Consumer {
    /**
     * 消费者的组名
     */
    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;

    /**
     * NameServer地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQPushConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            //订阅PushTopic下Tag为push的消息
            consumer.subscribe("PushTopic", "push");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {
                        String messageBody = new String(messageExt.getBody(), "utf-8");
                        System.out.println("消费响应:MsgId: " + messageExt.getMsgId() + ",内容: " + messageBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Consumer.java
@Component
public class Producer {

    /**
     * 生产者的组名
     */
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private DefaultMQProducer producer;

    public boolean sendMessage(String key, String message) {
        try {
            Message ms = new Message();
            ms.setKeys(key);
            ms.setTags("push");
            ms.setTopic("PushTopic");
            ms.setBody(message.getBytes("UTF-8"));
            SendResult result = producer.send(ms);
            System.out.println("发送消息:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
    @PostConstruct
    public void defaultMQProducer() {
        //生产者的组名
        producer = new DefaultMQProducer(producerGroup);
        //指定NameServer地址,多个地址以 ; 隔开
        producer.setNamesrvAddr(namesrvAddr);
        try {
            producer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Producer.java
     <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.4.0</version>
        </dependency>
pom.xml
# 消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer

# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer

# NameServer地址
apache.rocketmq.namesrvAddr=127.0.0.1:9876
rabbitmq.properties

applicationContext.xml加入以下配置

 <!-- 加载配置文件 -->
    <context:property-placeholder location="classpath:rabbitmq.properties" ignore-unresolvable="true"/>

运行跑起来,在控制台会看到

发送消息:MsgId:1E66D026378000D8E96F32ACEE480000,发送状态:SEND_OK
消费响应:MsgId: 1E66D026378000D8E96F32ACEE480000,内容: 查询部门列表,参数:1
原文地址:https://www.cnblogs.com/caizl/p/10678670.html