RocketMQ源码分析之Producer启动(四)

我们主要分析图中红色矩形区域,最上面RemotingClient属于RocketMQ-remoting内容,以后再分析。从图中也可以清晰的看到RocketMQ底层通信协议使用了Netty(netty-all-4.0.42.Final版本)。

言归正传,红色矩形局域里面大家看调用关系就可以看出来,MQClientInstance是非常核心的一个类,它起着承上启下的衔接作用。MQClientInstance是Producer与MQ交互的实例,一个client只有一个MQClientInstance实例。它包含了topic路由,broker地址等信息,同时负责启动底层通信服务(MQClientAPIImpl)、定时任务服务(ScheduledExecutorService)、拉取消息服务(PullMessageService),负载均衡服务(RebalanceService)等等。

下面我们通过一个Producer启动时序图,看看完整的调用过程。

 

1、MyProducer是我们自己的生产者,主要代码如下:

1 MQProducer producer = new DefaultMQProducer("MyGroup");//创建一个默认生产者
2 producer.setNamesrvAddr("192.168.1.85:9876;192.168.99:9876");//设置NameServer地址,多个地址分号分隔
3 producer.start();//启动
4 
5 ......
6 
7 producer.shutdown();//关闭

new一个DefaultMQProducer对象,设置注册中心地址,然后启动。

DefaultMQProducer构造器,从DefaultMQProducer构造器可以看出Producer的主要业务逻辑是在DefaultMQProducerImpl中完成的。

1 public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
2     this.producerGroup = producerGroup;
3     defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
4 }
NamesrvAddr赋值有三种方式:
  • 通过setNamesrvAddr方法设置
  • 通过系统环境变量设置
  • 通过fetchNameServerAddr()方法调用http接口去寻址,需配置hosts信息,客户端默认每隔两分钟去访问一次这个http地址,并更新本地namesrvAddr地址。

启动时查找NamesrvAddr的优先级:setNamesrvAddr()-->环境变量-->fetchNameServerAddr()


ClientConfig类中有这样一行代码,对namesrvAddr进行了初始化,可以看到默认值确实是从系统环境变量里取的。
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));

2、DefaultMQProducer类

DefaultMQProducer构造器我们上面讲过了,就是给producerGroup赋值,并创建具体实现类DefaultMQProducerImpl对象。
DefaultMQProducer.start()方法:start方法直接调用了DefaultMQProducerImpl的start()方法。
1     @Override
2     public void start() throws MQClientException {
3         this.defaultMQProducerImpl.start();
4     }

3、DefaultMQProducerImpl类(重点),省略了异常,日志输出代码。

 1     public void start(final boolean startFactory) throws MQClientException {
 2         switch (this.serviceState) {
 3             case CREATE_JUST:
 4                 this.serviceState = ServiceState.START_FAILED;
 5                 //检查producerGroup是否合法
 6                 this.checkConfig();
 7 
 8                 if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
 9                     this.defaultMQProducer.changeInstanceNameToPID();
10                 }
11 
12                 //获取MQClientInstance实例,注意单例模式,一个JVM内只能有一个MQClientInstance实例。
13                 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
14                 //注册Producer
15                 boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
16                 if (!registerOK) {
17                     this.serviceState = ServiceState.CREATE_JUST;
18                     throw new MQClientException();
19                 }
20 
21                 this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
22 
23                 if (startFactory) {
24                     //调用MQClientInstance start方法
25                     mQClientFactory.start();
26                 }
27 
28                 this.serviceState = ServiceState.RUNNING;
29                 break;
30             case RUNNING:
31             case START_FAILED:
32             case SHUTDOWN_ALREADY:
33                 throw new MQClientException()
34             default:
35                 break;
36         }
37         //向所有Broker发送心跳
38         this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
39     }

第六行checkConfig方法检查producerGroup是否合法,主要有以下几个检查点:

  • groupName不能为空
  • 非法字符检查
  • 最大长度不超过255字节
  • 自定义的groupName不能定义为"DEFAULT_PRODUCER",因为DEFAULT_PRODUCER是RocketMQ默认的分组名称

MQClientManager.getAndCreateMQClientInstance方法代码:

 1     public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
 2         //生成clientId,规则:{ip}@{instanceName}
 3         String clientId = clientConfig.buildMQClientId();
 4         MQClientInstance instance = this.factoryTable.get(clientId);
 5         //如果instance存在直接返回,否则创建一个新的MQClientInstance对象。
 6         if (null == instance) {
 7             instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
 8             //如果传入的key存在,就返回已存在的value
 9             MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
10             if (prev != null) {
11                 instance = prev;
12                 log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
13             } else {
14                 log.info("Created new MQClientInstance for clientId:[{}]", clientId);
15             }
16         }
17 
18         return instance;
19     }

getAndCreateMQClientInstance()中心思想就是要返回一个MQClientInstance对象。方法里使用了双重判断,这么没什么好说的。

注意ConcurrentHashMap的putIfAbsent用法,有3年以上编程经验的人必须要知道。

4、MQClientInstance.registerProducer()方法

这个方法名起得有点误导人,让人以为是要把Producer注册到NameServer上。其实只是把当前的Producer放到ConcurrentHashMap中,key是groupName,value是producer对象。代码如下:

 1     public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
 2         if (null == group || null == producer) {
 3             return false;
 4         }
 5 
 6         MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
 7         if (prev != null) {
 8             log.warn("the producer group[{}] exist already.", group);
 9             return false;
10         }
11 
12         return true;
13     }

