Dubbo学习笔记7:Dubbo增强SPI与SPI中扩展点自动包装的实现原理

在Dubbo整体架构分析中介绍了Dubbo中除了Service和Config层为API外,其他各层均为SPI,为SPI意味着下面各层都是组件化可以被替换的,也就是扩展性比较强,这也是Dubbo比较好的一点。

JDK中标准SPI原理

Dubbo增强的SPI功能是从JDK标准SPI演化而来的,所以有必要先讲讲标准SPI的原理。

JDK中的SPI(Service Provider Interface)是面向接口编程的,服务规则提供者会在JRE的核心API里面提供服务访问接口,而具体实现则由其他开发商提供。

例如规范制定者在rt.jar包里面定义了数据库的驱动接口java.sql.Driver。那么MySQL实现的开发商则会在MySQL的驱动包的META-INF文件夹下建立名称为java.sql.Driver的文件,文件内容就是MySQL对java.sql.Driver接口的实现类,如下图:

如下代码可知 com.mysql.jdbc.Driver 就是实现了 java.sql.Driver 接口:

public class com.mysql.jdbc.Driver extends com.mysql.jdbc.NonRegisteringDriver implements java.sql.Driver

上面讲解了如何使用SPI扩展自定义自己的实现,下面来说说SPI实现原理,我们知道Java核心API,比如 rt.jar 包,是使用 Bootstrap ClassLoader 类加载器加载的,而用户提供的Jar包是由Appclassloader加载。并且我们知道如果一个类由类加载器A加载,那么这个类依赖的类也是由相同的类加载器加载。

而用来搜索开发商提供的SPI扩展实现类的API类(ServiceLoader)是使用 Bootstrap ClassLoader加载的,那么ServiceLoader里面依赖的类应该也是由 Bootstrap ClassLoader 来加载,那么ServiceLoader里面依赖的类应该也是由 Bootstrap ClassLoader 来加载。而上面说了用户提供的包含SPI实现类的Jar包是由 Appclassloader加载,所以需要一种违反双亲委派模型的方法,线程上下文类加载器 ContextClassLoader 就是为了解决这个问题。

下面我们写个测试代码,看看具体是如何工作了。

