dubbo客户端源码分析(一)

rpc框架有很多,公司自研、开源的thrift、dubbo、grpc等。我用过几个框架,了解了一下实现原理,客户端基本都是用代理实现,jdk动态代理、cglib等。最近一段时间想了解一下dubbo源码,看下工作原理。今天看了一下客户端初始化源码

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    <dubbo:application name="demo-consumer"/>
    <!--<dubbo:registry address="multicast://224.5.6.7:1234"/>-->
    <dubbo:registry address="zookeeper://127.0.0.1:2181" />
    <dubbo:reference id="demoService" interface="com.gxf.dubbo.demo.DemoSerivce"/>
</beans>

这个是dubbo客户端配置,注册中心是本地zk。其中,dubbo是阿里基于spring扩展的schema

https://gist.github.com/dchjmichael/07dfd189c4c29bab63ec

这个文档关于spring schema扩展用法写的很不错,要定义xsd, handler, 解析xml的parser

http://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler

这是dubbo的spring-handler.xml,可以找到spring handler

 1 public class DubboNamespaceHandler extends NamespaceHandlerSupport {
 2 
 3     static {
 4         Version.checkDuplicate(DubboNamespaceHandler.class);
 5     }
 6 
 7     public void init() {
 8         registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
 9         registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
10         registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
11         registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
12         registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
13         registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
14         registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
15         registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
16         registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
17         registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
18     }
19 
20 }

这里我们关注客户端,看reference相关的就可以了,看下DubboBeanDefinitionParser

类的内容有点多,主要工作是注入了一个ReferenceBean的bean

我们可以看下这个类

public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {

这个类实现了FactoryBean, InitializingBean这个就有点生成动态代理的套路了

我们看getObject()方法

public Object getObject() throws Exception {
        return get();
    }

进入get()方法

public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }

跟进init()方法

ref = createProxy(map);

init()方法内容有点多,主要看下这个段, ref也是getBean()返回的对象,这里看方法名可以推测是用的代理

private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        final boolean isJvmRefer;
        if (isInjvm() == null) {
            if (url != null && url.length() > 0) { //指定URL的情况下,不做本地引用
                isJvmRefer = false;
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                //默认情况下如果本地有服务暴露,则引用本地服务.
                isJvmRefer = true;
            } else {
                isJvmRefer = false;
            }
        } else {
            isJvmRefer = isInjvm().booleanValue();
        }

        if (isJvmRefer) {
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // 通过注册中心配置拼装URL
                List<URL> us = loadRegistries(false);
                if (us != null && us.size() > 0) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls == null || urls.size() == 0) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address="..." /> to your spring config.");
                }
            }

            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // 用了最后一个registry url
                    }
                }
                if (registryURL != null) { // 有 注册中心协议的URL
                    // 对有注册中心的Cluster 只用 AvailableCluster
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // 不是 注册中心的URL
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        if (c && !invoker.isAvailable()) {
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        // 创建服务代理
        return (T) proxyFactory.getProxy(invoker);
    }

这个方法内容也挺多,今天没看完。应该是代理模式,

这里有个有意思的地方是,如果服务端没有启动在zk中注册,这里生成客户端代理的时候会抛异常。这个是后面需要去分析源码的,还有server端如何暴露服务的,即监听对应的端口,接收客户端的请求。以及在zk里面保存的内容

今天还发现一个有意思的地方,dubbo里面有泛化调用。看了泛化调用部分代码,感觉公司用的泛化调用,应该是参考了dubbo的

原文地址:https://www.cnblogs.com/luckygxf/p/9966915.html