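RocketMQ: Basic Usage

Previously we used the example programs bundled with RocketMQ to verify that the installation works. This time we implement message production and consumption ourselves.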

I. Basic Usage

1.1 Add the dependency:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <!-- Should match the version of the installed MQ -->
    <version>4.7.0</version>
</dependency>

1.2 Create a configuration class:

public class RocketMQConfig {
    // NameServer address (host:port)
    public static final String NAME_SERVER = "192.168.137.47:9876";
}
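
Hard-coding the address is fine for a demo, but it has to be edited whenever the NameServer moves. As a minimal sketch, the value could be resolved from a JVM system property with the address above as a fallback. The class name `NameServerResolver` and the property name `demo.namesrv` are hypothetical and chosen only for this illustration; they are not RocketMQ settings.

```java
final class NameServerResolver {
    // Hypothetical property name used only in this sketch, not a RocketMQ setting
    static final String PROPERTY = "demo.namesrv";
    // Fallback: the address used throughout this article
    static final String DEFAULT_ADDRESS = "192.168.137.47:9876";

    // Returns the trimmed property value, or the fallback when it is unset or blank
    static String resolve() {
        String value = System.getProperty(PROPERTY);
        if (value == null || value.trim().isEmpty()) {
            return DEFAULT_ADDRESS;
        }
        return value.trim();
    }

    public static void main(String[] args) {
        System.out.println(resolve());
    }
}
```

With this in place, the address could be overridden at launch with `-Ddemo.namesrv=host:port`, and `resolve()` would be passed to `setNamesrvAddr` in place of the constant.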

1.3 Create the producer:

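import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class Producer {

    public static void main(String[] args) throws MQClientException, RemotingException,
            InterruptedException, MQBrokerException {
        // Create the producer and specify its producer group name
        DefaultMQProducer producer = new DefaultMQProducer("simple");
        // Set the NameServer address
        producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
        // Start the producer instance
        producer.start();

        for (int i = 0; i < 3; i++) {
            String str = "Hello RocketMQ";
            // Build the message: topic, tag, body (encoded explicitly as UTF-8)
            Message message = new Message("topicTest", "tagA",
                    (str + i).getBytes(StandardCharsets.UTF_8));
            // Send the message synchronously and print the result
            SendResult sendResult = producer.send(message);
            System.out.printf("%s%n", sendResult);
        }
        // Shut down the producer
        producer.shutdown();
    }

}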

1.4 Create the consumer:

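import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        // Create the consumer and specify its consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("simple");
        // Set the NameServer address
        consumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
        // Subscribe to the topic; "*" matches every tag
        consumer.subscribe("topicTest", "*");
        // Register the message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // Print the received batch and acknowledge it as consumed
                System.out.println("msg:" + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
    }

}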

1.5 Run the producer class and check the console output:

SendResult [sendStatus=SEND_OK, msgId=0A79FA0D6E9418B4AAC269CDF61B0000, offsetMsgId=C0A8892F00002A9F00000000000283C3, messageQueue=MessageQueue [topic=topicTest, brokerName=localhost, queueId=1], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A79FA0D6E9418B4AAC269CDF6260001, offsetMsgId=C0A8892F00002A9F0000000000028474, messageQueue=MessageQueue [topic=topicTest, brokerName=localhost, queueId=2], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A79FA0D6E9418B4AAC269CDF6290002, offsetMsgId=C0A8892F00002A9F0000000000028525, messageQueue=MessageQueue [topic=topicTest, brokerName=localhost, queueId=3], queueOffset=9]
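
The `offsetMsgId` values in this output are not random: in the 4.x client they are a hex string that packs the broker's IPv4 address (4 bytes), its port (4 bytes), and the commit log offset (8 bytes). The following is a minimal sketch that unpacks the first ID by hand, assuming that 32-character IPv4 layout. The class `OffsetMsgIdSketch` is hypothetical and written for illustration; it is not part of the RocketMQ API.

```java
final class OffsetMsgIdSketch {

    // Splits a 32-hex-character offsetMsgId into "ip:port @ commitLogOffset".
    // Assumes the IPv4 layout: 8 hex chars of IP, 8 of port, 16 of offset.
    static String describe(String offsetMsgId) {
        if (offsetMsgId.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters");
        }
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                ip.append('.');
            }
            // Each address byte is two hex characters
            ip.append(Integer.parseInt(offsetMsgId.substring(i * 2, i * 2 + 2), 16));
        }
        int port = Integer.parseInt(offsetMsgId.substring(8, 16), 16);
        long offset = Long.parseUnsignedLong(offsetMsgId.substring(16, 32), 16);
        return ip + ":" + port + " @ " + offset;
    }

    public static void main(String[] args) {
        // First offsetMsgId from the producer output above
        System.out.println(describe("C0A8892F00002A9F00000000000283C3"));
        // prints "192.168.137.47:10911 @ 164803"
    }
}
```

The decoded address and offset agree with the `storeHost=/192.168.137.47:10911` and `commitLogOffset=164803` fields that the consumer prints for the same message.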

1.6 Run the consumer and check the console output:

msg:[MessageExt [brokerName=localhost, queueId=1, storeSize=177, queueOffset=6, sysFlag=0, bornTimestamp=1584767105564, bornHost=/192.168.137.1:41420, storeTimestamp=1584767105616, storeHost=/192.168.137.47:10911, msgId=C0A8892F00002A9F00000000000283C3, commitLogOffset=164803, bodyCRC=705268097, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=7, CONSUME_START_TIME=1584767105572, UNIQ_KEY=0A79FA0D6E9418B4AAC269CDF61B0000, WAIT=true, TAGS=tagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48], transactionId='null'}]]
msg:[MessageExt [brokerName=localhost, queueId=2, storeSize=177, queueOffset=8, sysFlag=0, bornTimestamp=1584767105574, bornHost=/192.168.137.1:41420, storeTimestamp=1584767105622, storeHost=/192.168.137.47:10911, msgId=C0A8892F00002A9F0000000000028474, commitLogOffset=164980, bodyCRC=1561245975, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=9, CONSUME_START_TIME=1584767105577, UNIQ_KEY=0A79FA0D6E9418B4AAC269CDF6260001, WAIT=true, TAGS=tagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 49], transactionId='null'}]]
msg:[MessageExt [brokerName=localhost, queueId=3, storeSize=177, queueOffset=9, sysFlag=0, bornTimestamp=1584767105577, bornHost=/192.168.137.1:41420, storeTimestamp=1584767105625, storeHost=/192.168.137.47:10911, msgId=C0A8892F00002A9F0000000000028525, commitLogOffset=165157, bodyCRC=1141369005, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, CONSUME_START_TIME=1584767105585, UNIQ_KEY=0A79FA0D6E9418B4AAC269CDF6290002, WAIT=true, TAGS=tagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 50], transactionId='null'}]]
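
The `body` field is printed as a raw byte array because the listener prints the `MessageExt` objects directly. A minimal sketch of turning those bytes back into text, assuming the producer encoded the string as UTF-8 (for this ASCII payload the default charset gives the same bytes). The class `BodyDecodeSketch` is hypothetical and exists only for this illustration.

```java
import java.nio.charset.StandardCharsets;

final class BodyDecodeSketch {

    // Decodes a message body, assuming it holds UTF-8 text
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Body bytes copied from the first message in the consumer output above
        byte[] body = {72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48};
        System.out.println(decode(body)); // prints "Hello RocketMQ0"
    }
}
```

Inside the listener, the same conversion could be applied to each message with `new String(msg.getBody(), StandardCharsets.UTF_8)` to print readable text instead of the byte array.
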
Original article: https://www.cnblogs.com/markLogZhu/p/12539602.html