复制代码
public static void main(String[] args){
    // (1)
    ServiceLoader<Driver> loader = ServiceLoader.load(Driver.class);
    // (2)
    Iterator<Driver> iterator = loader.iterator();
    while(iterator.hasNext()){
        Driver driver = (Driver)iterator.next();
    // (3)
        System.out.println("driver:" + driver.getClass() + ",loader:"+driver.getClass().getClassLoa
der());
    //(4)
    System.out.println("ServiceLoader loader:" + ServiceLoader.class.getClassLoader());
}
复制代码

然后引入MySQL驱动的Jar包,执行结果如下。

driver:class com.mysql.jdbc.Driver , loader:sun.misc.Launcher$AppClassLoader@4554617c
current thread contextloader:sun.misc.Launcher$AppClassLoader@4554617c
ServiceLoader loader:null

从结果可知找到了MySQL的驱动,如果你在引入Oracle驱动的Jar包后再运行,则会输出找到了MySQL和Oracle的驱动,这也说明了,JDK标准的SPI会同时把SPI接口的所有实现类都提前加载好。

另外从执行结果可以知道 ServiceLoader 的加载器 Bootstrap ,因为这里输出了null,并且从该类在 rt.jar 里面,也可以证明。

下面我们来看下 ServiceLoader 的 load 方法源码。

复制代码
public final class ServiceLoader<S> implements Iterable<S>{
    public static <S> ServiceLoader<S> load(Class<S> service){
        // (5) 获取当前线程上下文加载器
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        return ServiceLoader.load(service,cl);
    }

    public static <S> ServiceLoader<S> load(Class<S> service,ClassLoader loader){
        return new ServiceLoader<>(service,loader);
    }

    // (6)
    private ServiceLoader(Class<S> svc,ClassLoader cl){
        service = svc;
        loader = cl;
        reload();
    }
}
复制代码

代码(5)获取了当前线程上下文加载器,这里是AppClassLoader。

代码(6)传递该类加载器到新构造的ServiceLoader的成员变量loader。那么这个loader什么时候使用的呢?下面我们看下LazyIterator的next()方法。

复制代码
public S next(){
    if(acc == null){
        return nextService();
    }else{
        PrivilegedAction<S> action = new PrivilegedAction<S>(){
            public S run(){return nextService();}
        }
        return AccessController.doPrivileged(action,acc);
    }
}
复制代码
复制代码
private S nextService(){
    ...
    try{
        // (7) 使用loader类加载器加载
        c = Class.forName(cn,false,loader);
    }catch(ClassNotFoundException x){
        fail(service,"Provider "+cn+" not found");
    }
    ...
}
复制代码

代码(7)使用loader也就是AppClassLoader加载具体的驱动实现类。至于cn是怎么来的,读者可以参见 LazyIterator 的 hasNext() 方法。

Dubbo增强SPI的实现

Dubbo的扩展点加载机制是基于JDK标准的SPI扩展点发现机制增强而来的,Dubbo解决了JDK标准的SPI的以下问题:

  • JDK标准的SPI会一次性实例化扩展点所有实现,如果由扩展点实现初始化很耗时,但如果没用上也加载,会很浪费资源。
  • 如果扩展点加载失败,就失败了,给用户没有任何通知。比如:JDK标准的ScriptEngine,如果Ruby ScriptEngine因为所依赖的jruby.jar不存在,导致Ruby ScriptEngine类加载失败,这个失败原因被吃掉了,当用户执行ruby脚本时,会报空指针异常,而不是报Ruby ScriptEngine不存在。
  • 增加了对扩展点IoC和AOP的支持,一个扩展点可以直接setter注入其他扩展点,也可以对扩展点使用wrapper类进行增强。

本节我们结合服务提供者配置类ServiceConfig来讲解如何使用增强SPI加载扩展接口Protocol实现类,在ServiceConfig类中,有如下代码:

public class ServiceConfig<T> extends AbstractServiceConfig{
    private static final long serialVersionUID = 3033787999037024738L;

    private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
    ...
}

首先SPI扩展接口Protocol的接口定义如下:

复制代码
@SPI("dubbo")
public interface Protocol{
    int getDefaultPort();

    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    @Adaptive
    <T> Invoker<T> refer(Class<T> type,URL url) throws RpcException;

    void destroy();        
}
复制代码

 这里ExtensionLoader类似JDK标准SPI里面的ServiceLoader类,代码ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()的作用是获取Protocol接口的适配器类,在Dubbo中每个扩展接口都有一个对应的适配器类,这个适配器类是动态生成的一个类,后面会有讲解如何生成的,这里我们先给出Protocol扩展接口对应的适配器类的代码,如下:

复制代码
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy(){
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort(){
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0,com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException{
        if(arg1 == null)
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol);
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0,arg1);
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0){
        ...
        // (1)
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        // (2)
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        // (3)
        return extension.export(arg0);    
    }
}
复制代码

所以当我们调用 protocol.export(invoker) 方法的时候实际调用的是动态生成的Protocol$Adaptive 实例的 export(invoker) 方法,其内部代码(1)首先获取参数里面的URL对象,然后从URL对象里面获取用户设置的Protocol的实现类的名称,然后调用代码(2)根据名称获取具体的Protocol协议的实现类(后面我们会知道获取的是实现类被使用Wrapper类增强后的类),最后代码(3)具体调用Protocol协议的实现类 export(invoker) 方法。

这里应该清楚的知道适配器类的存在目的就相当一个分发器,根据不同的参数,委托不同的实现类来做指定的事情,Dubbo实现上是把所有的参数封装到一URL对象里面,包含用户配置的参数,比如设置使用什么协议。另外这里也可以知道Dubbo并没有一次性加载所有扩展接口Protocol的实现类,而是根据URL里面协议类型只加载当前使用的扩展实现类。

