dubbo源码阅读-服务暴露(七)之远程暴露(dubbo)

说明

https://www.cnblogs.com/LQBlog/p/12469007.html#autoid-6-10-0 注释:<17>处开始看

这里的SPI Key=registry  但是Protocol会被代理具体可以看: https://www.cnblogs.com/LQBlog/p/12470179.html#_label2 为什么会调用ProtocolListenerWrapper

Wrapper过后最终会调用RegistryProtocol

RegistryProtocol

<1>export

    /**
     * 此时的invoker的Url是 注册地址url?export暴露地址url
     *
     * @param originInvoker
     * @param <T>
     * @return
     * @throws RpcException
     */
    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //<2>暴露服务获得ExporterChangeableWrapper
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        //获取注册中心地址
        URL registryUrl = getRegistryUrl(originInvoker);

        //registry provider
        //获取注册中心实现 默认是Zookeeper
        final Registry registry = getRegistry(originInvoker);
        //获取暴露地址
        final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

        //to judge to delay publish whether or not
        //获取是否注册 默认是true
        boolean register = registeredProviderUrl.getParameter("register", true);

        //将Invoker注册到本地缓存 key为Service全名称  如:com.alibaba.dubbo.demo.DemoService
        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

        if (register) {
            //<7>注册到注册中心
            register(registryUrl, registeredProviderUrl);
            //是否注册 标记为true
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        //订阅override数据
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //Ensure that a new exporter instance is returned every time export
        ///通过DestroyableExporter 包装
        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
    }

<2>doLocalExport

@SuppressWarnings("unchecked")
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        //获取Invoker封装Url 的export参数 也就是暴露url的String
        String key = getCacheKey(originInvoker);
        //尝试从缓存获取
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            //加锁
            synchronized (bounds) {
                //避免锁穿透
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    //二次代理  getProviderUrl返回暴露url
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    /**
                     * <3></>invokerDeleget封装了invoker 同时url封装的是暴露url 所以这里protocol通过spi获取 spi 是DubboProtocol
                     * protocol 是javasist动态生成 所以内部调用的是getExtension
                     * 可以看https://www.cnblogs.com/LQBlog/p/12453900.html#autoid-4-3-0
                     */
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        //返回的是通过ExporterChangeableWrapper 代理多的
        return exporter;
    }

<7>register

  public void register(URL registryUrl, URL registedProviderUrl) {
        //SPI扩展点 此时url已经变成了配置的协议而不是registry 如:zookeeper:/
        Registry registry = registryFactory.getRegistry(registryUrl);
        //注册到注册中心 具体可看:https://www.cnblogs.com/LQBlog/p/12522417.html#autoid-1-0-0
        registry.register(registedProviderUrl);
    }

DubboProtocol

<3>export

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        //获取暴露URL
        URL url = invoker.getUrl();

        // export service.
        //获取暴露的service key  暴露接口全名称:端口   如:com.alibaba.dubbo.demo.DemoService:23888
        String key = serviceKey(url);
        //DubboExporter 再代理一层 exporterMap 为DubboProtocol成员变
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        //invoker存入缓存
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event 是否配置dubbo.stub.event 默认false 是否is_callback_service 默认false
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        //<4>暴露服务
        openServer(url);
        //<6>注册额序列化类
        optimizeSerialization(url);
        return exporter;
    }

<4>openServer

  private void openServer(URL url) {
        // find server.
        //获取暴露地址10.3.17.72:23888
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        //   // 客户端是否可以暴露仅供服务器调用的服务 key:isserver 默认是true
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        //避免重复暴露 比如暴露多个服务 只有第一个能暴露
        if (isServer) {
            //获得交换机 该方法就是根据url携带的远程通信实现方法来创建一个服务器对象。
            ExchangeServer server = serverMap.get(key);

            if (server == null) {
                /**
                 * <5>暴露服务 获得ExchangeServer
                 * map的原因是 服务可以暴露不同端口
                 */
                serverMap.put(key, createServer(url));
            } else {
                // server supports reset, use together with override
                //如果存在 重置
                server.reset(url);
            }
        }
    }

<5>createServer

private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        // // 服务器关闭时发送readonly事件,默认情况下启用
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // enable heartbeat by default
        //// 心跳默认间隔一分钟
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        //// 获得远程通讯服务端实现方式,默认用netty
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        //添加编解码器DubboCodec实现
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            // 启动服务器并暴露服务 内部会启动netty服务暴露服务  后续流程请看:https://www.cnblogs.com/LQBlog/p/12517231.html
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        // // 获得客户端侧设置的远程通信方式 client
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

<6>optimizeSerialization

    /**
     * 该方法是把序列化的类放入到集合,以便进行序列化
     * @param url
     * @throws RpcException
     */
    private void optimizeSerialization(URL url) throws RpcException {
        //是否配置 optimizer=className 实现类  可以看具体可以看https://blog.csdn.net/moonpure/article/details/53175519
        String className = url.getParameter(Constants.OPTIMIZER_KEY, "");
        if (StringUtils.isEmpty(className) || optimizers.contains(className)) {
            return;
        }

        logger.info("Optimizing the serialization process for Kryo, FST, etc...");

        try {
            Class clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
            if (!SerializationOptimizer.class.isAssignableFrom(clazz)) {
                throw new RpcException("The serialization optimizer " + className + " isn't an instance of " + SerializationOptimizer.class.getName());
            }

            SerializationOptimizer optimizer = (SerializationOptimizer) clazz.newInstance();

            if (optimizer.getSerializableClasses() == null) {
                return;
            }

            for (Class c : optimizer.getSerializableClasses()) {
                //把要序列化的类注册
                SerializableClassRegistry.registerClass(c);
            }

            optimizers.add(className);
        } catch (ClassNotFoundException e) {
            throw new RpcException("Cannot find the serialization optimizer class: " + className, e);
        } catch (InstantiationException e) {
            throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e);
        } catch (IllegalAccessException e) {
            throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e);
        }
    }
原文地址:https://www.cnblogs.com/LQBlog/p/12470681.html