RocketMQ Study Notes

Why choose RocketMQ?

A comparison with other message middleware:

Most importantly, RocketMQ is written in Java, which makes it easy to study.

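Installing RocketMQ

  1. Download: http://rocketmq.apache.org/release_notes/release-notes-4.7.0

  2. Configure the environment variables (the startup scripts expect ROCKETMQ_HOME to point at the install directory)

  3. Start the NameServer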

> mqnamesrv
  4. Start the Broker
> mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true
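  5. Install the visual console (optional)
    Download
    Git: https://github.com/apache/rocketmq-externals
    Gitee mirror: https://gitee.com/mirrors/RocketMQ-Externals
    Set the console's port in application.properties. The NameServer above already listens on 9876, so if both run on the same machine, pick a different free port here.
> server.port=9876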

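Build and run the jar with Maven
From inside the console project folder (Maven writes the packaged jar to target/):

> mvn clean package -Dmaven.test.skip=true
> java -jar target/*.jar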
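
Once the NameServer and Broker are running, it can help to confirm that something is actually listening on the NameServer address used above (127.0.0.1:9876) before wiring up the application. The following is a minimal sketch using only the JDK; `PortCheck` and `isListening` are hypothetical names introduced here, and a successful TCP connect only shows that the port is open, not that the process behind it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // 127.0.0.1:9876 is the NameServer address passed to mqbroker above.
        System.out.println("NameServer reachable: " + isListening("127.0.0.1", 9876, 1000));
    }
}
```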

Add the pom dependency

<!-- rocketmq -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>
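
The starter also has to be told where the NameServer is and which producer group to use, which these notes do not show. A minimal sketch of the Spring Boot application.properties entries, assuming the NameServer address from the install steps; the group name `test-producer-group` is a placeholder chosen for illustration.

```properties
# Address of the NameServer started during installation
rocketmq.name-server=127.0.0.1:9876
# Producer group used by RocketMQTemplate (placeholder name)
rocketmq.producer.group=test-producer-group
```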

MQService

import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RocketMQService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends a message.
     * @param topic topic to send to
     * @param body message body, encoded as UTF-8
     * @throws Exception if the send fails
     */
    public void sendMessage(String topic, String body) throws Exception {
        // Encode explicitly as UTF-8 so the result does not depend on the platform default charset.
        Message message = new Message(topic, body.getBytes(StandardCharsets.UTF_8));
        rocketMQTemplate.getProducer().send(message);
    }

    /**
     * Sends a delayed message.
     * @param topic topic to send to
     * @param body message body, encoded as UTF-8
     * @param delayTimeLevel broker delay level (an index into the broker's configured levels, not a duration)
     * @throws Exception if the send fails
     */
    public void sendDelayMessage(String topic, String body, int delayTimeLevel) throws Exception {
        Message message = new Message(topic, body.getBytes(StandardCharsets.UTF_8));
        message.setDelayTimeLevel(delayTimeLevel);
        rocketMQTemplate.getProducer().send(message);
    }
}
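
The `delayTimeLevel` argument is a level number, not a time value. With the default `messageDelayLevel` setting of a RocketMQ 4.x broker ("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"), level 1 means 1 second and level 18 means 2 hours. The sketch below shows that mapping in plain Java; `DelayLevels` is a hypothetical helper, not part of RocketMQ, and it assumes the broker still uses the default setting.

```java
import java.time.Duration;
import java.util.List;

/** Hypothetical helper: maps default RocketMQ 4.x delay levels (1..18) to durations. */
final class DelayLevels {

    // Assumes the broker's default messageDelayLevel:
    // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
    private static final List<Duration> DEFAULT_LEVELS = List.of(
            Duration.ofSeconds(1), Duration.ofSeconds(5), Duration.ofSeconds(10),
            Duration.ofSeconds(30), Duration.ofMinutes(1), Duration.ofMinutes(2),
            Duration.ofMinutes(3), Duration.ofMinutes(4), Duration.ofMinutes(5),
            Duration.ofMinutes(6), Duration.ofMinutes(7), Duration.ofMinutes(8),
            Duration.ofMinutes(9), Duration.ofMinutes(10), Duration.ofMinutes(20),
            Duration.ofMinutes(30), Duration.ofHours(1), Duration.ofHours(2));

    /** Returns the delay that the given level (1-based) stands for. */
    static Duration delayFor(int level) {
        if (level < 1 || level > DEFAULT_LEVELS.size()) {
            throw new IllegalArgumentException("level must be 1.." + DEFAULT_LEVELS.size());
        }
        return DEFAULT_LEVELS.get(level - 1);
    }

    /** Returns the smallest level whose delay is at least the wanted duration. */
    static int levelAtLeast(Duration wanted) {
        for (int i = 0; i < DEFAULT_LEVELS.size(); i++) {
            if (DEFAULT_LEVELS.get(i).compareTo(wanted) >= 0) {
                return i + 1; // levels are 1-based
            }
        }
        throw new IllegalArgumentException("No default level covers " + wanted);
    }

    public static void main(String[] args) {
        System.out.println("Level 3 delays by " + delayFor(3));
        System.out.println("At least 45s needs level " + levelAtLeast(Duration.ofSeconds(45)));
    }
}
```

A value picked this way could then be passed as the `delayTimeLevel` argument of `sendDelayMessage`.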

Test listener

import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "test-mq", consumerGroup = "conmuserGrop-mq")
public class ConsumerListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        // Decode with a Charset constant; unlike the "UTF-8" string overload,
        // this constructor declares no checked exception, so no try/catch is needed.
        String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        System.out.println("receive message:" + body);
    }
}
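
The listener decodes the message body as UTF-8, so the sending side has to encode with the same charset. `String.getBytes()` without an argument uses the platform default charset, which is not guaranteed to be UTF-8 on every JVM. A minimal sketch of the round trip in plain Java; `BodyCodec` is a hypothetical helper introduced only for illustration.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: encodes and decodes message bodies with one fixed charset. */
final class BodyCodec {

    /** Encodes a message body the way the producer side should. */
    static byte[] encode(String body) {
        return body.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a message body the way the listener does. */
    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "Hello RocketMQ! \u4f60\u597d";
        // Same charset on both sides: the text survives the round trip.
        System.out.println(original.equals(decode(encode(original))));
        // Mismatched charset: non-ASCII characters are corrupted.
        String wrong = new String(encode(original), StandardCharsets.ISO_8859_1);
        System.out.println(original.equals(wrong));
    }
}
```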

Test class

import com.jiuzhang.seckill.mq.RocketMQService;
import com.jiuzhang.seckill.service.SeckillActivityService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Date;

@SpringBootTest
public class MQTest {
    @Autowired
    RocketMQService rocketMQService;
    @Autowired
    SeckillActivityService seckillActivityService;
    @Test
    public void sendMQTest() throws Exception {
        rocketMQService.sendMessage("test-mq", "Hello RocketMQ!" + new Date().toString());
    }
}

Author: 月 暮
Source: https://www.cnblogs.com/AardWolf/
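Copyright notice: Copyright of this article belongs to the author and 博客园 (cnblogs). Reposting is welcome, but unless the author agrees otherwise, this notice must be retained and a link to the original article must appear in a prominent place on the page.
Original article: https://www.cnblogs.com/AardWolf/p/15219998.html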