下面我们结合时序图来讲解ExtensionLoader的getAdaptiveExtension()方法是如何动态生成扩展接口对应的适配器类,以及getExtension方法如何根据扩展实现类的名称找到对应的实现类的:

  • 时序图步骤(1)获取当前扩展接口对应的ExtensionLoader对象,在Dubbo中每个扩展接口对应着自己的ExtensionLoader对象,如下代码,内部通过并发Map来缓存扩展接口与对应的ExtensionLoader的映射,其中key为扩展接口的Class对象,value为对应的ExtensionLoader实例:
复制代码
public static <T> ExtensionLoader<T> getExtensionLoader(class<T> type){
    if(type == null)
        throw new IllegalArgumentException("Extension type == null");
    if(!type.isInterface())
        throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
    if(!withExtensionAnnotation(type))
        throw new IllegalArgumentException("Extension type(" + type + ") is not extension,because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
    ExtensionLoader<T> loader = (ExtensionLoader<T>)EXTENSION_LOADERS.get(type);
    if(loader == null){
        EXTENSION_LOADERS.putIfAbsent(type,new ExtensionLoader<T>(type));
        loader = (Extension<T>)EXTENSION_LOADERS.get(type);
        return loader;
    }  
}    
复制代码
private static final ConcurrentMap<Class<?>,ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>,ExtensionLoader<?>>();

可知第一次访问某个扩展接口时候需要new一个对应的ExtensionLoader放入缓存,后面就直接从内存获取。

  • 步骤(2)获取当前扩展接口对应的适配器对象,内部是首先获取该扩展接口所有实现类的Class对象(注意,这里获取的是Class对象,并不是对象实例)。getAdaptiveExtension的代码如下:
复制代码
@SuppressWarnings("unchecked")
public T getAdaptiveExtension(){
    Object instance = cachedAdaptiveInstance.get();
    if(instance == null){
        if(createAdaptiveInstanceError == null){
            sychronized(cachedAdaptiveInstance){
                instance = cachedAdaptiveInstance.get();
                if(instance == null){
                    try{
                        instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    }catch(Throwable t){
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("fail to create adaptive instance: " + t.toString(),t);    
                    }
                }
            }
        }else{
            throw new IllegalStateException("fail to create adaptive instance:" + createAdaptiveInstanceError.toString(),createAdaptiveInstanceError);
        }
    }
    return (T)instance;
}
复制代码

如上代码使用双重检查创建cachedAdaptiveInstance对象,接口对应的适配器对象就保存到了这个对象里面。

  • 我们重点看步骤(3)createAdaptiveExtension方法,因为具体创建适配器对象的是这个方法。createAdaptiveExtension代码如下:
复制代码
private T createAdaptiveExtension(){
    try{
        return injectExtension((T)getAdaptiveExtensionClass().newInstance());
    }catch(Exception e){
        throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(),e);
    }  
}
复制代码

可知首先调用了步骤(4)getAdaptiveExtensionClass().newInstance()获取适配器对象的一个实例,然后调用步骤(7)injectExtension方法进行扩展点相互依赖注入。下面首先看下步骤(4)getAdaptiveExtensionClass()是如何动态生成适配器类的Class对象的。

