Dubbo Provider Source Code Analysis

Scanning packages for classes annotated with @DubboService

@DubboComponentScan source analysis --> registering ServiceAnnotationBeanPostProcessor

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DubboComponentScanRegistrar.class)
public @interface DubboComponentScan {
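      // The real annotation also declares value() and basePackageClasses(); they are omitted here for brevity
      String[] basePackages() default {};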
}

Because of how @Import works, Spring calls DubboComponentScanRegistrar.registerBeanDefinitions when it processes the importing configuration class.

public class DubboComponentScanRegistrar implements ImportBeanDefinitionRegistrar {
     @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {

        // Collect the set of packages to scan
        Set<String> packagesToScan = getPackagesToScan(importingClassMetadata);
        // Register ServiceAnnotationBeanPostProcessor with the container
        registerServiceAnnotationBeanPostProcessor(packagesToScan, registry);

        registerCommonBeans(registry);
    }
    
    // Registers the ServiceAnnotationBeanPostProcessor bean definition
    private void registerServiceAnnotationBeanPostProcessor(Set<String> packagesToScan, BeanDefinitionRegistry registry) {

        BeanDefinitionBuilder builder = rootBeanDefinition(ServiceAnnotationBeanPostProcessor.class);
        builder.addConstructorArgValue(packagesToScan);
        builder.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
        AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
        BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry);

    }

    
    // Resolves the packages to scan from the annotation attributes
    private Set<String> getPackagesToScan(AnnotationMetadata metadata) {
        AnnotationAttributes attributes = AnnotationAttributes.fromMap(
                metadata.getAnnotationAttributes(DubboComponentScan.class.getName()));
        String[] basePackages = attributes.getStringArray("basePackages");
        Class<?>[] basePackageClasses = attributes.getClassArray("basePackageClasses");
        String[] value = attributes.getStringArray("value");
        // Appends value array attributes
        Set<String> packagesToScan = new LinkedHashSet<String>(Arrays.asList(value));
        packagesToScan.addAll(Arrays.asList(basePackages));
        for (Class<?> basePackageClass : basePackageClasses) {
            packagesToScan.add(ClassUtils.getPackageName(basePackageClass));
        }
        if (packagesToScan.isEmpty()) {
            return Collections.singleton(ClassUtils.getPackageName(metadata.getClassName()));
        }
        return packagesToScan;
    }
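}

Summary:

  • Read the set of packages to scan from the @DubboComponentScan annotation.
  • Build a bean definition for ServiceAnnotationBeanPostProcessor, pass packagesToScan to it as a constructor argument, and register it with the container.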
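
The package resolution in getPackagesToScan can be sketched without Spring. This is a minimal illustration, not the Dubbo code: the class PackagesToScanSketch and the helper packageOf are hypothetical names, and packageOf only stands in for Spring's ClassUtils.getPackageName. It shows the merge order (value, basePackages, basePackageClasses) and the fallback to the package of the annotated class.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-alone model of the merge-and-fallback logic; not part of Dubbo.
final class PackagesToScanSketch {

    // Merges the attribute sources in the same order as getPackagesToScan:
    // value first, then basePackages, then the packages of basePackageClasses.
    static Set<String> resolve(String[] value, String[] basePackages,
                               Class<?>[] basePackageClasses, String importingClassName) {
        Set<String> packages = new LinkedHashSet<>(Arrays.asList(value));
        packages.addAll(Arrays.asList(basePackages));
        for (Class<?> basePackageClass : basePackageClasses) {
            packages.add(packageOf(basePackageClass.getName()));
        }
        // Nothing configured: fall back to the package of the class that carries the annotation.
        if (packages.isEmpty()) {
            return Collections.singleton(packageOf(importingClassName));
        }
        return packages;
    }

    // Assumption: a simplified stand-in for Spring's ClassUtils.getPackageName.
    static String packageOf(String className) {
        int lastDot = className.lastIndexOf('.');
        return lastDot < 0 ? "" : className.substring(0, lastDot);
    }

    public static void main(String[] args) {
        System.out.println(resolve(new String[0], new String[0], new Class<?>[0],
                "com.onion.ProviderApplication"));
        System.out.println(resolve(new String[]{"com.onion.service"}, new String[]{"com.onion.api"},
                new Class<?>[]{String.class}, "com.onion.ProviderApplication"));
    }
}
```

The LinkedHashSet keeps the declaration order and drops duplicates, which is why a package listed in both value and basePackages is scanned only once.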

ServiceAnnotationBeanPostProcessor source analysis

ServiceAnnotationBeanPostProcessor extends ServiceClassPostProcessor, which implements the BeanDefinitionRegistryPostProcessor interface. Spring therefore calls postProcessBeanDefinitionRegistry, and that method registers further bean definitions with the container dynamically.

public class ServiceClassPostProcessor implements BeanDefinitionRegistryPostProcessor, EnvironmentAware,
        ResourceLoaderAware, BeanClassLoaderAware {
            
 private final static List<Class<? extends Annotation>> serviceAnnotationTypes = asList(
            DubboService.class,
            Service.class,
            com.alibaba.dubbo.config.annotation.Service.class
 );
        
    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        // @since 2.7.5
       // Registers DubboBootstrapApplicationListener under the bean name dubboBootstrapApplicationListener
        registerInfrastructureBean(registry, DubboBootstrapApplicationListener.BEAN_NAME, DubboBootstrapApplicationListener.class);

        Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);

        if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
            // Register the service beans found under the scanned packages
            registerServiceBeans(resolvedPackagesToScan, registry);
        } else {
            if (logger.isWarnEnabled()) {
                logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!");
            }
        }
    }
    
            
    // Registers the service beans found under the scanned packages
   private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {

        DubboClassPathBeanDefinitionScanner scanner =
                new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);

        BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);

        scanner.setBeanNameGenerator(beanNameGenerator);

        // refactor @since 2.7.7
        serviceAnnotationTypes.forEach(annotationType -> {
            // Include classes annotated with DubboService, Service, or com.alibaba.dubbo.config.annotation.Service
            scanner.addIncludeFilter(new AnnotationTypeFilter(annotationType));
        });

        for (String packageToScan : packagesToScan) {

            // Registers @Service Bean first
            scanner.scan(packageToScan);

            // Find the classes in this package annotated with @DubboService, @Service, or @com.alibaba.dubbo.config.annotation.Service
            Set<BeanDefinitionHolder> beanDefinitionHolders =
                    findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);

            if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {

                for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
                    // Register a ServiceBean for each class found
                    registerServiceBean(beanDefinitionHolder, registry, scanner);
                }

    			...

            } 
           ...
        }

    }
            
}


 private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry,
                                     DubboClassPathBeanDefinitionScanner scanner) {
        // The annotated implementation class
        Class<?> beanClass = resolveClass(beanDefinitionHolder);
        // The service annotation found on that class
        Annotation service = findServiceAnnotation(beanClass);
        // The attributes of that annotation
        AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, false, false);
	
        Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass);

        String annotatedServiceBeanName = beanDefinitionHolder.getBeanName();

        AbstractBeanDefinition serviceBeanDefinition =
                buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName);

        // ServiceBean Bean name
        String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass);

        if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean
            registry.registerBeanDefinition(beanName, serviceBeanDefinition);
			...
        } else {
  			...
        }

    }


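    // Copies the configured annotation attributes onto a ServiceBean definition and returns it;
    // the caller (registerServiceBean) registers the definition with the container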
    private AbstractBeanDefinition buildServiceBeanDefinition(Annotation serviceAnnotation,
                                                              AnnotationAttributes serviceAnnotationAttributes,
                                                              Class<?> interfaceClass,
                                                              String annotatedServiceBeanName) {

        BeanDefinitionBuilder builder = rootBeanDefinition(ServiceBean.class);

        AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();

        MutablePropertyValues propertyValues = beanDefinition.getPropertyValues();

        String[] ignoreAttributeNames = of("provider", "monitor", "application", "module", "registry", "protocol",
                "interface", "interfaceName", "parameters");
        propertyValues.addPropertyValues(new AnnotationPropertyValuesAdapter(serviceAnnotation, environment, ignoreAttributeNames));
        // References "ref" property to annotated-@Service Bean
        addPropertyReference(builder, "ref", annotatedServiceBeanName);
        // Set interface
        builder.addPropertyValue("interface", interfaceClass.getName());
        // Convert parameters into map
        builder.addPropertyValue("parameters", convertParameters(serviceAnnotationAttributes.getStringArray("parameters")));
        
        
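        // serviceAnnotationAttributes holds the attributes of the @DubboService or @Service annotation;
        // the code below reads them and sets them on the builder.
        // serviceAnnotationAttributes.get(...) returns the value of a single annotation attribute.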
          
        // Add methods parameters
        List<MethodConfig> methodConfigs = convertMethodConfigs(serviceAnnotationAttributes.get("methods"));
        if (!methodConfigs.isEmpty()) {
            builder.addPropertyValue("methods", methodConfigs);
        }
        /**
         * Add {@link org.apache.dubbo.config.ProviderConfig} Bean reference
         */
        String providerConfigBeanName = serviceAnnotationAttributes.getString("provider");
        if (StringUtils.hasText(providerConfigBeanName)) {
            addPropertyReference(builder, "provider", providerConfigBeanName);
        }

        /**
         * Add {@link org.apache.dubbo.config.MonitorConfig} Bean reference
         */
        String monitorConfigBeanName = serviceAnnotationAttributes.getString("monitor");
        if (StringUtils.hasText(monitorConfigBeanName)) {
            addPropertyReference(builder, "monitor", monitorConfigBeanName);
        }

        /**
         * Add {@link org.apache.dubbo.config.ApplicationConfig} Bean reference
         */
        String applicationConfigBeanName = serviceAnnotationAttributes.getString("application");
        if (StringUtils.hasText(applicationConfigBeanName)) {
            addPropertyReference(builder, "application", applicationConfigBeanName);
        }
        //Add {@link org.apache.dubbo.config.ModuleConfig} Bean reference
		//Add {@link org.apache.dubbo.config.RegistryConfig} Bean reference
		//Add {@link org.apache.dubbo.config.ProtocolConfig} Bean reference
        

        return builder.getBeanDefinition();

    }
}

public abstract class BeanRegistrar {
		....

    public static boolean registerInfrastructureBean(BeanDefinitionRegistry beanDefinitionRegistry, String beanName, Class<?> beanType) {
        boolean registered = false;
        if (!beanDefinitionRegistry.containsBeanDefinition(beanName)) {
            RootBeanDefinition beanDefinition = new RootBeanDefinition(beanType);
            beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); // constant value 2
            beanDefinitionRegistry.registerBeanDefinition(beanName, beanDefinition);
            registered = true;
            if (log.isInfoEnabled()) {
                log.info("The Infrastructure bean definition [" + beanDefinition + "with name [" + beanName + "] has been registered.");
            }
        }

        return registered;
    }
 		....
}

ServiceClassPostProcessor.postProcessBeanDefinitionRegistry runs these steps:

  • registerInfrastructureBean(registry, DubboBootstrapApplicationListener.BEAN_NAME, DubboBootstrapApplicationListener.class): registers DubboBootstrapApplicationListener with the container
  • registerServiceBeans: registers the matching beans found under the scanned packages

registerServiceBeans:

  • scanner.addIncludeFilter: adds @DubboService, @Service, and @com.alibaba.dubbo.config.annotation.Service as include filters
  • findServiceBeanDefinitionHolders: finds the classes under the package that carry one of those annotations
  • registerServiceBean: registers a ServiceBean for each class found

registerServiceBean:

  • Resolves the class, its service annotation, and the annotation's attributes
  • buildServiceBeanDefinition: builder = rootBeanDefinition(ServiceBean.class) creates a BeanDefinitionBuilder for ServiceBean and sets properties on it, such as interface, parameters, provider, application, registry, and protocol. The values come from the @DubboService annotation or from the class itself. Finally, builder.getBeanDefinition() returns the BeanDefinition
  • registry.registerBeanDefinition(beanName, serviceBeanDefinition): registers that BeanDefinition with the container

In short, ServiceClassPostProcessor.postProcessBeanDefinitionRegistry does two things:

  • Registers DubboBootstrapApplicationListener with the container
  • Scans the packages for classes annotated with @DubboService, @Service, or @com.alibaba.dubbo.config.annotation.Service and registers a ServiceBean for each of them
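
The scan-filter-register flow summarized above can be modeled with plain reflection. The sketch below is only an illustration under stated assumptions: @DemoService is a hypothetical stand-in for @DubboService, the registry is a plain map rather than a BeanDefinitionRegistry, and the "ServiceBean:interface:version" name format is illustrative rather than a statement of what generateServiceBeanName produces.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "filter by annotation, then register one entry per service".
final class ServiceScanSketch {

    // Assumption: a stand-in for @DubboService with a single attribute.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface DemoService {
        String version() default "";
    }

    interface UserService {
        String hello(String name);
    }

    @DemoService(version = "1.0.0")
    static class UserServiceImpl implements UserService {
        public String hello(String name) {
            return "hello " + name;
        }
    }

    // Not annotated, so the include filter skips it.
    static class NotAService {
    }

    // Toy registry: generated bean name -> description of the referenced implementation.
    static Map<String, String> registerServiceBeans(List<Class<?>> candidates) {
        Map<String, String> registry = new LinkedHashMap<>();
        for (Class<?> beanClass : candidates) {
            DemoService service = beanClass.getAnnotation(DemoService.class);
            if (service == null) {
                continue; // plays the role of the AnnotationTypeFilter include filter
            }
            // Simplification: take the first implemented interface as the service interface.
            Class<?> interfaceClass = beanClass.getInterfaces()[0];
            String beanName = "ServiceBean:" + interfaceClass.getName()
                    + (service.version().isEmpty() ? "" : ":" + service.version());
            // putIfAbsent mirrors the duplicate check done before registering a definition.
            registry.putIfAbsent(beanName, "ref=" + beanClass.getSimpleName());
        }
        return registry;
    }

    public static void main(String[] args) {
        System.out.println(registerServiceBeans(
                Arrays.<Class<?>>asList(UserServiceImpl.class, NotAService.class)));
    }
}
```

The point of the model is that the implementation class itself is not what gets exported; a separate entry that refers to it (the ServiceBean in Dubbo) is registered under its own name.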

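Since what ends up in the container is a ServiceBean, let's look at how ServiceBean is constructed and initialized:

ServiceBean initialization

[Figure: ServiceBean class hierarchy]

The figure shows the class hierarchy of ServiceBean. One of its ancestors, AbstractConfig, has the following initialization method: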

public abstract class AbstractConfig implements Serializable {
 
	@PostConstruct
    public void addIntoConfigManager() {
        ApplicationModel.getConfigManager().addConfig(this);
    }
}
public class ConfigManager extends LifecycleAdapter implements FrameworkExt {
	    final Map<String, Map<String, AbstractConfig>> configsCache = newMap();

}

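This method adds the AbstractConfig instance to ConfigManager.configsCache. Which AbstractConfig instances end up in there?

[Figure: AbstractConfig entries held in configsCache]

The figure lists the AbstractConfig instances that are added to configsCache. The outer key of configsCache identifies the config type and the value is a map of the AbstractConfig instances of that type; the ServiceBean instances from earlier are stored under the key "service". So where are the other entries created?

As is well known, the configuration file usually contains Dubbo settings such as the following: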

dubbo:
  application:
    name: spring-boot-dubbo-provider
  protocol:
    name: dubbo
    port: -1
  registry:
    address:  nacos://127.0.0.1:8848
  scan:
    base-packages: com.onion.service
@ConfigurationProperties("dubbo")
public class DubboConfigurationProperties {
    @NestedConfigurationProperty
    private DubboConfigurationProperties.Config config = new DubboConfigurationProperties.Config();
    @NestedConfigurationProperty
    private DubboConfigurationProperties.Scan scan = new DubboConfigurationProperties.Scan();
    @NestedConfigurationProperty
    private ApplicationConfig application = new ApplicationConfig();
    @NestedConfigurationProperty
    private ModuleConfig module = new ModuleConfig();
    @NestedConfigurationProperty
    private RegistryConfig registry = new RegistryConfig();
    @NestedConfigurationProperty
    private ProtocolConfig protocol = new ProtocolConfig();
    @NestedConfigurationProperty
    private MonitorConfig monitor = new MonitorConfig();
    @NestedConfigurationProperty
    private ProviderConfig provider = new ProviderConfig();
    @NestedConfigurationProperty
    private ConsumerConfig consumer = new ConsumerConfig();
    @NestedConfigurationProperty
    private ConfigCenterBean configCenter = new ConfigCenterBean();
    @NestedConfigurationProperty
    private MetadataReportConfig metadataReport = new MetadataReportConfig();
    private Map<String, ModuleConfig> modules = new LinkedHashMap();
    private Map<String, RegistryConfig> registries = new LinkedHashMap();
    private Map<String, ProtocolConfig> protocols = new LinkedHashMap();
    private Map<String, MonitorConfig> monitors = new LinkedHashMap();
    private Map<String, ProviderConfig> providers = new LinkedHashMap();
    private Map<String, ConsumerConfig> consumers = new LinkedHashMap();
    private Map<String, ConfigCenterBean> configCenters = new LinkedHashMap();
    private Map<String, MetadataReportConfig> metadataReports = new LinkedHashMap();
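}

Following how Spring reads configuration files, I found that DubboConfigurationProperties is the class that binds the dubbo.* settings and holds the result. It creates RegistryConfig, ProtocolConfig, ProviderConfig, and the other config objects. Because these are all subclasses of AbstractConfig, their @PostConstruct method adds each of them to ConfigManager during initialization.

Keep this in mind: it matters later when we look at what ConfigManager returns. This is where the contents of ConfigManager come from.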
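
A two-level cache of the shape Map<String, Map<String, AbstractConfig>> can be sketched as follows. This is a simplified model, not ConfigManager itself: the class ConfigCacheSketch, the nested Config type, and the type/id strings are hypothetical, and the real ConfigManager derives its keys from the config object rather than taking them as constructor arguments.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of a cache keyed first by config type, then by config id.
final class ConfigCacheSketch {

    // Assumption: a minimal stand-in for AbstractConfig.
    static final class Config {
        final String type; // for example "service", "registry", "protocol"
        final String id;

        Config(String type, String id) {
            this.type = type;
            this.id = id;
        }
    }

    private final Map<String, Map<String, Config>> configsCache = new ConcurrentHashMap<>();

    // Called once per config object, as addIntoConfigManager does for each AbstractConfig.
    void addConfig(Config config) {
        configsCache
                .computeIfAbsent(config.type, type -> new ConcurrentHashMap<>())
                .putIfAbsent(config.id, config);
    }

    // Returns every config of one type, for example all services to export.
    Collection<Config> getConfigs(String type) {
        return configsCache.getOrDefault(type, Collections.<String, Config>emptyMap()).values();
    }

    public static void main(String[] args) {
        ConfigCacheSketch manager = new ConfigCacheSketch();
        manager.addConfig(new Config("application", "spring-boot-dubbo-provider"));
        manager.addConfig(new Config("service", "com.onion.service.IUserService"));
        System.out.println(manager.getConfigs("service").size());
    }
}
```

Grouping by type is what lets a later step ask for "all services" or "all registries" in a single lookup.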

DubboBootstrapApplicationListener analysis

public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener implements Ordered {
    public static final String BEAN_NAME = "dubboBootstrapApplicationListener";
    private final DubboBootstrap dubboBootstrap = DubboBootstrap.getInstance();

    public DubboBootstrapApplicationListener() {
    }

    public void onApplicationContextEvent(ApplicationContextEvent event) {
        if (event instanceof ContextRefreshedEvent) {
            this.onContextRefreshedEvent((ContextRefreshedEvent)event);
        } else if (event instanceof ContextClosedEvent) {
            this.onContextClosedEvent((ContextClosedEvent)event);
        }

    }

    private void onContextRefreshedEvent(ContextRefreshedEvent event) {
        this.dubboBootstrap.start();
    }

	...
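}

Why bring up DubboBootstrapApplicationListener now, and when was it registered with the container? It was registered in postProcessBeanDefinitionRegistry, covered in the ServiceAnnotationBeanPostProcessor section above.

DubboBootstrapApplicationListener is an application listener, so Spring calls its onApplicationContextEvent method when a context event is published. When the event is a ContextRefreshedEvent, that is, when the container has been refreshed, onContextRefreshedEvent() handles it and ends up calling this.dubboBootstrap.start().

dubboBootstrap.start() analysis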

public class DubboBootstrap extends GenericEventListener {

   	public DubboBootstrap start() {
        // Proceed only if not started yet (CAS from false to true)
        if (this.started.compareAndSet(false, true)) {
            this.ready.set(false);
            this.initialize(); // initialization
            if (this.logger.isInfoEnabled()) {
                this.logger.info(NAME + " is starting...");
            }

            this.exportServices(); // export services
            if (!this.isOnlyRegisterProvider() || this.hasExportedServices()) {
                this.exportMetadataService();
                this.registerServiceInstance();
            }

            this.referServices();   // refer to (consume) remote services
            if (this.asyncExportingFutures.size() > 0) {
                (new Thread(() -> {
                    try {
                        this.awaitFinish();
                    } catch (Exception var2) {
                        this.logger.warn(NAME + " exportAsync occurred an exception.");
                    }

                    this.ready.set(true);
                    if (this.logger.isInfoEnabled()) {
                        this.logger.info(NAME + " is ready.");
                    }

                })).start();
            } else {
                this.ready.set(true);
                if (this.logger.isInfoEnabled()) {
                    this.logger.info(NAME + " is ready.");
                }
            }

            if (this.logger.isInfoEnabled()) {
                this.logger.info(NAME + " has started.");
            }
        }

        return this;
    }

}

DubboBootstrap.start() performs the following operations:

  • this.initialize()
  • this.exportServices(); // export services
  • this.exportMetadataService();
  • this.registerServiceInstance();
  • this.referServices();
DubboBootstrap.exportServices: service export analysis

public class DubboBootstrap extends GenericEventListener {
    
    private void exportServices() {
        	// this.configManager.getServices() returns the ServiceBean instances; each one added itself
        	// on initialization via ApplicationModel.getConfigManager().addConfig(this)
            this.configManager.getServices().forEach((sc) -> {
                ServiceConfig serviceConfig = (ServiceConfig)sc;
                serviceConfig.setBootstrap(this);
                if (this.exportAsync) {
                    ExecutorService executor = this.executorRepository.getServiceExporterExecutor();
                    Future<?> future = executor.submit(() -> {
                        sc.export();
                        this.exportedServices.add(sc);
                    });
                    this.asyncExportingFutures.add(future);
                } else {
                    sc.export();
                    this.exportedServices.add(sc);
                }

            });
      }
}

ServiceConfig.export

    public class ServiceConfig<T> extends ServiceConfigBase<T> {

        public synchronized void export() {
            if (this.shouldExport()) {
                if (this.bootstrap == null) {
                    this.bootstrap = DubboBootstrap.getInstance();
                    this.bootstrap.initialize();
                }

                this.checkAndUpdateSubConfigs();
                this.serviceMetadata.setVersion(this.getVersion());
                this.serviceMetadata.setGroup(this.getGroup());
                this.serviceMetadata.setDefaultGroup(this.getGroup());
                this.serviceMetadata.setServiceType(this.getInterfaceClass());
                this.serviceMetadata.setServiceInterfaceName(this.getInterface());
                this.serviceMetadata.setTarget(this.getRef());
                if (this.shouldDelay()) {
                    DELAY_EXPORT_EXECUTOR.schedule(this::doExport, (long)this.getDelay(), TimeUnit.MILLISECONDS);
                } else {
                    this.doExport();
                }

                this.exported();
            }
        }
        
      protected synchronized void doExport() {
            if (this.unexported) {
                throw new IllegalStateException("The service " + this.interfaceClass.getName() + " has already unexported!");
            } else if (!this.exported) {
                this.exported = true;
                if (StringUtils.isEmpty(this.path)) {
                    this.path = this.interfaceName;
                }

                this.doExportUrls();
            }
    	}
     
        
        private void doExportUrls() {
            // As the name suggests, ServiceRepository is a store. Here it records the information about this ServiceBean.
            ServiceRepository repository = ApplicationModel.getServiceRepository();
            ServiceDescriptor serviceDescriptor = repository.registerService(this.getInterfaceClass());
            repository.registerProvider(this.getUniqueServiceName(), this.ref, serviceDescriptor, this, this.serviceMetadata);
            // Load the registry URLs
            List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
            Iterator var4 = this.protocols.iterator();

            while(var4.hasNext()) {
                ProtocolConfig protocolConfig = (ProtocolConfig)var4.next();
                String pathKey = URL.buildKey((String)this.getContextPath(protocolConfig).map((p) -> {
                    return p + "/" + this.path;
                }).orElse(this.path), this.group, this.version);
                repository.registerService(pathKey, this.interfaceClass);
                this.serviceMetadata.setServiceKey(pathKey);
                this.doExportUrlsFor1Protocol(protocolConfig, registryURLs);
            }

   		 }

   
    
    
    }

The ServiceRepository code:

public class ServiceRepository extends LifecycleAdapter implements FrameworkExt {

    public static final String NAME = "repository";

    // services
    private ConcurrentMap<String, ServiceDescriptor> services = new ConcurrentHashMap<>();

    // consumers
    private ConcurrentMap<String, ConsumerModel> consumers = new ConcurrentHashMap<>();

    // providers
    private ConcurrentMap<String, ProviderModel> providers = new ConcurrentHashMap<>();



    public void registerProvider(String serviceKey,
                                 Object serviceInstance,
                                 ServiceDescriptor serviceModel,
                                 ServiceConfigBase<?> serviceConfig,
                                 ServiceMetadata serviceMetadata) {
        ProviderModel providerModel = new ProviderModel(serviceKey, serviceInstance, serviceModel, serviceConfig,
                serviceMetadata);
        // Store the provider's information in providers
        providers.putIfAbsent(serviceKey, providerModel);
        providersWithoutGroup.putIfAbsent(keyWithoutGroup(serviceKey), providerModel);
    }
    
    public ServiceDescriptor registerService(Class<?> interfaceClazz) {
        return services.computeIfAbsent(interfaceClazz.getName(),
                _k -> new ServiceDescriptor(interfaceClazz));
    }
}

  • this.configManager.getServices(): returns the ServiceBean instances generated for the classes annotated with @DubboService, @Service, or @com.alibaba.dubbo.config.annotation.Service. They got into this.configManager because AbstractConfig.addIntoConfigManager is annotated with @PostConstruct, so it runs on initialization and adds each ServiceBean to the ConfigManager. See the ServiceBean initialization section
  • sc.export(): sets the corresponding values on serviceMetadata (the service metadata), then calls doExport
  • doExport -> doExportUrls:
    • repository.registerService(this.getInterfaceClass()): builds the provider's ServiceDescriptor and stores it in ServiceRepository.services
    • repository.registerProvider: builds a ProviderModel and stores it in ServiceRepository.providers
    • ConfigValidationUtils.loadRegistries(this, true): loads the list of registry URLs
    • this.doExportUrlsFor1Protocol(protocolConfig, registryURLs): exports the service
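
Both registration methods in ServiceRepository are "first write wins": registerService uses computeIfAbsent and registerProvider uses putIfAbsent. The sketch below shows what that means in practice. It is a simplified model with hypothetical names (RepositorySketch, string descriptors instead of ServiceDescriptor and ProviderModel), not the Dubbo class.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of the two maps used during export.
final class RepositorySketch {

    // interface name -> descriptor (a plain string here, ServiceDescriptor in Dubbo)
    private final ConcurrentMap<String, String> services = new ConcurrentHashMap<>();
    // service key -> provider instance (ProviderModel in Dubbo)
    private final ConcurrentMap<String, Object> providers = new ConcurrentHashMap<>();

    // Creates the descriptor on first use and returns the cached one afterwards.
    String registerService(Class<?> interfaceClass) {
        return services.computeIfAbsent(interfaceClass.getName(), name -> "descriptor:" + name);
    }

    // Keeps the first provider registered under a key; later registrations are ignored.
    void registerProvider(String serviceKey, Object serviceInstance) {
        providers.putIfAbsent(serviceKey, serviceInstance);
    }

    Object lookupProvider(String serviceKey) {
        return providers.get(serviceKey);
    }

    public static void main(String[] args) {
        RepositorySketch repository = new RepositorySketch();
        System.out.println(repository.registerService(Runnable.class));
        repository.registerProvider("demo/java.lang.Runnable:1.0.0", "first");
        repository.registerProvider("demo/java.lang.Runnable:1.0.0", "second");
        System.out.println(repository.lookupProvider("demo/java.lang.Runnable:1.0.0"));
    }
}
```

A consequence of this design is that exporting the same service key twice does not replace the provider that was registered first.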

loadRegistries: loading the registries

    public class ConfigValidationUtils {

        public static List<URL> loadRegistries(AbstractInterfaceConfig interfaceConfig, boolean provider) {
            // check && override if necessary
            List<URL> registryList = new ArrayList<URL>();
            ApplicationConfig application = interfaceConfig.getApplication();
            List<RegistryConfig> registries = interfaceConfig.getRegistries();
            if (CollectionUtils.isNotEmpty(registries)) {
                // Iterate over every registry configuration (<registry>)
                for (RegistryConfig config : registries) {
                    String address = config.getAddress();
                    if (StringUtils.isEmpty(address)) {
                        address = ANYHOST_VALUE;
                    }
                    if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                        Map<String, String> map = new HashMap<String, String>();
                        AbstractConfig.appendParameters(map, application);
                        AbstractConfig.appendParameters(map, config);
                        map.put(PATH_KEY, RegistryService.class.getName());
                        AbstractInterfaceConfig.appendRuntimeParameters(map);
                        if (!map.containsKey(PROTOCOL_KEY)) {
                            map.put(PROTOCOL_KEY, DUBBO_PROTOCOL);
                        }
                        // Convert the configured address into URLs
                        List<URL> urls = UrlUtils.parseURLs(address, map);

                        for (URL url : urls) {

                            // REGISTRY_KEY = "registry"
                            // 1. Add the parameter registry=<original protocol name> to the url
                            // 2. Replace the url's protocol:
                            //    with registry-type=service -> service-discovery-registry
                            //    otherwise -> registry
                            url = URLBuilder.from(url)
                                    .addParameter(REGISTRY_KEY, url.getProtocol())
                                    .setProtocol(extractRegistryType(url))
                                    .build();
                            if ((provider && url.getParameter(REGISTER_KEY, true))
                                    || (!provider && url.getParameter(SUBSCRIBE_KEY, true))) {
                                registryList.add(url);
                            }
                        }
                    }
                }
            }
            return registryList;
        }
        
        private static String extractRegistryType(URL url) {
            //SERVICE_REGISTRY_TYPE : "service" ; 
            //SERVICE_REGISTRY_PROTOCOL : "service-discovery-registry";
        	return isServiceDiscoveryRegistryType(url) ? SERVICE_REGISTRY_PROTOCOL : REGISTRY_PROTOCOL;
        }

        public static boolean isServiceDiscoveryRegistryType(URL url) {
            return isServiceDiscoveryRegistryType(url == null ? emptyMap() : url.getParameters());
        }
   }
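
  • Iterate over every registry configuration (<registry>) and read its settings

  • Convert the address attribute of each <registry> into URLs

  • Rewrite each URL with URLBuilder.from(url).addParameter(REGISTRY_KEY, url.getProtocol()).setProtocol(extractRegistryType(url)).build():

    • Add the parameter registry=<protocol name> to the url

      For example, nacos://127.0.0.1:8848 becomes nacos://127.0.0.1:8848?registry=nacos

    • Replace the url's protocol

      With the parameter registry-type=service the protocol becomes service-discovery-registry; otherwise it becomes registry

      For example, nacos://127.0.0.1:8848?registry=nacos ends up as registry://127.0.0.1:8848?registry=nacos

This URL rewrite matters a lot, because later code picks the implementation class by protocol name. If you miss that the protocol has been replaced here, you will follow the wrong code path.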
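
The rewrite can be reproduced with plain string handling. The sketch below is an illustration only: RegistryUrlSketch and toRegistryUrl are hypothetical names, the serviceDiscovery flag stands in for the registry-type=service check, and the real URL also carries a path and many more parameters that are left out here.

```java
// Hypothetical model of "remember the original protocol, then swap the protocol".
final class RegistryUrlSketch {

    static String toRegistryUrl(String address, boolean serviceDiscovery) {
        int separator = address.indexOf("://");
        if (separator < 0) {
            throw new IllegalArgumentException("Missing protocol in address: " + address);
        }
        String originalProtocol = address.substring(0, separator);
        String rest = address.substring(separator + 3);

        // Step 2 of the rewrite: the protocol depends on the registry type.
        String newProtocol = serviceDiscovery ? "service-discovery-registry" : "registry";
        // Step 1 of the rewrite: keep the original protocol as the "registry" parameter.
        String joiner = rest.contains("?") ? "&" : "?";
        return newProtocol + "://" + rest + joiner + "registry=" + originalProtocol;
    }

    public static void main(String[] args) {
        System.out.println(toRegistryUrl("nacos://127.0.0.1:8848", false));
        System.out.println(toRegistryUrl("nacos://127.0.0.1:8848", true));
    }
}
```

Nothing is lost by the swap: the original protocol name travels along as a parameter, so the registry implementation can still be chosen later.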

doExportUrlsFor1Protocol: exporting the service

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (StringUtils.isEmpty(name)) {
            name = DUBBO;
        }

        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, PROVIDER_SIDE);

        // Read the relevant parameters and put them into map
        // For example, with <dubbo:application name="spring-boot-dubbo-provider"/>
        // map contains the key "application" with the value "spring-boot-dubbo-provider"
        ServiceConfig.appendRuntimeParameters(map);
        AbstractConfig.appendParameters(map, getMetrics());
        AbstractConfig.appendParameters(map, getApplication());
        AbstractConfig.appendParameters(map, getModule());
        // remove 'default.' prefix for configs from ProviderConfig
        // appendParameters(map, provider, Constants.DEFAULT_KEY);
        AbstractConfig.appendParameters(map, provider);
        AbstractConfig.appendParameters(map, protocolConfig);
        AbstractConfig.appendParameters(map, this);
        MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
        if (metadataReportConfig != null && metadataReportConfig.isValid()) {
            map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
        }
        // Read the @Method settings of @DubboService
        if (CollectionUtils.isNotEmpty(getMethods())) {
            for (MethodConfig method : getMethods()) {
                // ... iterate over the @Method settings and copy them into map ...
                // Read the @Argument settings of @Method
                List<ArgumentConfig> arguments = method.getArguments();
                if (CollectionUtils.isNotEmpty(arguments)) {
                    for (ArgumentConfig argument : arguments) {
                        // ... iterate over the @Argument settings and copy them into map ...
                    }
                }
            } // end of methods for
        }

        // ... further parameters are put into map ...
        //init serviceMetadata attachments
        serviceMetadata.getAttachments().putAll(map);

        // export service
        // Resolve the host IP
        String host = findConfigedHosts(protocolConfig, registryURLs, map);
        // Resolve the port
        Integer port = findConfigedPorts(protocolConfig, name, map);
        // Build the URL to export from these parts
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

        // You can customize Configurator to append extra parameters
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        String scope = url.getParameter("scope");
        // don't export when none is configured 
        if (!"none".equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!"remote".equalsIgnoreCase(scope)) {
                exportLocal(url);  // export the service locally (injvm)
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!"local".equalsIgnoreCase(scope)) {
                // Two cases, depending on whether registry URLs are present: with a registry and without one
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        //if protocol is only injvm ,not register
                        if ("injvm".equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        // ... set the proxy and monitor parameters if needed ...

                        // Build an invoker
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded("export", url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
 					// ... the no-registry case; essentially the same as the registry case ...
                }
                /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
                WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
                if (metadataService != null) {
                    metadataService.publishServiceDefinition(url);
                }
            }
        }
        this.urls.add(url);
    }

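Building the parameter map:

  • AbstractConfig.appendParameters: reads the dubbo settings from the configuration file together with the @DubboService attributes and puts them into the map
  • Reads the @Method configuration declared inside @DubboService
  • Reads the @Argument configuration declared inside @Method

The resulting map looks like this:

[Image: image-20201107112845978]

  • findConfigedHosts: resolves the host IP

  • findConfigedPorts: resolves the host port

  • A URL is built from the map, the host IP and the port. It looks roughly like this:

    dubbo://192.168.30.1:20880/com.onion.service.IUserService?...parameters

  • exportLocal(url): exports the service locally.

  • Remote export: there are two cases, (1) with a registry and (2) without a registry. Only the registry case is analyzed here.

    • Build the invoker: PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded("export", url.toFullString()));

      ref is the implementation instance and interfaceClass is the service interface. The provider URL is attached to the registry URL as the export parameter.

      The resulting address is roughly as follows (shown decoded; the export value is URL-encoded in practice): registry://127.0.0.1:8848?export=dubbo://127.0.0.1:20880&registry=nacos

    • Wrap the invoker, then export it with PROTOCOL.export(wrapperInvoker)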
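The reason the provider URL has to be encoded before it becomes the export parameter is that it carries its own "?" and "&" characters, which would otherwise be mixed up with the registry URL's parameters. The following is a minimal sketch of that idea using only JDK string handling. It is not Dubbo's URL class; the class name ExportParamSketch, both method names and the anyhost=true parameter are hypothetical and chosen only for illustration.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Illustrative only: plain JDK string handling, not Dubbo's URL implementation.
final class ExportParamSketch {

    // Imitates the idea behind addParameterAndEncoded("export", providerUrl):
    // the provider URL is URL-encoded so its own '?' and '&' stay inside one value.
    static String withExport(String registryUrl, String providerUrl) {
        String encoded = URLEncoder.encode(providerUrl, StandardCharsets.UTF_8);
        String separator = registryUrl.contains("?") ? "&" : "?";
        return registryUrl + separator + "export=" + encoded;
    }

    // Imitates the idea behind getParameterAndDecoded("export"):
    // find the export pair in the query string and decode its value.
    static String readExport(String url) {
        int queryStart = url.indexOf('?');
        if (queryStart < 0) {
            return null;
        }
        for (String pair : url.substring(queryStart + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && pair.substring(0, eq).equals("export")) {
                return URLDecoder.decode(pair.substring(eq + 1), StandardCharsets.UTF_8);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // anyhost=true is a made-up parameter for the example.
        String provider = "dubbo://192.168.30.1:20880/com.onion.service.IUserService?anyhost=true";
        String registry = withExport("registry://127.0.0.1:8848?registry=nacos", provider);
        System.out.println(registry);
        System.out.println(readExport(registry));
    }
}
```

Reading the value back with readExport returns the original provider URL unchanged, which is the property RegistryProtocol relies on later when it extracts the export parameter.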

exportLocal: exporting the service locally

public class ServiceConfig<T> extends ServiceConfigBase<T> {
    
    private void exportLocal(URL url) {
            URL local = URLBuilder.from(url)
                    .setProtocol(LOCAL_PROTOCOL)
                    .setHost(LOCALHOST_VALUE)
                    .setPort(0)
                    .build();
            Exporter<?> exporter = PROTOCOL.export(
                    PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
        }
}
public class InjvmProtocol extends AbstractProtocol implements Protocol {   
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }
}
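  • From the URL passed in, a new URL is built with protocol injvm, host 127.0.0.1 and port 0
  • As before, PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local) builds an invoker
    • PROTOCOL is obtained through the adaptive extension point, so the call goes through Protocol$Adaptive, which selects the implementation by the protocol name of the URL and ends up calling InjvmProtocol
    • InjvmProtocol.export: creates a new InjvmExporter and returns it
  • The returned exporter is added to exporters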
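The dispatch by protocol name can be pictured with a small stand-in. This is a minimal sketch under stated assumptions: SketchProtocol and AdaptiveProtocolSketch are hypothetical types, URLs are plain strings, and exporters are represented by strings. Dubbo's real Protocol$Adaptive is generated code that looks the extension up through ExtensionLoader rather than a hand-written map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a protocol extension; not Dubbo's Protocol interface.
interface SketchProtocol {
    String export(String url);
}

// Plays the role of the adaptive class: it owns no export logic of its own,
// it only picks the implementation named by the URL's protocol.
final class AdaptiveProtocolSketch implements SketchProtocol {

    private final Map<String, SketchProtocol> extensions = new HashMap<>();

    AdaptiveProtocolSketch() {
        extensions.put("injvm", url -> "InjvmExporter(" + url + ")");
        extensions.put("dubbo", url -> "DubboExporter(" + url + ")");
    }

    @Override
    public String export(String url) {
        int idx = url.indexOf("://");
        if (idx < 0) {
            throw new IllegalArgumentException("no protocol in url: " + url);
        }
        String name = url.substring(0, idx);
        SketchProtocol target = extensions.get(name);
        if (target == null) {
            throw new IllegalStateException("no extension named " + name);
        }
        return target.export(url);
    }
}
```

With this shape, the same call site can export an injvm:// URL and a dubbo:// URL and reach different implementations, which is why exportLocal only needs to change the URL's protocol.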

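RegistryProtocol.export: exporting the remote service

Protocol$Adaptive dispatches to the implementation that matches the protocol of the URL. The URL passed in is registry://127.0.0.1:8848?export=dubbo://127.0.0.1:20880&registry=nacos, so the call ends up in RegistryProtocol.export.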

public class RegistryProtocol implements Protocol {
	@Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // get the registry address
        URL registryUrl = getRegistryUrl(originInvoker);  //nacos://127.0.0.1:8848
        // url to export locally
        // get the address the service is exported on
        URL providerUrl = getProviderUrl(originInvoker);   //dubbo://127.0.0.1:20880

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

        // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            // register with the registry
            register(registryUrl, registeredProviderUrl);
        }

        // register stated url on provider model
        registerStatedUrl(registryUrl, registeredProviderUrl, register);


        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        notifyExport(exporter);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }
    
    // get the registry address
    protected URL getRegistryUrl(URL url) {
        // input:  registry://127.0.0.1:8848?export=dubbo://127.0.0.1:20880&registry=nacos
        // output: nacos://127.0.0.1:8848?export=dubbo://127.0.0.1:20880
        // replace the protocol with the concrete registry type
        return URLBuilder.from(url)
                .setProtocol(url.getParameter("registry", "dubbo"))
                .removeParameter("registry")
                .build();
    }
    
    // get the provider address
    private URL getProviderUrl(final Invoker<?> originInvoker) {
        // input: registry://127.0.0.1:8848?export=dubbo://127.0.0.1:20880&registry=nacos
        // reads the value of export and returns it as a URL: dubbo://127.0.0.1:20880
        String export = originInvoker.getUrl().getParameterAndDecoded("export");
        if (export == null || export.length() == 0) {
            throw new IllegalArgumentException("The registry export url is null! registry: " + originInvoker.getUrl());
        }
        return URL.valueOf(export);
    }
    
    // export the service with the provider's own protocol
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }


}
  • getRegistryUrl: gets the registry address

    Input: registry://127.0.0.1:8848?export=dubbo://127.0.0.1:20880&registry=nacos

    Output: nacos://127.0.0.1:8848?export=dubbo://127.0.0.1:20880

  • getProviderUrl: gets the address the service is exported on

    Input: registry://127.0.0.1:8848?export=dubbo://127.0.0.1:20880&registry=nacos

    Output: the value of the export parameter as a URL, dubbo://127.0.0.1:20880

  • doLocalExport(originInvoker, providerUrl): exports the service

    • new InvokerDelegate<>(originInvoker, providerUrl): builds a new invoker from the original invoker and the provider URL
    • protocol.export(invokerDelegate): this is actually Protocol$Adaptive.export, and because providerUrl uses the dubbo protocol, the call ends up in DubboProtocol.export
  • register(registryUrl, registeredProviderUrl): registers the provider with the registry
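doLocalExport wraps the export in bounds.computeIfAbsent, so the same cache key is exported only once even if export is invoked again. The following is a minimal sketch of that caching behaviour, assuming string stand-ins for the invoker and exporter; ExportCacheSketch and its exportCalls counter are hypothetical and exist only to make the effect observable.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative stand-in for the bounds cache in RegistryProtocol; not Dubbo code.
final class ExportCacheSketch {

    private final ConcurrentMap<String, String> bounds = new ConcurrentHashMap<>();

    // Counts how many times the (expensive) export actually ran.
    final AtomicInteger exportCalls = new AtomicInteger();

    String doLocalExport(String cacheKey, String providerUrl) {
        // The mapping function runs only when the key is absent, so a second
        // call with the same key returns the cached exporter without re-exporting.
        return bounds.computeIfAbsent(cacheKey, key -> {
            exportCalls.incrementAndGet();
            return "exporter-for-" + providerUrl;
        });
    }
}
```

Calling doLocalExport twice with the same key returns the same value and leaves exportCalls at 1, which mirrors why a provider does not open a second server for a service it has already exported.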

DubboProtocol.export: exporting the service

public class DubboProtocol extends AbstractProtocol {
	   
    
    protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();

	
	@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        // the resulting key is com.onion.service.IUserService:20880, i.e. interface:port
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        ...

        // start the dubbo server listener
        openServer(url);
        // serialization optimization
        optimizeSerialization(url);

        return exporter;
    }
    
      private void openServer(URL url) {
        // find server. The key is ip:port
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter("isserver", true);
        // check the cache for an existing dubbo server: reset it if present, otherwise create one with createServer
        if (isServer) {
            ProtocolServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }
    
   private ProtocolServer createServer(URL url) {
        url = URLBuilder.from(url)
                // send readonly event when server closes, it's enabled by default
                .addParameterIfAbsent("channel.readonly.sent", Boolean.TRUE.toString())
                // enable heartbeat by default
                .addParameterIfAbsent("heartbeat", String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter("codec", DubboCodec.NAME)
                .build();
        // which Transporter type to use; defaults to netty when none is configured
        String str = url.getParameter("server", "netty");

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
		...

        return new DubboProtocolServer(server);
    }

}

Exchangers:

public class Exchangers {

     public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        url = url.addParameterIfAbsent("codec", "exchange");
        // HeaderExchanger is used by default
        return getExchanger(url).bind(url, handler);
    }
    
      public static Exchanger getExchanger(URL url) {
        // HeaderExchanger is used by default
        String type = url.getParameter("exchanger", "header");
        return getExchanger(type);
    }
}

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}



public class Transporters {

    static {
        // check duplicate jar package
        Version.checkDuplicate(Transporters.class);
        Version.checkDuplicate(RemotingException.class);
    }

    public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
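        // the adaptive extension point means Transporter$Adaptive is used here; with no server/transporter parameter on the URL it defaults to netty, the same default read in createServer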
        return getTransporter().bind(url, handler);
    }
    
   public static Transporter getTransporter() {
     
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }

}
public class Transporter$Adaptive implements org.apache.dubbo.remoting.Transporter {
    public org.apache.dubbo.remoting.Client connect(org.apache.dubbo.common.URL arg0, org.apache.dubbo.remoting.ChannelHandler arg1) throws org.apache.dubbo.remoting.RemotingException {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg0;
        String extName = url.getParameter("client", url.getParameter("transporter", "netty"));
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.remoting.Transporter) name from url (" + url.toString() + ") use keys([client, transporter])");
        org.apache.dubbo.remoting.Transporter extension = (org.apache.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(org.apache.dubbo.remoting.Transporter.class).getExtension(extName);
        return extension.connect(arg0, arg1);
    }
    public org.apache.dubbo.remoting.RemotingServer bind(org.apache.dubbo.common.URL arg0, org.apache.dubbo.remoting.ChannelHandler arg1) throws org.apache.dubbo.remoting.RemotingException {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg0;
        String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.remoting.Transporter) name from url (" + url.toString() + ") use keys([server, transporter])");
        org.apache.dubbo.remoting.Transporter extension = (org.apache.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(org.apache.dubbo.remoting.Transporter.class).getExtension(extName);
        return extension.bind(arg0, arg1);
    }
}

How DubboProtocol.export runs:

  • openServer(url): starts the server listener.
  • optimizeSerialization: serialization optimization

DubboProtocol.openServer(url): looks up the server cache by the key ip:port. If there is no entry, createServer is called to create the server; if there is one, it is reset with server.reset(url).

DubboProtocol.createServer: calls Exchangers.bind(url, requestHandler);

Exchangers.bind: getExchanger(url) returns HeaderExchanger

HeaderExchanger.bind -> Transporters.bind: getTransporter() returns the adaptive extension of Transporter, so the call goes to Transporter$Adaptive.bind. Transporter$Adaptive.bind uses NettyTransporter by default, so the final call is NettyTransporter.bind
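The lookup in openServer follows the double-checked locking idiom: a lock-free read first, then a second check under the lock before creating the server. The following is a minimal sketch of that control flow, assuming a hypothetical FakeServer type and counters in place of a real ProtocolServer. It copies only the branching of openServer, not any networking.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for DubboProtocol's serverMap handling; not Dubbo code.
final class ServerCacheSketch {

    // Hypothetical server type that only records how often it was reset.
    static final class FakeServer {
        int resets;

        void reset() {
            resets++;
        }
    }

    private final Map<String, FakeServer> serverMap = new ConcurrentHashMap<>();

    // Number of servers actually created; updated only while holding the lock.
    int created;

    FakeServer openServer(String address) {
        FakeServer server = serverMap.get(address);
        if (server == null) {
            synchronized (this) {
                // Second check: another thread may have created it meanwhile.
                server = serverMap.get(address);
                if (server == null) {
                    server = new FakeServer();
                    created++;
                    serverMap.put(address, server);
                }
            }
        } else {
            // An existing server is reconfigured rather than recreated.
            server.reset();
        }
        return server;
    }
}
```

Because the key is the address (ip:port), several services exported on the same port share one server, and only the first export pays the cost of binding.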

NettyTransporter.bind: listening for connections

package org.apache.dubbo.remoting.transport.netty4;
public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyServer(url, handler);
    }

    @Override
    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyClient(url, handler);
    }

}

public class NettyServer extends AbstractServer implements RemotingServer {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    /**
     * the cache for alive worker channel.
     * <ip:port, dubbo channel>
     */
    private Map<String, Channel> channels;
    /**
     * netty server bootstrap.
     */
    private ServerBootstrap bootstrap;
    /**
     * the boss channel that receive connections and dispatch these to worker channel.
     */
	private io.netty.channel.Channel channel;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
 
        super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
    }

    /**
     * Init and start netty server
     *
     * @throws Throwable
     */
    @Override
    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        // Netty setup; I have not studied Netty, so it is not discussed here
        bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                "NettyServerWorker");

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NettyEventLoopFactory.serverSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                            ch.pipeline().addLast("negotiation",
                                    SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                        }
                        ch.pipeline()
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }
}
public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer {

    protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
    private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
    ExecutorService executor;
    private InetSocketAddress localAddress;
    private InetSocketAddress bindAddress;
    private int accepts;
    private int idleTimeout;

    private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
	
        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = ANYHOST_VALUE;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
        try {
            // calls the subclass's doOpen, i.e. NettyServer.doOpen
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        executor = executorRepository.createExecutorIfAbsent(url);
    }
 }

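As shown above, NettyTransporter.bind is simply new NettyServer(url, handler), which creates a NettyServer. Note that AbstractServer is the parent class of NettyServer.

The AbstractServer constructor resolves the IP and port to bind, then calls the subclass's doOpen, which is NettyServer.doOpen.

NettyServer.doOpen: this is Netty setup code; I have not studied Netty, so it is not discussed here.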
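The relationship between AbstractServer and NettyServer is a template method invoked from the parent constructor: the parent prepares the bind address, then hands control to the subclass hook. The following is a minimal sketch of that structure, assuming hypothetical SketchAbstractServer and SketchNettyServer classes with no real networking.

```java
// Illustrative stand-ins for AbstractServer / NettyServer; not Dubbo code.
abstract class SketchAbstractServer {

    private final String bindAddress;

    SketchAbstractServer(String host, int port) {
        // The parent resolves the address first...
        this.bindAddress = host + ":" + port;
        // ...then calls the hook implemented by the subclass.
        doOpen();
    }

    String getBindAddress() {
        return bindAddress;
    }

    protected abstract void doOpen();
}

final class SketchNettyServer extends SketchAbstractServer {

    // Deliberately has no initializer: a field initializer would run after
    // super(...) returns and overwrite the value assigned inside doOpen().
    private String boundTo;

    SketchNettyServer(String host, int port) {
        super(host, port);
    }

    @Override
    protected void doOpen() {
        // A real server would start listening here; the sketch only records the address.
        boundTo = getBindAddress();
    }

    String boundTo() {
        return boundTo;
    }
}
```

This also explains why the fields in the NettyServer listing above (bootstrap, bossGroup, workerGroup, channels) are declared without initializers and assigned inside doOpen: the hook runs before the subclass's own field initializers would.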

RegistryProtocol.register: registering with the registry

public class RegistryProtocol implements Protocol {
    
    private void register(URL registryUrl, URL registeredProviderUrl) {
        // get the Registry implementation for this registry; registryFactory is resolved through Spring and Dubbo SPI, as covered in my earlier article on Dubbo SPI
        Registry registry = registryFactory.getRegistry(registryUrl);
        
        registry.register(registeredProviderUrl);
    }
}
public class NacosRegistry extends FailbackRegistry {
    @Override
    public void doRegister(URL url) {
        final String serviceName = getServiceName(url);
        final Instance instance = createInstance(url);
        /**
         *  namingService.registerInstance with {@link org.apache.dubbo.registry.support.AbstractRegistry#registryUrl}
         *  default {@link DEFAULT_GROUP}
         *
         * in https://github.com/apache/dubbo/issues/5978
         */
        execute(namingService -> namingService.registerInstance(serviceName,
                getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP), instance));
    }

}
public abstract class FailbackRegistry extends AbstractRegistry {
    public void register(URL url) {
        if (!this.acceptable(url)) {
            this.logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
        } else {
            super.register(url);
            this.removeFailedRegistered(url);
            this.removeFailedUnregistered(url);

            try {
                this.doRegister(url);
            } catch (Exception var6) {
                Throwable t = var6;
                boolean check = this.getUrl().getParameter("check", true) && url.getParameter("check", true) && !"consumer".equals(url.getProtocol());
                boolean skipFailback = var6 instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = var6.getCause();
                    }

                    throw new IllegalStateException("Failed to register " + url + " to registry " + this.getUrl().getAddress() + ", cause: " + ((Throwable)t).getMessage(), (Throwable)t);
                }

                this.logger.error("Failed to register " + url + ", waiting for retry, cause: " + var6.getMessage(), var6);
                this.addFailedRegistered(url);
            }

        }
    }
}
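  • registryFactory.getRegistry(registryUrl): gets the Registry implementation for the configured registry. registryFactory is resolved through Spring and Dubbo SPI, as covered in my earlier article on Dubbo SPI
  • Nacos is used here, so the implementation is NacosRegistry. Because NacosRegistry extends FailbackRegistry, the call chain is FailbackRegistry.register -> NacosRegistry.doRegister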
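The register method shown above has a failback shape: try doRegister, fail fast when check is enabled, and otherwise remember the URL so it can be retried. The following is a minimal sketch of that shape. FailbackSketch, its Consumer-based doRegister and the explicit retry() method are hypothetical simplifications; how and when the real FailbackRegistry schedules retries is not shown in the listing above and is not modelled here.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Illustrative stand-in for the failback idea in FailbackRegistry; not Dubbo code.
final class FailbackSketch {

    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    private final Consumer<String> doRegister;
    private final boolean check;

    FailbackSketch(Consumer<String> doRegister, boolean check) {
        this.doRegister = doRegister;
        this.check = check;
    }

    void register(String url) {
        // A fresh attempt supersedes any earlier failure record for this URL.
        failedRegistered.remove(url);
        try {
            doRegister.accept(url);
        } catch (RuntimeException e) {
            if (check) {
                // With check enabled the failure is surfaced immediately.
                throw new IllegalStateException("Failed to register " + url, e);
            }
            // Otherwise the URL is kept so that it can be retried later.
            failedRegistered.add(url);
        }
    }

    // One retry pass over the recorded failures (invoked manually in this sketch).
    void retry() {
        for (String url : failedRegistered) {
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException ignored) {
                // Still failing: keep the URL for the next pass.
            }
        }
    }

    Set<String> failed() {
        return failedRegistered;
    }
}
```

The trade-off is the one visible in the original code: with check=true a registry outage stops the provider from starting, while with check=false the provider starts and registration catches up once the registry is reachable.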

That is all for this article for now.

  • this.exportMetadataService();
  • this.registerServiceInstance();
  • this.referServices();

These remaining calls are not analyzed for now.

A high-resolution flowchart is attached:

https://gitee.com/gzgyc/blogimage/raw/master/dubbo提供者流程图.png

Original article: https://www.cnblogs.com/dabenxiang/p/13941854.html