在《ServiceBean与ReferenceBean创建流程》中的第4.1步说了,在获取invoker时就进行了与注册中心的连接,注册,订阅等一系列操作,由于提供者与消费者大致流程一致,所以这里以消费者端为例,提供者可以以相同的方式分析。
其入口的地方为ReferenceConfig#createProxy方法中的这一行代码:
invoker = refprotocol.refer(interfaceClass, urls.get(0));
refprotocol是一个SPI,这里以dubbo协议,zookeeper注册中心为例。我们在DubboProtocol类的refer方法中打一个断点,然后我们可以调用栈里面看到从上面这一行代码到断点之间有非常长的调用链,本文不关心它如何生成的,需要了解的是其中大致流程过程。
服务注册
一、
1 @Override 2 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { 3 // REGISTRY_PROTOCOL = "registry" 4 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { 5 return protocol.refer(type, url); 6 } 7 return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); 8 }
ReferenceConfig#createProxy方法中,url的protocol被初始化成了registry,然后会进入到QosProtocolWrapper#refer。
二、
1 @Override 2 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { 3 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { 4 // 启动qos服务 5 startQosServer(url); 6 return protocol.refer(type, url); 7 } 8 return protocol.refer(type, url); 9 }
该类是用于启动dubbo qos服务的包装类,然后进入ProtocolListenerWrapper#refer
1 @Override 2 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { 3 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { 4 return protocol.refer(type, url); 5 } 6 return new ListenerInvokerWrapper<T>(protocol.refer(type, url), 7 Collections.unmodifiableList( 8 ExtensionLoader.getExtensionLoader(InvokerListener.class) 9 .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY))); 10 }
该类是用于将invoker封装成ListenerInvokerWrapper,ListenerInvokerWrapper的作用是调用invoker的某些方法时触发listener。然后进入到RegistryProtocol#refer。
三、
1 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { 2 // 获取注册中心协议,这里是zookeeper 3 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); 4 // 3.1 获取registry对象,在获取对象中就进行了连接 5 Registry registry = registryFactory.getRegistry(url); 6 if (RegistryService.class.equals(type)) { 7 return proxyFactory.getInvoker((T) registry, type, url); 8 } 9 // group="a,b" or group="*" 10 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); 11 String group = qs.get(Constants.GROUP_KEY); 12 if (group != null && group.length() > 0) { 13 if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 14 || "*".equals(group)) { 15 return doRefer(getMergeableCluster(), registry, type, url); 16 } 17 } 18 return doRefer(cluster, registry, type, url); 19 }
由于注册中心是zookeeper,所以这里的RegistryFactory对象是ZooKeeperRegistryFactory对象,Registry对象是ZooKeeperRegistry。在创建ZooKeeperRegistry对象的过程中就会连接ZooKeeper,详细流程代码可以自行debug。
创建完Registry对象后就会调用本类的doRefer方法:
1 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { 2 // 创建注册发现类 3 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); 4 directory.setRegistry(registry); 5 directory.setProtocol(protocol); 6 // all attributes of REFER_KEY 7 Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); 8 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); 9 if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) 10 && url.getParameter(Constants.REGISTER_KEY, true)) { 11 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, 12 Constants.CHECK_KEY, String.valueOf(false))); 13 } 14 // 订阅 15 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 16 Constants.PROVIDERS_CATEGORY 17 + "," + Constants.CONFIGURATORS_CATEGORY 18 + "," + Constants.ROUTERS_CATEGORY)); 19 20 Invoker invoker = cluster.join(directory); 21 ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); 22 return invoker; 23 }
RegistryDirectory类中包含了注册中心信息、路由信息、消费者信息等等。同时该类实现了NotifyListener接口,会将该类进行包装为ZooKeeper的Listener,然后注册到ZooKeeper上。
接下来就是调用该类的subscribe方法进行订阅。该方法最终会调用到ZookeeperRegistry#doSubscribe方法。
四、
1 @Override 2 protected void doSubscribe(final URL url, final NotifyListener listener) { 3 try { 4 // 省略部分代码 5 6 // 调用listener 7 notify(url, listener, urls); 8 } catch (Throwable e) { 9 throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 10 } 11 }
这里省略了一大段代码,这一大段代码无非就是在ZooKeeper上创建节点,并且将listener注册到这些节点上。这里的listener就是上一步中的RegistryDirectory对象。最终会调用到RegistryDirectory#notify方法。
五、
1 @Override 2 public synchronized void notify(List<URL> urls) { 3 // provider url 4 List<URL> invokerUrls = new ArrayList<URL>(); 5 List<URL> routerUrls = new ArrayList<URL>(); 6 List<URL> configuratorUrls = new ArrayList<URL>(); 7 for (URL url : urls) { 8 String protocol = url.getProtocol(); 9 String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); 10 if (Constants.ROUTERS_CATEGORY.equals(category) 11 || Constants.ROUTE_PROTOCOL.equals(protocol)) { 12 routerUrls.add(url); 13 } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) 14 || Constants.OVERRIDE_PROTOCOL.equals(protocol)) { 15 configuratorUrls.add(url); 16 } else if (Constants.PROVIDERS_CATEGORY.equals(category)) { 17 invokerUrls.add(url); 18 } else { 19 logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()); 20 } 21 } 22 // configurators 23 if (configuratorUrls != null && !configuratorUrls.isEmpty()) { 24 this.configurators = toConfigurators(configuratorUrls); 25 } 26 // routers 27 if (routerUrls != null && !routerUrls.isEmpty()) { 28 List<Router> routers = toRouters(routerUrls); 29 if (routers != null) { // null - do nothing 30 setRouters(routers); 31 } 32 } 33 List<Configurator> localConfigurators = this.configurators; // local reference 34 // merge override parameters 35 this.overrideDirectoryUrl = directoryUrl; 36 if (localConfigurators != null && !localConfigurators.isEmpty()) { 37 for (Configurator configurator : localConfigurators) { 38 this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); 39 } 40 } 41 // providers 42 // 刷新invoker 43 refreshInvoker(invokerUrls); 44 }
该方法除了最后一行,其他都是在更新信息,核心是最后一行。
1 private void refreshInvoker(List<URL> invokerUrls) { 2 // 省略部分代码 3 4 // 将provider的url转换成invoker 5 Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map 6 Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map 7 8 // 省略部分代码 9 } 10 11 private Map<String, Invoker<T>> toInvokers(List<URL> urls) { 12 // 省略部分代码 13 14 if (invoker == null) { // Not in the cache, refer again 15 try { 16 boolean enabled = true; 17 if (url.hasParameter(Constants.DISABLED_KEY)) { 18 enabled = !url.getParameter(Constants.DISABLED_KEY, false); 19 } else { 20 enabled = url.getParameter(Constants.ENABLED_KEY, true); 21 } 22 if (enabled) { 23 // 这里的protocol是一个SPI 24 invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl); 25 } 26 } catch (Throwable t) { 27 logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t); 28 } 29 if (invoker != null) { // Put new invoker in cache 30 newUrlInvokerMap.put(key, invoker); 31 } 32 33 }34 35 // 省略部分代码 36 }
第24行又重新通过SPI获取invoker对象,与最开始的入口一样,所以这里又会从第一步开始往下走,不过不一样的是,url的registry字段不再是registry而是我们设置的dubbo协议。所以他最后会进入到DubboProtocol#refer方法。
六、
1 @Override 2 public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { 3 // 是否优化 4 optimizeSerialization(url); 5 // create rpc invoker. 6 DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); 7 invokers.add(invoker); 8 return invoker; 9 }
该方法创建了一个DubboInvoker对象并返回,需要注意的是代码中调用了getClients方法,该方法会创建一个指向提供者的单向的长连接,这部分在后续的文章做讨论。
以此便完成了注册中心的连接、注册以及invoker的创建。
服务调用
我们通过上文可以知道,invoker被封装成了一个非常长的调用链,每一个调用都会有不同的功能,这里不细说,我们直接进入最后一个调用链节点DubboInvoker。 整个流程如下图所示:
消费者
DubboInvoker是具体调用逻辑的实现类,所以我们直接来看DubboInvoker的doInvoke方法,代码如下:
1 protected Result doInvoke(final Invocation invocation) throws Throwable { 2 // 省略部分代码 3 // 获取client 4 ExchangeClient currentClient; 5 if (clients.length == 1) { 6 currentClient = clients[0]; 7 } else { 8 currentClient = clients[index.getAndIncrement() % clients.length]; 9 } 10 // 发起请求 11 return (Result) currentClient.request(inv, timeout).get(); 12 }
1 public ResponseFuture request(Object request, int timeout) throws RemotingException { 2 if (closed) { 3 throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); 4 } 5 // 创建Request对象 6 Request req = new Request(); 7 req.setVersion("2.0.0"); 8 req.setTwoWay(true); 9 req.setData(request); 10 // 创建Future 11 DefaultFuture future = new DefaultFuture(channel, req, timeout); 12 try { 13 // 发送请求 14 channel.send(req); 15 } catch (RemotingException e) { 16 future.cancel(); 17 throw e; 18 } 19 return future; 20 }
1 public void send(Object message, boolean sent) throws RemotingException { 2 super.send(message, sent); 3 4 boolean success = true; 5 int timeout = 0; 6 try { 7 // 写数据 8 // 这里的channel就是Netty的NioSocketChannel 9 ChannelFuture future = channel.write(message); 10 if (sent) { 11 timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 12 success = future.await(timeout); 13 } 14 Throwable cause = future.getCause(); 15 if (cause != null) { 16 throw cause; 17 } 18 } catch (Throwable e) { 19 throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); 20 } 21 22 if (!success) { 23 throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() 24 + "in timeout(" + timeout + "ms) limit"); 25 } 26 }
以上消费者就把请求发送给了提供者,并且将自身线程挂起,等待服务器返回或者超时。
提供者
1 public void received(Channel channel, Object message) throws RemotingException { 2 // 省略部分代码 3 // 获取线程池 4 ExecutorService cexecutor = getExecutorService(); 5 // 提交任务到线程池处理 6 cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); 7 8 }
1 public void run() { 2 switch (state) { 3 case CONNECTED: 4 try { 5 handler.connected(channel); 6 } catch (Exception e) { 7 logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); 8 } 9 break; 10 case DISCONNECTED: 11 try { 12 handler.disconnected(channel); 13 } catch (Exception e) { 14 logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); 15 } 16 break; 17 case SENT: 18 try { 19 handler.sent(channel, message); 20 } catch (Exception e) { 21 logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel 22 + ", message is " + message, e); 23 } 24 break; 25 case RECEIVED: 26 try { 27 handler.received(channel, message); 28 } catch (Exception e) { 29 logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel 30 + ", message is " + message, e); 31 } 32 break; 33 case CAUGHT: 34 try { 35 handler.caught(channel, exception); 36 } catch (Exception e) { 37 logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel 38 + ", message is: " + message + ", exception is " + exception, e); 39 } 40 break; 41 default: 42 logger.warn("unknown state: " + state + ", message is " + message); 43 } 44 }
消费者
1 public void received(Channel channel, Object message) throws RemotingException { 2 channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); 3 ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); 4 try { 5 // 处理Request 6 if (message instanceof Request) { 7 // handle request. 8 Request request = (Request) message; 9 if (request.isEvent()) { 10 handlerEvent(channel, request); 11 } else { 12 if (request.isTwoWay()) { 13 // 处理普通请求 14 Response response = handleRequest(exchangeChannel, request); 15 channel.send(response); 16 } else { 17 handler.received(exchangeChannel, request.getData()); 18 } 19 } 20 // 处理Response 21 } else if (message instanceof Response) { 22 handleResponse(channel, (Response) message); 23 } 24 // 省略部分代码 25 } finally { 26 HeaderExchangeChannel.removeChannelIfDisconnected(channel); 27 } 28 }