Integrating RocketMQ with Spring

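Local deployment:

Configuring and starting on Windows:

  1. Add the environment variables

    ROCKETMQ_HOME="D:\rocketmq"
    NAMESRV_ADDR="localhost:9876"

  2. Start the name server (run from the ROCKETMQ_HOME directory)

    bin\mqnamesrv.cmd

  3. Start the broker

    bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true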

 Visual console: https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

  Build with Maven: mvn clean package -Dmaven.test.skip=true

  Start: java -jar rocketmq-console-ng-2.0.0.jar

  Open in a browser: localhost:8080

Integrating RocketMQ with Spring

https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md

1. Dependency

        <!--rocketmq-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>    

2. Configure the producer and consumer

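import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RocketmqConfig {

    /**
     * Creates the message producer.
     * Spring calls start() once the bean is created (initMethod)
     * and shutdown() when the context closes (destroyMethod).
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public MQProducer mqProducer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("custom-rocketmq-producer");
        // Set the NameServer address
        producer.setNamesrvAddr("localhost:9876");
        // start() is not called here; Spring invokes it through initMethod
        // producer.start();
        return producer;
    }

    /**
     * Creates the push consumer.
     *
     * @return a started consumer subscribed to custom-topic1
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    @Bean(destroyMethod = "shutdown")
    public MQPushConsumer mqPushConsumer() throws MQClientException {
        // Instantiate the consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("custom-rocketmq-consumer");

        // Set the NameServer address
        consumer.setNamesrvAddr("localhost:9876");

        // Subscribe to one or more topics; the tag expression filters which messages are consumed
        consumer.subscribe("custom-topic1", "*");
        // Register the listener that handles messages; MyConsumer is the custom listener from step 4
        consumer.registerMessageListener(new MyConsumer());
        // Start the consumer instance
        consumer.start();
        return consumer;
    }

}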
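
The configuration class above hard-codes localhost:9876, while step 1 of the deployment already exports NAMESRV_ADDR. The sketch below shows one way to pick the address from the environment instead. It assumes the JVM system property name rocketmq.namesrv.addr and the NAMESRV_ADDR variable, which the RocketMQ client is understood to consult as well; NamesrvAddrResolver itself is a hypothetical helper written for this article, not a RocketMQ class.

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper (not part of RocketMQ): chooses the NameServer address. */
final class NamesrvAddrResolver {
    static final String PROPERTY_KEY = "rocketmq.namesrv.addr"; // JVM system property (assumed name)
    static final String ENV_KEY = "NAMESRV_ADDR";               // environment variable from step 1
    static final String FALLBACK = "localhost:9876";            // value used throughout this article

    private NamesrvAddrResolver() {
    }

    /** Returns the first non-blank value among: system property, environment variable, fallback. */
    static String resolve(Properties systemProps, Map<String, String> env) {
        String fromProperty = systemProps.getProperty(PROPERTY_KEY);
        if (fromProperty != null && !fromProperty.trim().isEmpty()) {
            return fromProperty.trim();
        }
        String fromEnv = env.get(ENV_KEY);
        if (fromEnv != null && !fromEnv.trim().isEmpty()) {
            return fromEnv.trim();
        }
        return FALLBACK;
    }

    /** Convenience overload that reads the real JVM properties and process environment. */
    static String resolve() {
        return resolve(System.getProperties(), System.getenv());
    }
}
```

With this in place, the beans could call producer.setNamesrvAddr(NamesrvAddrResolver.resolve()) so that the same build runs against a local or a remote name server.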

3. Producing messages (example)

    @Autowired
    private MQProducer mqProducer;

    @Test
    void producerTest() throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        // Send a message synchronously (charset given explicitly: java.nio.charset.StandardCharsets)
        Message syncMsg = new Message("custom-topic1", "sync_msg", "hello RocketMQ syncMsg to custom-topic1".getBytes(StandardCharsets.UTF_8));
        mqProducer.send(syncMsg);

        // Send a message asynchronously; in a test the callback may not run before the method returns
        Message asyncMsg = new Message("custom-topic1", "async_msg", "hello RocketMQ asyncMsg to custom-topic1".getBytes(StandardCharsets.UTF_8));
        mqProducer.send(asyncMsg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // do something
            }

            @Override
            public void onException(Throwable e) {
                // do something
            }
        });

        // Send a delayed message
        Message delayMsg = new Message("custom-topic1", "delay_msg", "Hello RocketMQ delayMsg to custom-topic1".getBytes(StandardCharsets.UTF_8));
        // Delay level 5: the message is delivered after 1 minute (only a fixed set of delays is supported, see delayTimeLevel)
        delayMsg.setDelayTimeLevel(5);
        // Send the message
        mqProducer.send(delayMsg);
    }
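
The value passed to setDelayTimeLevel is an index into the broker's messageDelayLevel setting rather than a duration. The sketch below makes that mapping explicit, assuming the broker keeps the default setting; DelayLevels is a hypothetical helper for illustration and is not part of the RocketMQ client.

```java
import java.time.Duration;

/** Hypothetical helper: translates a delay level into the delay it stands for. */
final class DelayLevels {
    // Assumption: the broker's default messageDelayLevel value, left unchanged.
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    private DelayLevels() {
    }

    /** Levels are 1-based: level 1 is the first entry of the setting. */
    static Duration toDuration(int level) {
        String[] levels = DEFAULT_LEVELS.split(" ");
        if (level < 1 || level > levels.length) {
            throw new IllegalArgumentException("level must be between 1 and " + levels.length);
        }
        String token = levels[level - 1];
        long amount = Long.parseLong(token.substring(0, token.length() - 1));
        char unit = token.charAt(token.length() - 1);
        switch (unit) {
            case 's':
                return Duration.ofSeconds(amount);
            case 'm':
                return Duration.ofMinutes(amount);
            case 'h':
                return Duration.ofHours(amount);
            default:
                throw new IllegalStateException("unknown unit in " + token);
        }
    }
}
```

Under that assumption, DelayLevels.toDuration(5) is one minute, which matches the comment in the test above. If the broker's setting has been customized, the table has to be read from the broker configuration instead.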

4. Consuming messages

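import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Custom message listener
 */
public class MyConsumer implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt messageExt : messageExts) {
            // Decode the body with the same charset the producer used
            System.out.println("Receive New Messages:" + new String(messageExt.getBody(), StandardCharsets.UTF_8));
        }
        // Tell the broker the whole batch was consumed successfully
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}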
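
The consumer in step 2 subscribes with "*", which accepts every tag. A subscription can also name specific tags joined with ||, for example "sync_msg || delay_msg". The sketch below illustrates that matching rule in plain Java. It is a simplified model for reading tag expressions, not the client's or the broker's actual filtering code, and TagExpression is a hypothetical name.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical illustration of how a tag expression selects messages. */
final class TagExpression {

    private TagExpression() {
    }

    /** Returns true when a message carrying messageTag would pass the subscription expression. */
    static boolean matches(String subExpression, String messageTag) {
        // An empty expression or "*" subscribes to every tag
        if (subExpression == null || subExpression.trim().isEmpty() || "*".equals(subExpression.trim())) {
            return true;
        }
        // Otherwise the expression is a list of tags separated by "||"
        Set<String> wanted = Arrays.stream(subExpression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
        return messageTag != null && wanted.contains(messageTag);
    }
}
```

For the three messages sent in step 3, the expression "sync_msg || delay_msg" would let the first and third through and skip the async_msg one.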

 

Integrating RocketMQ with Spring Boot

https://github.com/apache/rocketmq-spring/wiki

Requirements:

  JDK 1.8 or later

  Maven 3.0 or later

  Spring Boot 2.0 or later

1. Add the dependency (the rocketmq-client version is 4.8.0)

        <!--add dependency in pom.xml-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>

2. Configure the name server and producer group

## application.properties
# Multiple name server addresses are supported; separate them with ;
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

3. Sending messages

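import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class TemplateProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void producer() {
        // Send a message synchronously
        rocketMQTemplate.convertAndSend("test", "Hello, World sync!");
        // Send a message asynchronously
        rocketMQTemplate.asyncSend("test", "Hello, World async!", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("async onSuccess SendResult=%s %n", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.out.printf("async onException Throwable=%s %n", e);
            }

        });

        // To send with a tag, put it in the destination argument using the format topicName:tagName
        rocketMQTemplate.convertAndSend("test:tag1", "hello,msg of tag1");

        // Send a Spring message; the Message type here is org.springframework.messaging.Message
        rocketMQTemplate.send("test", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());

        // To set the message key, use an overloaded xxxSend(String destination, Message<?> msg, ...) method
        // and put the key in the message headers
        String msgId = UUID.randomUUID().toString();
        rocketMQTemplate.send("topic-test", MessageBuilder.withPayload("msg of key").setHeader(MessageConst.PROPERTY_KEYS, msgId).build());

        // Send an ordered message; messages with the same hash key go to the same queue
        rocketMQTemplate.syncSendOrderly("orderly_topic", MessageBuilder.withPayload("Hello, World").build(), "hashkey");

        // Send a delayed message synchronously: send timeout 1000 ms, delay level 9 (5 minutes)
        rocketMQTemplate.syncSend("delay-topic", MessageBuilder.withPayload("hello delay msg").build(), 1000, 9);

        // destroy() shuts down rocketMQTemplate; once destroyed it can no longer send messages.
        // There is no need to call it manually: the template is destroyed when the Spring container shuts down.
        //rocketMQTemplate.destroy();
    }
}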
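
The destination strings above pack the topic and an optional tag into one value, topicName:tagName. The sketch below shows how such a string breaks down, assuming the split happens at the first colon; Destination is a hypothetical class for illustration and does not exist in rocketmq-spring.

```java
/** Hypothetical value class: the two parts of a "topicName:tagName" destination. */
final class Destination {
    final String topic;
    final String tag;

    private Destination(String topic, String tag) {
        this.topic = topic;
        this.tag = tag;
    }

    /** Splits at the first ':' only; a destination without ':' has an empty tag. */
    static Destination parse(String destination) {
        String[] parts = destination.split(":", 2);
        String topic = parts[0];
        String tag = parts.length > 1 ? parts[1] : "";
        return new Destination(topic, tag);
    }
}
```

So "test:tag1" addresses topic test with tag tag1, while plain "test" addresses the same topic with no tag.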

4. Consuming messages

  1) Push mode

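import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * Consumes messages in push mode.
 * RocketMQListener<String> receives only the message payload.
 * RocketMQListener<MessageExt> receives RocketMQ's native MessageExt.
 */
@Component
@RocketMQMessageListener(topic = "custom-topic1", consumerGroup = "my-consumer_delay-test-1")
public class PushConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("received message: " + new String(messageExt.getBody(), StandardCharsets.UTF_8));
    }
}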

  2) Pull mode

  Starting with version 2.2.0, RocketMQ Spring supports pull-mode consumption.

  ①: Add the following to application.properties

rocketmq.consumer.group=my-group1
rocketmq.consumer.topic=test

  ②:代码中主动拉取消息示例

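import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

/**
 * Consumes messages in pull mode
 */
@Component
public class PullConsumer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void pullMessage() {
        // Pull messages from the topic configured in rocketmq.consumer.topic and convert each payload to String
        List<String> messages = rocketMQTemplate.receive(String.class);
        System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);
    }
}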

END.

Original article: https://www.cnblogs.com/yangyongjie/p/14410414.html