Dubbo服务提供者Provider启动流程下(四)

服务暴露实际上就是启动server本地监听,并且将服务信息注册到注册中心上。在dubbo:service上的export可以指定是否暴露,同时provider也可以指定延迟暴露的时间。

if (!shouldExport()) { // 判断是否暴露服务,由dubbo:service export="true|false"来指定。
            return;
        }
        // 服务暴露  
        if (shouldDelay()) {
            delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }

doExport

ServiceConfig#doExport -> ServiceConfig#doExportUrls

    private void doExportUrls() {
        // 首先遍历ServiceBean的List< RegistryConfig> registries(所有注册中心的配置信息),
        // 然后将地址封装成URL对象,关于注册中心的所有配置属性,最终转换成url的属性(?属性名=属性值),
        // registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
        // application=dubbo-demo-api-provider&dubbo=2.0.2&pid=32033&registry=zookeeper&timestamp=1639838932336
        List<URL> registryURLs = loadRegistries(true);
        /**
         * 每一个服务协议都会往多个注册中心暴露
         */
        for (ProtocolConfig protocolConfig : protocols) {
            // 在 URL.buildKey 中,将从 protocolConfig中获取的path,group,version进行拼装,最终形式类似于 ${group}/${path}:${version}
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            // 将服务提供者信息注册到ApplicationModel实例中。
            ApplicationModel.initProviderModel(pathKey, providerModel);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }

serviceConfig#doExportUrlsFor1Protocol

这个方法主要就是将服务的配置信息包装成一个url,并且本地启动服务,并将服务信息发布到注册中心。代码贴出来会比较多,我这边按重点贴出部分

代码前端包括了用Map存储该协议的所有配置参数,包括协议名称、dubbo版本、当前系统时间戳、进程ID、application配置、module配置、默认服务提供者参数(ProviderConfig)、协议配置、服务提供Dubbo:service的属性。

将service 里面的 method 里面的 argument 也加到 map中,其中就包括了callback。

然后泛化信息添加。另外将所有的方法名也添加进map,以逗号分隔,这边使用javassiat字节码生成器获取所有的方法具体详解代码   String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();

之后有token的处理,还有获取服务的host和port

        // 获取配置的ip,并连接绑定端口
        /**
         * 解析服务提供者的IP地址与端口。
         * 服务IP地址解析顺序:(序号越小越优先)
         *
         * 系统环境变量,变量名:DUBBO_DUBBO_IP_TO_BIND
         * 系统属性,变量名:DUBBO_DUBBO_IP_TO_BIND
         * 系统环境变量,变量名:DUBBO_IP_TO_BIND
         * 系统属性,变量名:DUBBO_IP_TO_BIND
         * dubbo:protocol 标签的host属性 --》 dubbo:provider 标签的host属性
         * 选择第一个可用网卡,其实现方式是建立socket,连接注册中心,获取socket的IP地址
         */
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        /**
         * 系统环境变量,变量名:DUBBO_DUBBO_PORT_TO_BIND
         * 系统属性,变量名:DUBBO_DUBBO_PORT_TO_BIND
         * 系统环境变量,变量名:DUBBO_PORT_TO_BIND
         * 系统属性,变量名DUBBO_PORT_TO_BIND
         * dubbo:protocol标签port属性、dubbo:provider标签的port属性。
         * 随机选择一个端口。
         */
        Integer port = this.findConfigedPorts(protocolConfig, name, map);

然后再一次封装服务的暴露URL 

URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

大体内容如下:

         /**
         * dubbo://192.168.0.102:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider
         * &bind.ip=192.168.0.102&bind.port=20880&default.deprecated=false&default.dynamic=false&default.register=true
         * &deprecated=false&dubbo=2.0.2&dynamic=false&generic=false&
         * interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=32191&register=true&release=&side=provider&timestamp=1639840839565
         */

接着获取 动态配置中心,并将url写到配置中心。接着往下

根据scope来暴露服务,如果scope不配置,则默认本地与远程都会暴露,如果配置成local或remote,那就只能是二选一。
如果scope不为remote,则先在本地暴露(injvm):,具体暴露服务的具体实现,将在remote 模式中详细分析。
如果scope不为local,则将服务暴露在远程。
remote方式,检测当前配置的所有注册中心,如果注册中心不为空,则遍历注册中心,将服务依次在不同的注册中心进行注册。
如果dubbo:service的dynamic属性未配置, 尝试取dubbo:registry的dynamic属性,该属性的作用是否启用动态注册,如果设置为false,服务注册后,其状态显示为disable,需要人工启用,当服务不可用时,也不会自动移除,同样需要人工处理,此属性不要在生产环境上配置。
根据注册中心url(注册中心url),构建监控中心的URL,如果监控中心URL不为空,则在服务提供者URL上追加monitor,其值为监控中心url(已编码)。

下面就是服务暴露的细节了

                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        Exporter<?> exporter = protocol.export(wrapperInvoker);
               // 缓存已经暴露的服务 exporters.add(exporter);

我们debug到invoke这边,对与invoke  后续会出专门的章节来介绍。这边只要了解到Invoke中的URL

registryURL = registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=33205&registry=zookeeper&timestamp=1639897064350

invoke.url =    registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.0.102%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26bind.ip%3D192.168.0.102%26bind.port%3D20880%26default.deprecated%3Dfalse%26default.dynamic%3Dfalse%26default.register%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dfalse%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D33205%26register%3Dtrue%26release%3D%26side%3Dprovider%26timestamp%3D1639897069365&pid=33205&registry=zookeeper&timestamp=1639897064350  其中协议就是register

dubbo的扩展中,对于扩展点接口的方法上有@Adaptive注解的话,会去参数里面获取URL,然后取对应的key,这边的key就是register。那么很好了解下面的方法了Exporter<?> exporter = protocol.export(wrapperInvoker);

实际上调用的RegistryProtocol的exporter。

    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        URL providerUrl = getProviderUrl(originInvoker);

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry  
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }

在上面的URL providerUrl = getProviderUrl(originInvoker);方法中,把invoke中的url获取后,替换了前面的register协议,变成了dubbo协议,即dubbo://开头的URL。

接下来看下面的执行doLocalExport方法

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        // 根据Dubbo内置的SPI机制,将调用DubboProtocol#export方法。
        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegete = new InvokerDelegate<>(originInvoker, providerUrl);
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
        });
    }

注意这里面使用了一个invoke的委托类,将url改成了providerUrl,即dubbo协议开头。所以根据dubbo的扩展实现,拿到了protocal实现类就是DubboProtocal。

DubboProtocal#export

   @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.   key = org.apache.dubbo.demo.DemoService:20880
        String key = serviceKey(url);
        // 构造一个Exporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        // 是否将转发事件导出成stub
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        /**
         * 而后会调用 openServer ,其主要作用是开启一个 ExchangeServer, 实际就是对 底层网络通信的封装,在dubbo 传输协议中,使用的是Netty 作为网络传输协议,
         * Dubbo 内置也支持 Grizzly、Mina 等传输协议。
         *
         * 而 optimizeSerialization 则是 序列化优化器来优化url,不过目前版本(2.7.2)并没有实现其相关逻辑, SerializationOptimizer 没有子类。
         *
         * 最后返回该 exporter
         */
        openServer(url);
        optimizeSerialization(url);

        return exporter;
    }

DubboProtocal#openServer

 private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }

同一个主机上上不同服务,使用同一个address,所以共享一个ExchangeServer,如果内存中找不到,则create

private ExchangeServer createServer(URL url) {
        url = URLBuilder.from(url)
                // send readonly event when server closes, it's enabled by default
                .addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())  // channel.readonly.sent=true
                // enable heartbeat by default
                .addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)) // heartbeat=60000
                .addParameter(Constants.CODEC_KEY, DubboCodec.NAME)  // codec=dubbo
                .build();
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // netty

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return server;
    }

为服务提供者url增加channel.readonly.sent属性,默认为true,表示在发送请求时,是否等待将字节写入socket后再返回,默认为true。
为服务提供者url增加heartbeat属性,表示心跳间隔时间,默认为60*1000,表示60s。
为服务提供者url增加server属性,可选值为netty,mina等等,默认为netty。
根据SPI机制,判断server属性是否支持。
为服务提供者url增加codec属性,默认值为dubbo,协议编码方式。
根据服务提供者URI,服务提供者命令请求处理器requestHandler构建ExchangeServer实例。requestHandler的实现具体在以后详细分析Dubbo服务调用时再详细分析。
验证客户端类型是否可用。

那么服务信息是怎么写到Zookeeper的呢?

在上文的RegisterProtocal#export -> RegisterProtocal#getRegistry

final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

 根据注册中心URL,从注册中心工厂中获取指定的注册中心实现类:zookeeper注册中心的实现类为:ZookeeperRegistry

    private Registry getRegistry(final Invoker<?> originInvoker) {
        URL registryUrl = getRegistryUrl(originInvoker);
        return registryFactory.getRegistry(registryUrl);
    }

这里面registryFactory的实现类是啥呢?这个就在之前章节讲到了扩展点的IOC注入,实际上registryFactory也是有ExtensionLoader去加载的。

获取服务提供者URL中的register属性,如果为true,则调用注册中心的ZookeeperRegistry#register方法向注册中心注册服务(实际由其父类FailbackRegistry实现,ZookeeperRegistry extends FailbackRegistry)。

FailbackRegistry#register -> ZookeeperRegistry#doRegister

public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

最后服务提供者向注册中心订阅自己,主要是为了服务提供者URL发送变化后重新暴露服务,当然,会将dubbo:reference的check属性设置为false。

原文地址:https://www.cnblogs.com/gaojy/p/15705945.html