dubbo学习(四)服务导出

概述

配置 dubbo

我们在使用dubbo使用的时候,首先在 resources 下创建 dubbo-consumer.xmldubbo-provider.xml 就可以了,例如我们看一下 dubbo-provider.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
	http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
	<dubbo:provider protocol="dubbo" port="-1"/>
	<!--<dubbo:service interface="cn.healthmall.order.service.AppMsgPushUpdateService" ref="appMsgPushUpdateService" version="1.0.0" retries="0"/>-->


</beans>

provider 服务接口定义如下 :

public interface DemoService {

    String sayHello(String name);

    default CompletableFuture<String> sayHelloAsync(String name) {
        return CompletableFuture.completedFuture(sayHello(name));
    }

}

我们这一节了解一下 dubbo是如何将bean 导出的。

源码分析

DubboBootrap 是如何被嵌入到 Springboot 中启动的呢 ?

DubboBootrap 是启动类,它包含了Dubbo的启动,我们从名字也可以看得出,那么在 Spring boot 是如何Dubbo是如何启动的呢?我们从springboot 流程中可以知道 Springboot 启动中允许用户根据需求扩展 Listener ,达到监听 Springboot 事件的作用,我们看一下这个类 :

1297993-20200408150655466-1563553660.png

主要是这个类,我们在讲 Springboot 启动流程中的时候讲到了 Springboot 会广播例如像刷新 context 一样的事件,而 dubbo 生成自己的监听器(Listener)来开启 Dubbo 启动的逻辑,具体的代码 :

public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener
        implements Ordered {

    private void onContextRefreshedEvent(ContextRefreshedEvent event) {
        dubboBootstrap.start();
    }     

}

接下里就是 DubboBootstrap 中的 start 方法方法

   /**
     * Start the bootstrap
     */
    public DubboBootstrap start() {
        if (started.compareAndSet(false, true)) {
            initialize();
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " is starting...");
            }
            // 导出服务
            // 1. export Dubbo Services
            exportServices();

            // Not only provider register
            if (!isOnlyRegisterProvider() || hasExportedServices()) {
                // 2. export MetadataService 
                exportMetadataService();
                //3. Register the local ServiceInstance if required
                registerServiceInstance();
            }
            // 
            referServices();

            if (logger.isInfoEnabled()) {
                logger.info(NAME + " has started.");
            }
        }
        return this;
    }

我们看一下 exportServices 方法,这个方法,先是 configManager 中获取多个 serviceBean ,而 serviceBean 又是如何来的呢? 是从;

    private void exportServices() {
        configManager.getServices().forEach(sc -> {
            // TODO, compatible with ServiceConfig.export()
            ServiceConfig serviceConfig = (ServiceConfig) sc;
            serviceConfig.setBootstrap(this);

            if (exportAsync) {
                ExecutorService executor = executorRepository.getServiceExporterExecutor();
                Future<?> future = executor.submit(() -> {
                    sc.export();
                });
                asyncExportingFutures.add(future);
            } else {
                
                sc.export();
                exportedServices.add(sc);
            }
        });
    }

serviceBean 就是 service bean 的抽象,类结构图

1297993-20200408155542835-1487373101.png

serviceBean 继承自 ServiceConfig ,ServiceConfig 本身带了一个 ScheduledExecutorService 用于某个 bean 延迟启动 (这个感觉类的对象消耗挺大的,因为每个 service bean 都带有一个 ScheduledExecutorService 字段,而服务一般都会有多个),我们在代码 sc.export() 这句就会看到具体的实现。

    //ServiceConfig类方法
    public synchronized void export() {
        if (!shouldExport()) {
            return;
        }

        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.init();
        }

        checkAndUpdateSubConfigs();

        //init serviceMetadata
        serviceMetadata.setVersion(version);
        serviceMetadata.setGroup(group);
        serviceMetadata.setDefaultGroup(group);
        serviceMetadata.setServiceType(getInterfaceClass());
        serviceMetadata.setServiceInterfaceName(getInterface());
        serviceMetadata.setTarget(getRef());

        //延迟导出
        if (shouldDelay()) {
            DELAY_EXPORT_EXECUTOR.schedule(this::
                    doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            //最终调用这个方法
            doExport();
        }
    }

    //ServiceConfig类方法
    protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;

        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        //
        doExportUrls();

        // dispatch a ServiceConfigExportedEvent since 2.7.4
        dispatch(new ServiceConfigExportedEvent(this));
    } 

    private void doExportUrls() {
        ServiceRepository repository = ApplicationModel.getServiceRepository();
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
        repository.registerProvider(
                getUniqueServiceName(),
                ref,
                serviceDescriptor,
                this,
                serviceMetadata
        );

        List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getContextPath(protocolConfig)
                    .map(p -> p + "/" + path)
                    .orElse(path), group, version);
            // In case user specified path, register service one more time to map it to path.
            repository.registerService(pathKey, interfaceClass);
            // TODO, uncomment this line once service key is unified
            serviceMetadata.setServiceKey(pathKey);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }


通过调试, registryURLs 一个例子例如这样 :

registryURLs : registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&pid=5300&qos.port=22222&registry=zookeeper&timestamp=1586333795545

可以知道前面的逻辑是获取 protocol 信息,获取知道 某个 service 是通过什么 protocol 进行传输的,于是进入了这个方法 :

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (StringUtils.isEmpty(name)) {
            name = DUBBO;
        }

        Map<String, String> map = new HashMap<String, String>();
  
        //这一段设置了一堆信息到 map 中,设置完的map 的信息如下图 
        .....

        //init serviceMetadata attachments
        serviceMetadata.getAttachments().putAll(map);

        // 下面开始导出服务了!!!核心逻辑
        // export service
        String host = findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = findConfigedPorts(protocolConfig, name, map);
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

        // You can customize Configurator to append extra parameters
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {

                        ....

                        //注意这里!!使用 ProxyFactory 来生成一个 Invoker  (相当与 spring 中的 AOP 实现)
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

                        //生成包装类
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        //交给各个协议调用 export 方法了
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
                /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
                WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
                if (metadataService != null) {
                    metadataService.publishServiceDefinition(url);
                }
            }
        }
        this.urls.add(url);
    }

很长,但是我们看到核心的逻辑如下 :

  • 封装 map信息
  • PROXY_FACTORY 生成 invoker
  • 生成包装类
  • 调用 protocol 的 export 方法

ok, 到了 protocol 的 export 方法,protocol 是个接口,我们从下图可以看到到最后注册在 zk 中,实际上这个 protocol 对象被包装了多层,涉及到的类如下

1297993-20200408173249859-513091.png

从上至下依次为 :

  • ProtocolListenerWrapper
  • ProtocolFilterWrapper
  • QosProtocolWrapper
  • RegistryProtocol

我们从名字也大概可以猜到各个类的作用,我们来看一下 RegistryProtocol 这个类的 exprot 作用,从名字可以知道该类为了服务 provider 注册到注册中心去的。


    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        URL providerUrl = getProviderUrl(originInvoker);

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
        // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            //注册的逻辑在这
            register(registryUrl, registeredProviderUrl);
        }

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }

    public void register(URL registryUrl, URL registeredProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);

        ProviderModel model = ApplicationModel.getProviderModel(registeredProviderUrl.getServiceKey());
        model.addStatedUrl(new ProviderModel.RegisterStatedURL(
                registeredProviderUrl,
                registryUrl,
                true
        ));
    }

而 Register 是个接口它和 protocol 类一样,被包装了多层,我们从上面的调试图片也可以看到,在FailbackRegistry 类中的 register 方法中,

    @Override
    public void register(URL url) {
        if (!acceptable(url)) {
            logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
            return;
        }
        super.register(url);
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // Sending a registration request to the server side
            // 注册方法!!
            doRegister(url);
        } catch (Exception e) {
            ...
        }      
        ...
    }

最终到了 ZookeeperRegistry 这个类的注册方法,很好理解,利用 zkClient 完成节点注册。

    @Override
    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

至此,我们完成了服务的导出整个流程。

补充

ProxyFactory 是什么

提外话看一下这个 ProxyFactory 到底是什么

/**
 * ProxyFactory. (API/SPI, Singleton, ThreadSafe) 单例,线程安全的
 */
@SPI("javassist")
public interface ProxyFactory {


    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;


    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;


    @Adaptive({PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}


//例如一个实现,利用反射返回对象
public class JdkProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(proxy, arguments);
            }
        };
    }

}


protocol 接口是如何被包装的呢?

上面我们看到 protocol 调用 export 方法的时候实际上是一个 protocol&Adaptive 的包装类,我们了解一下这个包装类是如何形成的呢?在上一篇的 SPI 机制有讲到,可以前往查看。

参考资料

  • http://dubbo.apache.org/zh-cn/docs/source_code_guide/dubbo-spi.html
原文地址:https://www.cnblogs.com/Benjious/p/12666826.html