本节主要讲Producer同步发送消息的流程,异步的暂时不打算讲。因为还要讲Broker接受消息,存储消息,以及RemotingCommand。
老规矩电路图送上,为什么我喜欢电路图,因为UML比时序图或其他任何图都更能反映出类之间的调用。
发送消息时序图:下图带有浅绿色背景区域的部分为循环调用。retry send,你们懂得。
从时序图中可以看到整个发送消息都是围绕DefaultMQProducerImpl类展开的。
1、DefaultMQProducerImpl.sendDefaultImpl()方法,这里只贴出来主要代码
1 private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, 2 final SendCallback sendCallback, final long timeout ){ 3 this.makeSureStateOK();//判断服务是否是Running状态,不是抛异常出来 4 Validators.checkMessage(msg, this.defaultMQProducer);//检查消息正确性,包括topic,消息大小等 5 6 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());//获取topic队列配置 7 for (; times < timesTotal; times++) { 8 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);//选择消息队列,后面详细说 9 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); 10 } 11 }
2、DefaultMQProducerImpl.makeSureStateOK()方法
这个方法就是检查服务状态是不是running,很多地方都会调用这个方法。
1 private void makeSureStateOK() throws MQClientException { 2 if (this.serviceState != ServiceState.RUNNING) { 3 throw new MQClientException("The producer service state not OK, " 4 + this.serviceState 5 + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); 6 } 7 }
3、DefaultMQProducerImpl..tryToFindTopicPublishInfo()方法
获取Topic的队列配置
该方法主要是获取主题的队列信息,当从本地内存中获取不到时,从NameServer服务器获取主题配置信息。
1 private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { 2 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); 3 if (null == topicPublishInfo || !topicPublishInfo.ok()) { 4 this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//当从本地内存中获取不到时,从NameServer服务器获取主题的队列配置信息 5 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); 6 topicPublishInfo = this.topicPublishInfoTable.get(topic); 7 } 8 9 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { 10 return topicPublishInfo; 11 } else { 12 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); 13 topicPublishInfo = this.topicPublishInfoTable.get(topic); 14 return topicPublishInfo; 15 } 16 }
4、MQFaultStrategy.selectOneMessageQueue()方法
RcoketMQ中一个topic对应多个队列,该方法从topic的队列列表中选中一个Queue返回,选择队列的算法为轮询。
1 public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { 2 if (this.sendLatencyFaultEnable) { 3 try { 4 int index = tpInfo.getSendWhichQueue().getAndIncrement();//记录选择累计次数 5 for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { 6 int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();//总次数对队列大小取余 7 if (pos < 0) 8 pos = 0; 9 MessageQueue mq = tpInfo.getMessageQueueList().get(pos);//根据余数取出一个队列返回,这样就保证了发送端几乎是均衡把消息发送到各个队列里面。 10 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { 11 if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) 12 return mq; 13 } 14 } 15 16 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); 17 int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); 18 if (writeQueueNums > 0) { 19 final MessageQueue mq = tpInfo.selectOneMessageQueue(); 20 if (notBestBroker != null) { 21 mq.setBrokerName(notBestBroker); 22 mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); 23 } 24 return mq; 25 } else { 26 latencyFaultTolerance.remove(notBestBroker); 27 } 28 } catch (Exception e) { 29 log.error("Error occurred when selecting message queue", e); 30 } 31 32 return tpInfo.selectOneMessageQueue(); 33 } 34 35 return tpInfo.selectOneMessageQueue(lastBrokerName); 36 }
getSendWhichQueue()说一下,看下代码:
这里用到了volatile关键字和ThreadLocal类,volatile保证了sendWhichQueue一旦变化,Group下其他Producer马上就能看到最新数据。而之所以用ThreadLocal<Integer>不用AtomicInteger类。因为这里只需要定义一个线程内的局部变量,只供本线程使用。而AtomicInteger主要是为了解决兵法问题。
1 public class TopicPublishInfo { 2 private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); 3 } 4 5 public class ThreadLocalIndex { 6 private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>(); 7 }
5、DefaultMQProducerImpl.sendKernelImpl()方法,只贴出了主要代码,具体代码大家还是去看源码吧。
sendKernelImpl方法主要作用就是封包,发送消息给Broker。
1 private SendResult sendKernelImpl(final Message msg, 2 final MessageQueue mq, 3 final CommunicationMode communicationMode, 4 final SendCallback sendCallback, 5 final TopicPublishInfo topicPublishInfo, 6 final long timeout) { 7 //获取broker地址 8 String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); 9 //给消息设置唯一ID 10 MessageClientIDSetter.setUniqID(msg); 11 //当消息体大于4M时,进行压缩,注意只压缩消息体 12 UtilAll.compress(body, zipCompressLevel); 13 //使用是否VIPChannel通道发送数据,vip channel的端口号为普通端口号-2。 14 brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); 15 //发送消息校验 16 CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
this.executeCheckForbiddenHook(checkForbiddenContext);
17 //发送消息前逻辑 18 SendMessageContext context = new SendMessageContext();
this.executeSendMessageHookBefore(context);
19 //构建发送消息请求 20 SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); 21 //底层调用netty发送消息给broker。 22 sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(); 23 return sendResult; 24 }