Dubbo学习笔记9:Dubbo服务提供方启动流程源码分析

首先我们通过一个时序图,直观看下Dubbo服务提供方启动的流程:

  • 在《Dubbo整体框架分析》一文中我们提到,服务提供方需要使用ServiceConfig API发布服务,具体是调用代码(1)export()方法来激活发布服务。export的核心代码如下:
public synchronized void export(){
    ...
    // 这里是延迟发布
    if(delay != null && delay > 0){
        delayExportExecutor.schedule(new Runnable(){
            public void run(){
                doExport();
            }
        } , delay , TimeUnit.MILLISECONDS);
    }else{
        // 直接发布
        doExport();
    }
}
private static final ScheduledExecutorService delayExportExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter" , true));

如上代码,可知Dubbo的延迟发布通过使用ScheduledExecutorService来实现的,可以通过调用ServiceConfig的setDelay(Integer delay)来设置延迟发布时间。如果没有设置延迟时间则直接调用代码(2)doExport()方法发布服务,延迟发布最后也是调用的该方法。

  • 代码(2)主要是根据ServiceConfig里面的属性进行合法性检查,这里我们主要看其内部最后调用的doExportUrls方法。
  • 代码(3)内部首先通过调用loadRegistries方法加载所有的服务注册中心对象,Dubbo中一个服务可以被注册到多个服务注册中心。
  • 另外Dubbo支持本地调用,本地调用使用了injvm协议,是一个伪协议,它不开启端口,不发起远程调用,旨在jvm内直接关联,但执行Dubbo的Filter链,所以默认下Dubbo同时支持本地调用与远程调用协议,所以这里的循环用来暴露本地服务和远程服务。
  • 代码(5)的 doExportUrlsFor1Protocol 方法内部首先把参数封装在url,在Dubbo里面会把所有参数封装在一个url里面,然后具体执行服务到处,其核心代码如下:
Invoker<?> invoker = proxyFactory.getInvoker(ref , (Class)interfaceClass , url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker , this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);

其中proxyFactory和protocol的定义如下:

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdativeExtension();
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

可知它们是扩展接口的适配器类。

  • 执行代码 proxyFactory.getInvoker(ref , (Class)interfaceClass , url)时,我们发现实际是首先执行代码(6)调用了扩展接口 ProxyFactory的适配器类,然后适配器内部根据url里proxy的类型选择具体的代理工厂,这里默认proxy类型为javasist,所以又调用了代码(7)执行了JavassistProxyFactory的getInvoker方法获取了代理类。

JavassistProxyFactory的getInvoker代码如下:

public <T> Invoker<T> getInvoker(T proxy , Class<T> type , URL url){
    // 
    final Wrapper c = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy , type , url){
        @Override
        protected Object doInvoke(T proxy , String methodName , Class<?>[] parameterTypes , Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy,methodName,parameterTypes,arguments);
        }
    };    
}

这里首先把服务实现类转换为Wrapper类,是为了减少反射的调用。代码(6)具体返回的是AbstractProxyInvoker对象,其内部重写doInvoke方法,具体委托给Wrapper实现具体功能。到这里完成了《Dubbo具体架构分析》一文中讲解的服务提供方实现类到Invoker的转换。

  • 然后执行代码(8),当执行protocol.export(wrapperInvoker)的时候,实际调用了Protocol的适配器类Protocol$Adaptive的export方法。其内部根据url中Protocol的类型为registry,会选择Protocol的实现类RegistryProtocol。但由于Dubbo SPI的扩展点使用wrapper自动增强的存在,这里使用了ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper对其进行了增强,所以需要一层层调用才会具体调用到RegistryProtocol的export方法。
  • 代码(12)RegistryProtocol中的export方法通过代码(13)doLocalExport启动了NettyServer进行监听服务,代码(14)则将当前服务注册到具体的服务注册中心。到这里完成《Dubbo整体架构分析》一文中谈到的Invoker到Exporter的转换。

下面我们再来看doLocalExport是如何启动NettyServer的,doLocalExport内部主要调用DubboProtocol的export方法,下面看下时序图:

由于DubboProtocol也被Wrapper类增强了,所以也是一层层调用后,执行代码(7)调用DubboProtocol的export方法,export代码如下:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException{
    URL url = invoker.getUrl();
    // invoker到exporter转换
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker , key , exporterMap);
    exporterMap.put(key,exporter);
    ...
    openServer(url);
    return exporter;
}

这里将invoker转换到DubboExporter对象,然后执行代码(9),接着一步步最后会执行到代码(14)NettyTransporter的bind方法,其代码如下:

public Server bind(URL url,ChannelHandler listener) throws RemotingException{
    return new NettyServer(url,listener);    
}

有上面代码可知,其内部创建了NettyServer,而NettyServer的构造函数内部又调用了NettyServer的doOpen方法启动了服务监听。

public NettyServer(URL url,ChannelHandler handler) throws RemotingException{
    super(url,ChannelHandlers.wrap(handler,ExecutorUtil.setThreadName(url,SERVER_THREAD_POOL_NAME)));
}

这里我们主要看 ChannelHandlers.wrap(handler,ExecutorUtil.setThreadName(url,SERVER_THREAD_POOL_NAME))这个代码,改代码里面加载了具体的线程模型,通过ChannelHandlers的wrapInternal方法完成了加载:

protected ChannelHandler wrapInternal(ChannelHandler handler,URL url){
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler,url)));
}

可知根据url里面的线程模型选择具体的Dispatcher实现类,这里再重新提下Dubbo提供的Dispatcher实现类如下,默认是all:

all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher

到这里线程模型加载地方讲解完了,还有一个点就是线程模型中线程池SPI扩展什么时候加载的呢?这里以线程模型AllDispatcher为例,它对应的是AllChannelHandler,其构造函数如下:

public AllChannelHandler(ChannelHandler handler,URL url){
    super(handler , url);    // 父类WrappedChannelHandler构造方法      
}
public WrappedChannelHandler(ChannelHandler handler,URL url){
    ...
    executor = (ExecutorService)ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
    ...
}

可知WrapperChannelHandler的构造函数里根据url获取了具体的扩展接口ThreadPool的SPI实现类,这里再提下ThreadPool的扩展接口如下,默认是fixed:

fixed=com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool
cached=com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool
limited=com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool
原文地址:https://www.cnblogs.com/xhj123/p/9118930.html