How RocketMQ producer high availability works

RocketMQ has four roles: namesrv (NameServer), broker, producer, and consumer.

Brokers are deployed as MASTER/SLAVE. A producer sends messages to brokers, but writes go only to the master broker, the one with brokerId=0.

A consumer only reads, so it can read from either the master broker or a slave broker.
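The read/write split above can be illustrated with a minimal sketch, assuming a client-side table that maps each brokerName to a {brokerId → address} map, which is the shape RocketMQ's brokerAddrTable uses. The class BrokerAddrLookup, its methods and the addresses are hypothetical; the read path is deliberately simplified (the real client's choice between master and slave is more involved).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: brokerName -> (brokerId -> address); brokerId 0 is the master
class BrokerAddrLookup {
    static final long MASTER_ID = 0L;

    private final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();

    void register(String brokerName, long brokerId, String addr) {
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
    }

    // Writes: only the master qualifies; null means "no writable broker under this name"
    String masterAddrForWrite(String brokerName) {
        Map<Long, String> addrs = brokerAddrTable.get(brokerName);
        return addrs == null ? null : addrs.get(MASTER_ID);
    }

    // Reads (simplified): prefer the master, otherwise fall back to any registered slave
    String addrForRead(String brokerName) {
        Map<Long, String> addrs = brokerAddrTable.get(brokerName);
        if (addrs == null || addrs.isEmpty()) {
            return null;
        }
        String master = addrs.get(MASTER_ID);
        return master != null ? master : addrs.values().iterator().next();
    }

    public static void main(String[] args) {
        BrokerAddrLookup lookup = new BrokerAddrLookup();
        lookup.register("broker-a", 0L, "10.0.0.1:10911");
        lookup.register("broker-a", 1L, "10.0.0.2:10911");
        lookup.register("broker-b", 1L, "10.0.0.4:10911"); // broker-b's master is down
        System.out.println(lookup.masterAddrForWrite("broker-a")); // 10.0.0.1:10911
        System.out.println(lookup.masterAddrForWrite("broker-b")); // null
        System.out.println(lookup.addrForRead("broker-b"));        // 10.0.0.4:10911
    }
}
```

A broker whose master is gone still serves reads through its slave, but offers nothing to a producer.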

Below we walk through the source code of the producer's send path, using ordered (sequential) messages as the example.

DefaultMQProducerImpl.sendSelectImpl

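Within it, tryToFindTopicPublishInfo is responsible for finding the currently available brokers, i.e. the topic's publish routing information.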

    private SendResult sendSelectImpl(
        Message msg,
        MessageQueueSelector selector,
        Object arg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);

        // Look up the topic's route: local cache first, then the NameServer
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            try {
                // The user-supplied selector picks the queue (e.g. by order id for ordered sends)
                mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
            } catch (Throwable e) {
                throw new MQClientException("select message queue throwed exception.", e);
            }

            // Fail fast if route lookup and queue selection already used up the timeout
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeout < costTime) {
                throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
            }
            if (mq != null) {
                // Only the remaining time budget is passed on to the actual send
                return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
            } else {
                throw new MQClientException("select message queue return null.", null);
            }
        }

        throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
    }
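The control flow of sendSelectImpl can be reproduced without RocketMQ on the classpath. This is a hypothetical model, not RocketMQ code: the Selector interface stands in for MessageQueueSelector, queues are plain strings, IllegalStateException replaces the MQClientException/RemotingTooMuchRequestException thrown by the real method, and BY_ORDER_ID assumes the common ordered-send pattern of choosing a queue by order id modulo the queue count.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, RocketMQ-free model of the sendSelectImpl control flow
class OrderedSendSketch {
    // Stand-in for MessageQueueSelector; queues are plain strings here
    interface Selector {
        String select(List<String> queues, String msg, Object arg);
    }

    // Ordered sends: the same order id maps to the same index while the list is unchanged
    static final Selector BY_ORDER_ID = (queues, msg, arg) -> {
        long orderId = (Long) arg;
        return queues.get((int) Math.floorMod(orderId, (long) queues.size()));
    };

    // Returns the queue the message would be sent to
    static String sendSelect(List<String> queues, String msg, Selector selector, Object arg,
                             long timeoutMillis) {
        long begin = System.currentTimeMillis();
        if (queues == null || queues.isEmpty()) {
            throw new IllegalStateException("No route info for this topic");
        }
        String mq = selector.select(queues, msg, arg);
        long costTime = System.currentTimeMillis() - begin;
        if (timeoutMillis < costTime) {
            throw new IllegalStateException("sendSelectImpl call timeout");
        }
        if (mq == null) {
            throw new IllegalStateException("select message queue return null.");
        }
        long remaining = timeoutMillis - costTime; // what sendKernelImpl would receive
        System.out.println("send " + msg + " -> " + mq + ", budget " + remaining + " ms");
        return mq;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3");
        sendSelect(queues, "order-5 created", BY_ORDER_ID, 5L, 3000);
        sendSelect(queues, "order-5 paid", BY_ORDER_ID, 5L, 3000); // same queue: q1
    }
}
```

Because the whole timeout covers route lookup, selection and the network send, a slow route lookup directly shrinks the budget left for the send itself.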

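tryToFindTopicPublishInfo makes up to two calls:

1> this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); on the first send to a topic that has not been created yet, the NameServer has no route for it, so this lookup finds nothing.
2> this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); is then called, which fetches the route of the default topic (defaultMQProducer.getCreateTopicKey()) instead.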
 
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        // On the first call, topicPublishInfoTable holds no entry for this topic
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            // 1> query the NameServer for this topic's own route
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            // 2> still no route: fall back to the default topic's route
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
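The lookup order (local cache, then the topic's own route, then the default topic's route) can be modelled in a few lines. This is a hypothetical sketch: RouteLookupSketch, the in-memory map standing in for the NameServer, the broker-name lists standing in for TopicPublishInfo, and the "DEFAULT_TOPIC" key in the usage example are all assumptions, not RocketMQ APIs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of tryToFindTopicPublishInfo: cache -> topic route -> default-topic route
class RouteLookupSketch {
    private final Map<String, List<String>> nameServer; // stand-in: topic -> broker names
    private final String createTopicKey;                // template topic used for auto-creation
    private final Map<String, List<String>> publishInfoTable = new ConcurrentHashMap<>();
    int nameServerQueries = 0;

    RouteLookupSketch(Map<String, List<String>> nameServer, String createTopicKey) {
        this.nameServer = nameServer;
        this.createTopicKey = createTopicKey;
    }

    private List<String> queryNameServer(String topic) {
        nameServerQueries++;
        return nameServer.get(topic);
    }

    List<String> tryToFind(String topic) {
        List<String> info = publishInfoTable.get(topic);
        if (info != null) {
            return info; // cache hit: no NameServer round trip
        }
        // Step 1: ask for the topic's own route
        List<String> route = queryNameServer(topic);
        if (route != null) {
            publishInfoTable.put(topic, route);
            return route;
        }
        // Step 2: unknown topic, borrow the template topic's route so the first
        // send can reach a broker that will create the topic
        List<String> fallback = queryNameServer(createTopicKey);
        if (fallback != null) {
            publishInfoTable.put(topic, fallback);
        }
        return fallback;
    }

    public static void main(String[] args) {
        Map<String, List<String>> ns = new HashMap<>();
        ns.put("orders", Arrays.asList("broker-a"));
        ns.put("DEFAULT_TOPIC", Arrays.asList("broker-a", "broker-b"));
        RouteLookupSketch client = new RouteLookupSketch(ns, "DEFAULT_TOPIC");
        System.out.println(client.tryToFind("orders"));    // [broker-a]
        System.out.println(client.tryToFind("brand-new")); // [broker-a, broker-b]
    }
}
```

In the real client the borrowed route is only a stopgap: once a broker has created the topic, the periodic route refresh replaces it with the topic's own route.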

MQClientInstance.updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer)

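In this method, when isDefault == true and defaultMQProducer != null, the client fetches the route of the default topic (defaultMQProducer.getCreateTopicKey()) instead of the requested topic's route, and caps each broker's read and write queue counts at defaultMQProducer.getDefaultTopicQueueNums():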

    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {// taken when isDefault == true and defaultMQProducer != null
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    if (topicRouteData != null) {
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }

                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }

                            // Update Pub info
                            {    // topicRouteData2TopicPublishInfo keeps only brokers that have a master
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {// update topicPublishInfoTable
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }

                            // Update sub info
                            {    // collect the readable queues
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    } else {
                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                    }
                } catch (Exception e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }

        return false;
    }
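Two steps of this method shape what the producer ends up with: the isDefault branch caps queue counts, and the publish info is built only from brokers that have a master. The following is a simplified, hypothetical model of both steps: QueueData here is a stripped-down stand-in (no permission bits, no topic config), queues are rendered as "topic@broker#queueId" strings, and the method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of turning route data into the producer's queue list
class PublishInfoSketch {
    static final long MASTER_ID = 0L;

    // Stand-in for QueueData: one broker's queue counts for a topic
    static final class QueueData {
        final String brokerName;
        int readQueueNums;
        int writeQueueNums;

        QueueData(String brokerName, int readQueueNums, int writeQueueNums) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
        }
    }

    // isDefault branch: cap the template topic's counts at defaultTopicQueueNums
    static void capQueueNums(List<QueueData> queueDatas, int defaultTopicQueueNums) {
        for (QueueData data : queueDatas) {
            int queueNums = Math.min(defaultTopicQueueNums, data.readQueueNums);
            data.readQueueNums = queueNums;
            data.writeQueueNums = queueNums;
        }
    }

    // Keep only brokers that currently have a master; one entry per writable queue
    static List<String> toPublishQueues(String topic, List<QueueData> queueDatas,
                                        Map<String, Map<Long, String>> brokerAddrs) {
        List<String> queues = new ArrayList<>();
        for (QueueData qd : queueDatas) {
            Map<Long, String> addrs = brokerAddrs.get(qd.brokerName);
            if (addrs == null || !addrs.containsKey(MASTER_ID)) {
                continue; // no master: this broker's queues are not writable
            }
            for (int i = 0; i < qd.writeQueueNums; i++) {
                queues.add(topic + "@" + qd.brokerName + "#" + i);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        List<QueueData> route = Arrays.asList(new QueueData("broker-a", 8, 8), new QueueData("broker-b", 8, 8));
        capQueueNums(route, 4);
        Map<String, Map<Long, String>> brokers = new HashMap<>();
        Map<Long, String> a = new HashMap<>();
        a.put(0L, "10.0.0.1:10911");
        a.put(1L, "10.0.0.2:10911");
        brokers.put("broker-a", a);
        Map<Long, String> b = new HashMap<>();
        b.put(1L, "10.0.0.4:10911"); // broker-b has lost its master
        brokers.put("broker-b", b);
        System.out.println(toPublishQueues("orders", route, brokers));
        // [orders@broker-a#0, orders@broker-a#1, orders@broker-a#2, orders@broker-a#3]
    }
}
```

The master filter is what lets the producer route around a failed master without any special failover code: the next route refresh simply stops offering that broker's queues.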

DefaultMQProducerImpl.updateTopicPublishInfo(String, TopicPublishInfo)

This method updates DefaultMQProducerImpl.topicPublishInfoTable, the table tryToFindTopicPublishInfo reads from:

    @Override
    public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) {
        if (info != null && topic != null) {
            TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info);
            if (prev != null) {
                log.info("updateTopicPublishInfo prev is not null, " + prev.toString());
            }
        }
    }

In addition, to cope with a broker going down, scheduled tasks keep the routing information fresh:

MQClientInstance.startScheduledTask()

    private void startScheduledTask() {
        // No NameServer address configured: fetch it periodically (every 2 minutes)
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        // Refresh topic routes every pollNameServerInterval ms (30 s by default)
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

        // Remove offline brokers and send heartbeats every heartbeatBrokerInterval ms
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

        // Persist consumer offsets every persistConsumerOffsetInterval ms
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        // Adjust thread pools once a minute
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

Call chain of the route refresh task:

  1. MQClientInstance.updateTopicRouteInfoFromNameServer()

  2. MQClientInstance.updateTopicRouteInfoFromNameServer(String)

  3. MQClientInstance.updateTopicRouteInfoFromNameServer(String, boolean, DefaultMQProducer)
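The scheduled refresh and its three-level call chain can be sketched with a plain ScheduledExecutorService. This is a hypothetical model: RouteRefreshSketch, topicsInUse and the call counter are invented, and step 3 only counts calls where a real client would query the NameServer.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the periodic route refresh and its three-step call chain
class RouteRefreshSketch {
    final Set<String> topicsInUse = ConcurrentHashMap.newKeySet(); // topics of local producers/consumers
    final AtomicInteger nameServerCalls = new AtomicInteger();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // 1. No-arg entry point: refresh every topic in use
    void updateAll() {
        for (String topic : topicsInUse) {
            update(topic);
        }
    }

    // 2. Single-topic overload: the non-default path
    boolean update(String topic) {
        return update(topic, false);
    }

    // 3. Does the work; a real client would query the NameServer here
    boolean update(String topic, boolean isDefault) {
        nameServerCalls.incrementAndGet();
        return true;
    }

    void start(long pollIntervalMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                updateAll();
            } catch (Exception e) {
                // An exception escaping the task would cancel all later runs
                System.err.println("route refresh failed: " + e);
            }
        }, 10, pollIntervalMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        RouteRefreshSketch client = new RouteRefreshSketch();
        client.topicsInUse.add("orders");
        client.topicsInUse.add("payments");
        client.start(50); // RocketMQ's pollNameServerInterval defaults to 30 000 ms
        Thread.sleep(300);
        client.shutdown();
        System.out.println("NameServer queries: " + client.nameServerCalls.get());
    }
}
```

The try/catch inside the task mirrors the original code for a concrete reason: with scheduleAtFixedRate, a task that throws is never run again, which would silently stop route refreshes.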

This keeps the producer's view of the available brokers as current as possible.

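Even so, sends can still fail:

  1. The route refresh task runs only every 30 seconds (the default pollNameServerInterval), so a broker can fail between two refreshes; until the next refresh the producer still routes to it and those sends fail.

  2. When a send is retried, the List<MessageQueue> passed to MessageQueueSelector.select(List<MessageQueue>, Message, Object) may have a different size, because the failed broker's queues drop out of the route. For ordered sending, the same key can then be routed to a different queue.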
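To see why point 2 breaks ordering, consider a hypothetical orderId-modulo selector (a common pattern with MessageQueueSelector, not code from RocketMQ) evaluated before and after one of two brokers with 4 queues each drops out of the route, shrinking the list from 8 queues to 4. QueueRemapDemo and its methods are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: orderId-modulo selection before and after a broker drops out
class QueueRemapDemo {
    static int selectIndex(long orderId, int queueCount) {
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    // Order ids in [0, maxOrderId) whose queue index changes between the two queue counts
    static List<Long> remappedIds(long maxOrderId, int before, int after) {
        List<Long> moved = new ArrayList<>();
        for (long id = 0; id < maxOrderId; id++) {
            if (selectIndex(id, before) != selectIndex(id, after)) {
                moved.add(id);
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        // 2 brokers x 4 queues = 8 queues; one broker drops out -> 4 queues
        System.out.println(selectIndex(5, 8) + " -> " + selectIndex(5, 4)); // 5 -> 1
        System.out.println(remappedIds(16, 8, 4)); // [4, 5, 6, 7, 12, 13, 14, 15]
    }
}
```

Any modulo mapping moves keys when the divisor changes; here half of the order ids land on a different queue, so messages of one order sent before and after the change can be consumed out of order.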

Original article: https://www.cnblogs.com/toUpdating/p/9965003.html