一、Group、Topic、Tag之间的关系
1.Group
代表具有同一类属性的消费者/生产者组.
一个消费者Group ID代表一个Consumer实例群组,对于大多数分布式应用来说,一个消费者Group ID下通常会挂载多个Consumer实例.
eg:三台服务器都用于消费用户信息修改的消息,它们可以同属于一个消费者组.
订阅关系一致
同一个消费者Group ID下所有的实例需在以下两方面均保持一致:
- 订阅的Topic必须一致
- 订阅的Topic中的Tag必须一致
2.Topic
Topic是生产者在发送消息和消费者在拉取消息的类别.Topic与生产者和消费者之间的关系非常松散.具体来说,一个Topic可能有0个,一个或多个生产者向它发送消息;相反,一个生产者可以发送不同类型Topic的消息.类似的,消费者组可以订阅一个或多个主题,只要该组的实例保持其订阅一致即可.
eg:消息话题可以标识同一组下的各种场景,如都是密码修改的消息.
3.Tag
标签,换句话的意思就是子主题,为用户提供了额外的灵活性.有了标签,来自同一业务模块的具有不同目的的消息可以具有相同的主题和不同的标记.标签有助于保持代码的清晰和连贯,同时标签也方便RocketMQ提供的查询功能.
eg:Topic下的分类,如都是密码修改类型的消息,修改成功的Tag,修改失败的Tag...
二、消息发送与接收实例
1.生产者
1.1 同步Synchronously
package cn.yang37.mq.rocketmq.producer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* @Descr,tion: 同步发送消息
* @Class: SyncProducer 可靠的同步传输广泛应用于重要场景,如:重要信息通知,短信通知,短信营销系统等.
* @Author: Yiang37
* @Date: 2021/3/26 10:46
* @Version: 1.0
*/
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new
DefaultMQProducer("group_name1");
producer.setNamesrvAddr("ip:9876");
producer.start();
for (int i = 0; i < 3; i++) {
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
run
SendResult [sendStatus=SEND_OK, msgId=0A1059111C04CB38711E84245F400000, offsetMsgId=711F68B300002A9F000000000004BCF8, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=384]
SendResult [sendStatus=SEND_OK, msgId=0A1059111C04CB38711E84245F790001, offsetMsgId=711F68B300002A9F000000000004BDC1, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=381]
SendResult [sendStatus=SEND_OK, msgId=0A1059111C04CB38711E84245F960002, offsetMsgId=711F68B300002A9F000000000004BE8A, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=383]
1.2 异步Asynchronously
package cn.yang37.mq.rocketmq.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @Descr,tion: 异步发送消息
* @Class: AsyncProducer 异步传输通常用于响应时间敏感的业务场景.
* @Author: Yiang37
* @Date: 2021/3/26 14:38
* @Version: 1.0
*/
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group_name2");
producer.setNamesrvAddr("ip:9876");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 3;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
run
1 OK 0A1059114F40D479E6838424CE6E0002
0 OK 0A1059114F40D479E6838424CE6E0001
2 OK 0A1059114F40D479E6838424CE6E0000
1.3 单向传输One-way Mode
package cn.yang37.mq.rocketmq.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* @Descr,tion: 单向传输
* @Class: OnewayProducer 单向传输用于需要中等可靠性的情况,例如日志收集.
* @Author: Yiang37
* @Date: 2021/3/26 14:39
* @Version: 1.0
*/
public class OnewayProducer {
public static void main(String[] args) throws Exception {
//设置生产者的组名
DefaultMQProducer producer = new DefaultMQProducer("group_name3");
//Namesrv地址
producer.setNamesrvAddr("ip:9876");
//启动生产者
producer.start();
for (int i = 0; i < 3; i++) {
//构建一个Message实例
Message msg = new Message(
"TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//发送消息
producer.sendOneway(msg);
}
Thread.sleep(5000);
//关闭生产者
producer.shutdown();
}
}
run
2.消费者
package cn.yang37.mq.rocketmq.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @Descr,tion:
* @Class: Consumer
* @Author: Yiang37
* @Date: 2021/3/26 10:50
* @Version: 1.0
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name0");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setNamesrvAddr("ip:9876");
consumer.setInstanceName("QuickStartConsumer");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("---------------------------------
" + Thread.currentThread().getName() + " 接收到新消息:
" + msgs);
for (Message msg : msgs) {
System.out.println("消息内容:
" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}