RocketMQ集群消息测试(二)

最近两天发生了很多事情,李咏(勇)去世,金庸讣告,重庆公交车坠江,印尼狮航JT610航班坠毁。让我感觉生命实在是脆弱,有人寿终正寝,有人患病离世,这些都是可预见的。但是像公交坠江,飞机坠海这类的就是不可预见的。他们也许正在去上班的路上,也许正在去参加朋友聚会的路上,也许家中有正等待他们的亲人,可是突然一次事故。将他们与亲人,朋友永远的分开了。所以活着的人除了为他们惋惜,还应该珍惜当下,珍惜每一天,每一刻。我们有理由,有时间去学习。

废话不多说了,上篇文章搭建了一个两主两从异步的RocketMQ集群环境,这篇文章主要对这个集群环境进行可用性,可靠性的测试。我们仍然暂时不去讨论源码实现。

场景一:验证Producer发送消息是负载均衡的。

测试代码:我使用send(msg)方法发送消息,没有自定义MessageQueueSelector,一共发送了12条消息。

 1 public class Producer {
 2     public static void main(String[] args) throws UnsupportedEncodingException {
 3         try {
 4             MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
 5             ((DefaultMQProducer) producer).setNamesrvAddr("192.168.1.85:9876;192.168.99:9876");
 6             producer.start();
 7 
 8             for (int i = 0; i < 12; i++) {
 9                 Message msg = new Message("TopicTest", "TagA", "KEY" + i,
10                         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
11                 SendResult sendResult = producer.send(msg);
12 
13                 System.out.println(sendResult);
14             }
15 
16             producer.shutdown();
17         } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
18             e.printStackTrace();
19         }
20     }
21 }

我们看一下Broker情况,此时broker-a和broker-b各有6条消息,并且成功把消息同步到slave。

场景二:消费测试,代码如下

 1 public class Consumer {
 2 
 3     public static void main(String[] args) throws MQClientException {
 4         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
 5         consumer.setNamesrvAddr("192.168.1.85:9876;192.168.99:9876");
 6         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 7         consumer.subscribe("TopicTest", "TagA");
 8 
 9         consumer.registerMessageListener(new MessageListenerOrderly() {
10             @Override
11             public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
12                 context.setAutoCommit(false);
13                 System.out.println(Thread.currentThread().getName()+":"+msgs);
14                 return ConsumeOrderlyStatus.SUCCESS;
15             }
16         });
17         consumer.start();
18         System.out.printf("Consumer Started.%n");
19     }
20 }
Consumer采用Push方式,其实RocketMQ的所谓推模式本质上也是对Pull模式的封装,这里先不去讨论。

Broker初始状态:

如下图:

1、启动Consumer

2、启动Producer,发送12条消息,此时看到broker中Today Produce Count加了6,Today Consume Count也加了6。说明消息都被消费了。

 场景三:如果两个broker master都挂掉,consumer还会拉取消息吗?

从上图可以看到,即使两个broker master都宕机了,并不影响consumer继续从slave拉取消息。只是此时无法写写入消息了。

接下来我们将两台master服务启动,看看会发生什么

和预想的效果一样,消费端继续从master拉取消息,这里有一个关键的问题,就是明显消息被重复消费了,RocketMQ之所以允许重复消费是因为master挂掉的概率很小,如果在broker里做消息去重操作,将会影响整个Broker的吞吐量,所以重复消费问题肯定需要业务方自己解决了。

场景四:如果两个namesrv都挂了,对Producer和Consumer有什么影响呢?

这个就不贴图,图片也反映不出来真实情况,其实真实情况是这样的。我将两个namesrv服务停止之后,原来已经建立连接Producer和Consumer依然可以正常工作,只是新加入的Producer和Consumer无法工作了。

场景五:两个slave都挂掉了,会有什么现象?

如上图:首先将Producer停止,不再发送新的消息,然后启动broker-a slave服务。刚启动时produce count数量是0。

如下图:继续观察,发现produce count数量开始上升,说明slave启动之后,master开始将消息向slave同步。

重点:通过上面测试,大家可能注意到开源版本的RocketMQ,Broker的master和slave是不能自动切换的。一旦master挂了,这条线路也就不能写消息了。根据Producer轮询规则,所有的消息都会发送到另一条Broker线路上。但是其实阿里自己内部使用的RocketMQ版本是支持master和slave自动切换的。如果你们用阿里云服务,也能享受到这个待遇。

原文地址:https://www.cnblogs.com/shileibrave/p/9882511.html