复制代码
private Class<?> getAdaptiveExtensionClass(){
    getExtensionClasses();
    if(cachedAdaptiveClass != null){
        return cachedAdaptiveClass;
    }
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
复制代码

如上代码首先调用了步骤(5)getExtensionClasses获取了该扩展接口所有实现类的class对象,然后调用了步骤(6)createAdaptiveExtensionClass创建具体的适配器对象的Class对象,createAdaptiveExtensionClass代码如下:

private Class<?> createAdaptiveExtensionClass(){
    String code = createAdaptiveExtensionClassCode();
    ClassLoader classLoader = findClassLoader();
    com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
    return compiler.compile(code,classLoader);
}

其中createAdaptiveExtensionClassCode方法是关键,该方法根据扩展接口生成其对应的适配器类的字符串代码,这里是根据protocol接口的代码生成对应的Protocol$Adaptive的字符串代码存放到变量code中,然后默认调用JavassistCompiler的compile(code,classLoader)根据字符串代码生成适配器的Class对象并返回,然后通过getAdaptiveExtensionClass对象并返回,然后通过getAdaptiveExtensionClass().newInstance()创建适配器类的一个对象实例。至此扩展接口的适配器对象已经创建完毕。

  •  下面我们在步骤(7)前面看看步骤(5)getExtensionClasses如何加载扩展接口的所有实现类的Class对象。其内部最终调用了loadExtensionClasses方法进行加载,loadExtensionClasses代码如下:
复制代码
private Map<String,Class<?>> loadExtensionClasses(){
    // 获取扩展接口上SPI注解
    final SPI defaultAnnotation = type.getAnnotation(SPI.class);
    // 是否存在注解
    if(defaultAnnotation != null){
        String value = defaultAnnotation.value();
        if(value != null && (value = value.trim()).length() > 0){
            String[] names = NAME_SEPARATOR.split(value);
            if(name.length > 1){
                throw new IllegalStateException("more than 1 default extension name on extension " + type.getName() + ":" + Arrays.toString(names));
            }
            // 默认实现类的名称放到cachedDefaultName
            if(names.length == 1) cachedDefaultName = names[0];
        }
    }
    // 在指定目录的jar里面查找扩展点
    Map<String,Class<?>> extensionClasses = new HashMap<String,Class<?>>();
    loadFile(extensionClasses,DUBBO_INTERNAL_DIRECTORY);    // META/dubbo/internal/
    loadFile(extensionClasses,DUBBO_DIRECTORY);     // META-INF/dubbo/
    loadFile(extensionClasses,SERVICES_DIRECTORY);    // META-INF/services/
    return extensionClasses;
}
复制代码

如上代码,拿Protocol协议来说,这里SPI注解为 @SPI("dubbo") ,那么这里 cachedDefaultName 就是Dubbo。然后 META-INF/dubbo/internal/ 、 META-INF/dubbo 、 META-INF/services/ 目录下去加载具体的扩展实体类,比如Protocol协议默认实现类如下:

  • 步骤(7)injectExtension方法进行扩展点实现类相互依赖自动注入:
复制代码
private T injectExtension(T instance){
    try{
        if(objectFactory != null){
            // 遍历扩展点实现类所有的方法
            for(Method method : instance.getClass().getMethods()){
                // 当前方法public的set方法,并且只有一个入参
                if(method.getName().startsWith("set") && method.getParameterTypes().length == 1 && Modifier.isPublic(method.getModifiers())){
                    // 获取参数类型
                    Class<?> pt = method.getParameterTypes()[0];
                    try{
                        String property = method.getName().length() > 3 ? method.getName().substring(3,4).toLowerCase + method.getName().substring(4) : "";
                        // 如果是则反射调用set方法
                        if(object != null){
                            method.invoke(instance,object);
                        }
                    }catch(Exception e){
                        logger.error("fail to inject via method " + method.getName() +  " of  interface " + type.getName() + ":" + e.getMessage() , e);
                    }
                }
            }
        }  
    }catch(Exception e){
        logger.error(e.getMessage() , e);
    }
    return instance;
}
复制代码

Dubbo增强SPI中扩展点自动包装的实现原理

在Spring AOP中我们可以使用多个切面对指定类的方法进行增强,在Dubbo中也提供了类似的功能,在Dubbo中你可以指定多个Wrapper类对指定的扩展点的实现类的方法进行增强。

上节讲下面的调用代码时实际调用的是适配器 Protocol$Adaptive 的 export 方法,如果URL对象里面的 protocol 为 dubbo,那么在没有扩展点自动包装时,protocol.export返回的就是DubboProtocol的对象。

Exporter<?> exporter = protocol.export(wrapperInvoker);

而真正情况下Dubbo里面使用了ProtocolFilterWrapper、ProtocolListenerWrapper等Wrapper类对DubboProtocol对象进行了包装增强。

ProtocolFilterWrapper、ProtocolListenerWrapper、DubboProtocol三个类都有一个拷贝构造函数,这个拷贝构造函数的参数就是扩展接口Protocol,所谓包装是指下面意思:

复制代码
public class XxxProtocolWrapper implements Protocol{
    private Protocol impl;
    public XxxProtocolWrapper(Protocol protocol){
        impl = protocol;
    }
    public void export(){
        // ...在调用DubboProtocol的export前做些事情
        impl.export();
        // ...在调用DubboProtocol的export后做些事情
    }
    ...
}
复制代码

比如这里会进行两次包装,第一次可能首先使用ProtocolListenerWrapper类对DubboProtocol进行包装,这时候ProtocolListenerWrapper类里面的impl就是DubboProtocol,然后第二次使用 ProtocolFilterWrapper 对 ProtocolListenerWrapper进行包装,也就是ProtocolFilterWrapper里面的impl是ProtocolListenerWrapper,那么这时候调用适配器Protocol$Adaptive的export方法,如果URL对象里面的protocol为dubbo,那么在没有扩展点自动包装时候,这时候 protocol.export 返回的就是 ProtocolFilterWrapper的实例了。

下面我们看下Dubbo增强的SPI中如何去收集的这些包装类,以及如何实现的使用包装类对SPI实现类的自动包装。

借用上节的时序图,其实代码(5)getExtensionClasses 里面的 loadFile 方法除了加载扩展接口的所有实现类的Class对象外还对包装类(wrapper类)进行了收集,具体代码如下:

复制代码
private void loadFile(Map<String,Class<?>> extensionClasses,String dir){
    String fileName = dir + type.getName();
    try{
        ...
        if(urls != null){
            while(urls.hasMoreElements()){
                java.net.URL url = urls.nextElement();
                try{
                    BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(),"utf-8"));
                    try{
                        String line = null;
                        while((line = reader.readLine()) != null){
                            ...
                            if(line.length() > 0){
                                try{
                                    ...
                                    if(line.length() > 0){
                                        ...
                                    }else{
                                        // (I) 这里判断SPI实现类是否有扩展接口为参数的拷贝构造函数
                                        try{
                                            clazz.getConstructor(type);
                                            Set<class<?>> wrappers = cachedWrapperClasses;
                                            if(wrappers == null){
                                                cachedWrapperClasses = new ConcurrentHashset<Class<?>>();
                                                wrappers = cachedWrapperClasses;
                                            }
                                            wrappers.add(clazz);
                                        }catch(NoSuchMethodException e){
                                            ...
                                        }
                                    }
                                }catch(Throwable t){
                                    ...
                                }
                            } 
                        }    // end of while read lines
                    } finally {
                        reader.close();
                    }
                }catch(){
                    ...
                }
            } // end of while urls    
        }    
    }catch(Throwable t){
        ...
    }
}
复制代码

如上代码(I)处调用 clazz.getConstructor(type),这里是判断SPI实现类clazz是否有扩展接口 type 为参数的拷贝构造函数,如果没有直接抛异常 NoSuchMethodException,该异常被catch掉了,如果有则说明clazz类为wrapper类,则收集起来放入到 cachedWrapperClasses集合,到这里wrapper类的收集已经完毕。

而具体对扩展实现类使用收集的wrapper类进行自动包装是在 createExtension 方法里做的:

复制代码
private T createExtension(String name){
    ...
    try{
        // cachedWrapperClasses里面有元素,即为wrapper类
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if(wrapperClasses != null && wrapperClasses.size() > 0){
            // 使用循环一层层对包装类进行包装,可以参考上面讲解的使用 ProtocolFilterWrapper / ProtocolListenerWrapper对DubboProtocol进行包装的流程
            for(Class<?> wrapperClass : wrapperClasses){
                instance = injectExtension((T)wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        return instance;
    }catch(Throwable t){
        ...
    }
}
复制代码
原文地址:https://www.cnblogs.com/cnndevelop/p/12186955.html