5、MQClientInstance.start()方法,贴出来的代码省略了switch

 1 public void start() throws MQClientException {
 2     synchronized (this) {
 3         this.serviceState = ServiceState.START_FAILED;
 4         // 如果没有namesrvAddr则去查找,fetchNameServerAddr方法下面再详细说
 5         if (null == this.clientConfig.getNamesrvAddr()) {
 6             this.mQClientAPIImpl.fetchNameServerAddr();
 7         }
 8         // 启动请求相应通道,打开channel
 9         this.mQClientAPIImpl.start();
10         // 启动定时任务
11         this.startScheduledTask();
12         // 启动拉取消息服务
13         this.pullMessageService.start();
14         // 启动负载均衡服务
15         this.rebalanceService.start();
16         // 启动消息推送服务,注意这里又回去调用了DefaultMQProducerImpl的start方法,但是参数是false。
17         this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
18         this.serviceState = ServiceState.RUNNING;
19     }
20 }

6、MQClientAPIImpl.fetchNameServerAddr()方法

从下面的源码可以清楚的看到寻址方法最后调用调用http接口去寻址,前提需配置hosts信息,客户端默认每隔两分钟去访问一次这个http地址,并更新本地namesrvAddr地址。

 1     public String fetchNameServerAddr() {
 2         try {
 3             String addrs = this.topAddressing.fetchNSAddr();
 4             if (addrs != null) {
 5                 if (!addrs.equals(this.nameSrvAddr)) {
 6                     this.updateNameServerAddressList(addrs);
 7                     this.nameSrvAddr = addrs;
 8                     return nameSrvAddr;
 9                 }
10             }
11         } catch (Exception e) {
12         }
13         return nameSrvAddr;
14     }
15 
16     public final String fetchNSAddr() {
17         return fetchNSAddr(true, 3000);
18     }
19 
20     public final String fetchNSAddr(boolean verbose, long timeoutMills) {
21         String url = this.wsAddr;
22         if (!UtilAll.isBlank(this.unitName)) {
23             url = url + "-" + this.unitName + "?nofix=1";
24         }
25         HttpTinyClient.HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills);
26         if (200 == result.code) {
27             String responseStr = result.content;
28             if (responseStr != null) {
29                 return clearNewLine(responseStr);
30             } else {
31                 log.error("fetch nameserver address is null");
32             }
33         } else {
34             log.error("fetch nameserver address failed. statusCode=" + result.code);
35         }
36         return null;
37     }

7、MQClientAPIImpl.start()方法

建立底层通信channel,MQClientAPIImpl.start方法最后调用RemotingClient的start方法,而remotingClient调用了netty建立底层通信。

 1     public void start() {
 2         this.remotingClient.start();
 3     }
 4 
 5     @Override
 6     public void start() {
 7         this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
 8             nettyClientConfig.getClientWorkerThreads(),
 9             new ThreadFactory() {
10                 private AtomicInteger threadIndex = new AtomicInteger(0);
11                 @Override
12                 public Thread newThread(Runnable r) {
13                     return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
14                 }
15             });
16 
17         Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
18             .option(ChannelOption.TCP_NODELAY, true)
19             .option(ChannelOption.SO_KEEPALIVE, false)
20             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
21             .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
22             .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
23             .handler(new ChannelInitializer<SocketChannel>() {
24                 @Override
25                 public void initChannel(SocketChannel ch) throws Exception {
26                     ChannelPipeline pipeline = ch.pipeline();
27                     if (nettyClientConfig.isUseTLS()) {
28                         if (null != sslContext) {
29                             pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
30                             log.info("Prepend SSL handler");
31                         } else {
32                             log.warn("Connections are insecure as SSLContext is null!");
33                         }
34                     }
35                     pipeline.addLast(
36                         defaultEventExecutorGroup,
37                         new NettyEncoder(),
38                         new NettyDecoder(),
39                         new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
40                         new NettyConnectManageHandler(),
41                         new NettyClientHandler());
42                 }
43             });
44 
45         this.timer.scheduleAtFixedRate(new TimerTask() {
46             @Override
47             public void run() {
48                 try {
49                     NettyRemotingClient.this.scanResponseTable();
50                 } catch (Throwable e) {
51                     log.error("scanResponseTable exception", e);
52                 }
53             }
54         }, 1000 * 3, 1000);
55 
56         if (this.channelEventListener != null) {
57             this.nettyEventExecutor.start();
58         }
59     }

8、MQClientInstance.startScheduledTask()方法,省略定时代码

 1     private void startScheduledTask() {
 2         //如果当前客户端没有指定setNamesrvAddr,启动查找NamesrvAddr地址服务,每两分钟一次
 3         MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
 4 
 5         //从NameServerh获取topic信息后,更新客户端topic路由信息
 6         MQClientInstance.this.updateTopicRouteInfoFromNameServer();
 7 
 8         //定时清理已经不存在的broker服务
 9         MQClientInstance.this.cleanOfflineBroker();
       //定时发送心跳服务
10 MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); 11 12 //定时做consumer offset持久化  13 MQClientInstance.this.persistAllConsumerOffset(); 14 15 //定时调整消费线程池 16 MQClientInstance.this.adjustThreadPool(); 17 }

9、PullMessageService、RebalanceService

这两个类都是Consumer端用的,这里先不分析了。有的人可能会有疑问,怎么讲Producer启动,里面还有Consumer的调用。很简单,因为Producer和Consumer相对于Broker都是客户端,两者都会调用MQClientInstance。

一个西瓜圆又圆......打完收工!



原文地址:https://www.cnblogs.com/shileibrave/p/9884407.html