dubbo学习(五)服务引用

问题

  • 为什么返回的对象不直接是 Invoker 而是代理类呢

回答 :

Dubbo服务暴露的主要目的是让本地的服务bean能够让其它进程通过网络调用。在暴露服务前,dubbo需要根据配置信息收集服务相关信息,服务的配置信息都配置在ServiceConfig中。Dubbo接收到Spring触发的ContextRefreshedEvent事件后,dubbo进行真正的服务暴露过程。Dubbo服务暴露目的是让消费者调用, Dubbo会将注册的服务bean包装为可执行对象Invoker,但是暴露的服务bean不能任意被消费者调用,所以消费者对服务bean的访问需要受到控制,这些控制的逻辑会放在代理逻辑中实现,而ProxyFactory用于生成代理对象。除此之外,当前进程的服务bean能够被其它进程调用的前提是能够被其它进程发现,所以这会涉及到服务注册的问题,Registry抽象了注册中心。服务调用又会涉及到网络协议,比如长连接还是短连接协议,关于暴露协议的封装由Protocol接口负责。

概述

服务引用的时机

Dubbo 服务引用的时机有两个,第一个是在 Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务,第二个是在 ReferenceBean 对应的服务被注入到其他类中时引用。这两个引用服务的时机区别在于,第一个是饿汉式的,第二个是懒汉式的。默认情况下,Dubbo 使用懒汉式引用服务。如果需要使用饿汉式,可通过配置 dubbo:reference 的 init 属性开启。

服务引用概述过程

外界看到的某个接口的服务引用,而实际是dubbo 经过包装配置的invoker对象。在 Dubbo 中,我们可以通过两种方式引用远程服务。第一种是使用服务直连的方式引用服务,第二种方式是基于注册中心进行引用。服务直连的方式仅适合在调试或测试服务的场景下使用,不适合在线上环境使用。

当我们的服务被注入到其他类中时,Spring 会第一时间调用 getObject 方法,并由该方法执行服务引用逻辑。按照惯例,在进行具体工作之前,需先进行配置检查与收集工作。接着根据收集到的信息决定服务用的方式,有三种,第一种是引用本地 (JVM) 服务,第二是通过直连方式引用远程服务,第三是通过注册中心引用远程服务。不管是哪种引用方式,最后都会得到一个 Invoker 实例。如果有多个注册中心,多个服务提供者,这个时候会得到一组 Invoker 实例,此时需要通过集群管理类 Cluster 将多个 Invoker 合并成一个实例。合并后的 Invoker 实例已经具备调用本地或远程服务的能力了,但并不能将此实例暴露给用户使用,这会对用户业务代码造成侵入。此时框架还需要通过代理工厂类 (ProxyFactory) 为服务接口生成代理类,并让代理类去调用 Invoker 逻辑。避免了 Dubbo 框架代码对业务代码的侵入,同时也让框架更容易使用。

源码分析

服务引用的分析从 ReferenceBean 开始,看一下类结构

1297993-20200410150344852-1069652921.png

其中 FactoryBean 和 ApplicationContextAware, InitializingBean 都是继承自 spring 用于在 springboot 引用,

    //这个方法重写自  InitializingBean,该方法用于当bean set完属性后,用户进行自定义的逻辑
    @Override
    @SuppressWarnings({"unchecked"})
    public void afterPropertiesSet() throws Exception {
        // 初始化 dubbo 配置 bean
        // Initializes Dubbo's Config Beans before @Reference bean autowiring
        prepareDubboConfigBeans();

        // lazy init by default.
        if (init == null) {
            init = false;
        }

        // eager init if necessary.
        if (shouldInit()) {
            getObject();
        }
    }

通过 createProxy 的调用栈看到调用会经过 RegistryProtocol 的 doRefer 方法,从这里我们可以大概知道生成 invoker 的过程。

1297993-20200413134313913-564279773.png

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        //这个 RegistryDirectory 存放注册信息的资料,当注册信息发生变化的时候通过 subscribe 方法分发,内部存在一些监听器处理信息时间,同时内部
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
            directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        //订阅 providers、configurators、routers 等节点数据
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
        //这才是真正生成 invoker 的地方
        // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
        Invoker invoker = cluster.join(directory);
        return invoker;
    }

我们发现在返回 inovker 之前的 ,会调用 RegistryDirectory 的 subscribe 方法,这个方法正是就是同个服务提供者生成多个 invoker 的地方,调用栈如下,可以看见该例中调用的是 DubboProtocol 。

1297993-20200410152801789-1266734247.png

我们稍微讲解一下,然后再分析 DubboProtocol 生成 invoker 的过程。 首先,多个提供者生成 invoker 的步骤在 AbstractRegistry 的 notify 方法中(可以从前面的调用栈找到)

    /**
     * Notify changes from the Provider side.
     *
     * @param url      consumer side url
     * @param listener listener
     * @param urls     provider latest urls
     */
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        ...

        // keep every provider's category. 找到提供者的目录(提供者信息)
        Map<String, List<URL>> result = new HashMap<>();
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            //更新操作!!!!要是第一次,必然多个提供者创建多个invoker,从调用栈可以看出下面的 notity 方法最终也调用到了 DubboProtocol 生成 DubboInvoker 的过程。 
            listener.notify(categoryList);
            // We will update our cache file after each notification.
            // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
            saveProperties(url);
        }
    }

最后 AbstractProtocol 的 refer 被调用

    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }

DubboProtocol 的 protocolBindingRefer 被调用 :

    @Override
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);

        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);

        return invoker;
    }

到了这里我们,逻辑不难懂,就是创建一个 DubboInvoker 然后返回。其中有个 getClients 的方法,这个方法用于获取客户端实例,实例类型为 ExchangeClient。ExchangeClient 实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信。比如 NettyClient、MinaClient 等,默认情况下,Dubbo 使用 NettyClient 进行通信。这里做一下分析,DubboProtocol 的 buildReferenceCountExchangeClient 方法

    /**
     * Build a single client
     *
     * @param url
     * @return
     */
    private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
        ExchangeClient exchangeClient = initClient(url);

        return new ReferenceCountExchangeClient(exchangeClient);
    }


       /**
     * Create new connection
     *
     * @param url
     */
    private ExchangeClient initClient(URL url) {

        // client type setting.
        String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));

        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
        // enable heartbeat by default
        url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

        // BIO is not allowed since it has severe performance issue.
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client;
        try {
            // connection should be lazy 
            if (url.getParameter(LAZY_CONNECT_KEY, false)) {
                //假如有指定懒加载
                client = new LazyConnectExchangeClient(url, requestHandler);

            } else {
                //使用 SPI 机制进行调用 
                client = Exchangers.connect(url, requestHandler);
            }

        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }

        return client;
    }

最后会经过

/**
 * DefaultMessenger
 *
 *
 */
public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        // 这里包含了多个调用,分别如下:
        // 1. 创建 HeaderExchangeHandler 对象
        // 2. 创建 DecodeHandler 对象
        // 3. 通过 Transporters 构建 Client 实例
        // 4. 创建 HeaderExchangeClient 对象
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    ...

}    

而这个 Transporters中的 connect 方法 也是同样的套路,也是使用 SPI 机制,

    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        //SPI 机制 
        return getTransporter().connect(url, handler);
    }

默认是 netty

public class NettyTransporter implements Transporter {

    ...

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }
}

总结

我们可以说服务引用就是Dubbo进行 :

  • 消费者注册
  • 封装 invoker 返回给消费者,供消费者透明使用 的过程。

参考资料

  • dubbo 官方文档
原文地址:https://www.cnblogs.com/Benjious/p/12691204.html