RocketMQ源码分析之Producer发送消息(五)

本节主要讲Producer同步发送消息的流程,异步的暂时不打算讲。因为还要讲Broker接受消息,存储消息,以及RemotingCommand。

老规矩电路图送上,为什么我喜欢电路图,因为UML比时序图或其他任何图都更能反映出类之间的调用。

发送消息时序图:下图带有浅绿色背景区域的部分为循环调用。retry send,你们懂得。

从时序图中可以看到整个发送消息都是围绕DefaultMQProducerImpl类展开的。

1、DefaultMQProducerImpl.sendDefaultImpl()方法,这里只贴出来主要代码

 1 private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode,
 2         final SendCallback sendCallback, final long timeout ){
 3     this.makeSureStateOK();//判断服务是否是Running状态,不是抛异常出来
 4     Validators.checkMessage(msg, this.defaultMQProducer);//检查消息正确性,包括topic,消息大小等
 5 
 6     TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());//获取topic队列配置
 7     for (; times < timesTotal; times++) {
 8         MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);//选择消息队列,后面详细说
 9         sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
10     }
11 }

 2、DefaultMQProducerImpl.makeSureStateOK()方法

 这个方法就是检查服务状态是不是running,很多地方都会调用这个方法。

1 private void makeSureStateOK() throws MQClientException {
2     if (this.serviceState != ServiceState.RUNNING) {
3         throw new MQClientException("The producer service state not OK, "
4                 + this.serviceState
5                 + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
6     }
7 }

3、DefaultMQProducerImpl..tryToFindTopicPublishInfo()方法

获取Topic的队列配置
该方法主要是获取主题的队列信息,当从本地内存中获取不到时,从NameServer服务器获取主题配置信息。

 1 private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
 2     TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
 3     if (null == topicPublishInfo || !topicPublishInfo.ok()) {
 4         this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
   //当从本地内存中获取不到时,从NameServer服务器获取主题的队列配置信息
5 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); 6 topicPublishInfo = this.topicPublishInfoTable.get(topic); 7 } 8 9 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { 10 return topicPublishInfo; 11 } else { 12 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); 13 topicPublishInfo = this.topicPublishInfoTable.get(topic); 14 return topicPublishInfo; 15 } 16 }

 4、MQFaultStrategy.selectOneMessageQueue()方法

RcoketMQ中一个topic对应多个队列,该方法从topic的队列列表中选中一个Queue返回,选择队列的算法为轮询。

 1 public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
 2     if (this.sendLatencyFaultEnable) {
 3         try {
 4             int index = tpInfo.getSendWhichQueue().getAndIncrement();//记录选择累计次数
 5             for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
 6                 int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();//总次数对队列大小取余
 7                 if (pos < 0)
 8                     pos = 0;
 9                 MessageQueue mq = tpInfo.getMessageQueueList().get(pos);//根据余数取出一个队列返回,这样就保证了发送端几乎是均衡把消息发送到各个队列里面。
10                 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
11                     if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
12                         return mq;
13                 }
14             }
15 
16             final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
17             int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
18             if (writeQueueNums > 0) {
19                 final MessageQueue mq = tpInfo.selectOneMessageQueue();
20                 if (notBestBroker != null) {
21                     mq.setBrokerName(notBestBroker);
22                     mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
23                 }
24                 return mq;
25             } else {
26                 latencyFaultTolerance.remove(notBestBroker);
27             }
28         } catch (Exception e) {
29             log.error("Error occurred when selecting message queue", e);
30         }
31 
32         return tpInfo.selectOneMessageQueue();
33     }
34 
35     return tpInfo.selectOneMessageQueue(lastBrokerName);
36 }

 getSendWhichQueue()说一下,看下代码:

这里用到了volatile关键字和ThreadLocal类,volatile保证了sendWhichQueue一旦变化,Group下其他Producer马上就能看到最新数据。而之所以用ThreadLocal<Integer>不用AtomicInteger类。因为这里只需要定义一个线程内的局部变量,只供本线程使用。而AtomicInteger主要是为了解决兵法问题。

1 public class TopicPublishInfo {
2     private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
3 }
4 
5 public class ThreadLocalIndex {
6     private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
7 }

5、DefaultMQProducerImpl.sendKernelImpl()方法,只贴出了主要代码,具体代码大家还是去看源码吧。

sendKernelImpl方法主要作用就是封包,发送消息给Broker。
 1 private SendResult sendKernelImpl(final Message msg,
 2                                   final MessageQueue mq,
 3                                   final CommunicationMode communicationMode,
 4                                   final SendCallback sendCallback,
 5                                   final TopicPublishInfo topicPublishInfo,
 6                                   final long timeout) {
 7     //获取broker地址
 8     String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
 9     //给消息设置唯一ID
10     MessageClientIDSetter.setUniqID(msg);
11     //当消息体大于4M时,进行压缩,注意只压缩消息体
12     UtilAll.compress(body, zipCompressLevel);
13     //使用是否VIPChannel通道发送数据,vip channel的端口号为普通端口号-2。
14     brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
15     //发送消息校验
16     CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
this.executeCheckForbiddenHook(checkForbiddenContext);
    
17     //发送消息前逻辑
18     SendMessageContext context = new SendMessageContext();
this.executeSendMessageHookBefore(context);
19     //构建发送消息请求
20     SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
21     //底层调用netty发送消息给broker。
22     sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage();
23     return sendResult;
24 }
 
原文地址:https://www.cnblogs.com/shileibrave/p/9890369.html