Eureka源码解析(二)

服务注册触发路径

这个动作在spring boot的启动方法中的refreshContext中完成

SpringApplication.run() -> this.refreshContext(context); -> this.refresh(context); -> ServletWebServerApplicationContext.refresh() -> this.finishRefresh();
-> AbstractApplicationContext.finishRefresh->DefaultLifecycleProcessor.onRefresh()-> this.startBeans->this.start()->this.doStart()

finishRefresh

这个方法,从名字上可以看到它是用来体现完成刷新的操作,也就是刷新完成之后要做的后置的操作。它主要做几个事情

  • 清空缓存
  • 初始化一个LifecycleProcessor,在Spring启动的时候启动bean,在spring结束的时候销毁bean
  • 调用LifecycleProcessor的onRefresh方法,启动实现了Lifecycle接口的bean
  • 发布ContextRefreshedEvent
  • 注册MBean,通过JMX进行监控和管理
protected void finishRefresh() {
      // Clear context-level resource caches (such as ASM metadata from scanning).
      clearResourceCaches();

      // Initialize lifecycle processor for this context.
      initLifecycleProcessor();

      // Propagate refresh to lifecycle processor first.
      getLifecycleProcessor().onRefresh();

      // Publish the final event.
      publishEvent(new ContextRefreshedEvent(this));

      // Participate in LiveBeansView MBean, if active.
      LiveBeansView.registerApplicationContext(this);
}		

在这个方法中,重点关注 getLifecycleProcessor().onRefresh() ,它是调用生命周期处理器的onrefresh方法,找到SmartLifecycle接口的所有实现类并调用start方法。

后续的调用链路为:DefaultLifecycleProcessor.startBean -> start() -> doStart() -> bean.start()

SmartLifeCycle

SmartLifeCycle是一个接口,当Spring容器加载完所有的Bean并且初始化之后,会继续回调实现了SmartLifeCycle接口的类中对应的方法,比如(start)

SmartLifeCycle的理解:

@Component
public class MyLifecycleProcessor implements SmartLifecycle {


    @Override
    public void start() {
        System.out.println("start");
    }

    @Override
    public void stop() {
        System.out.println("stop");
    }

    @Override
    public boolean isRunning() {
        return false;
    }
}

启动spring boot应用后,可以看到控制台输出了 start 字符串。在DefaultLifecycleProcessor.startBeans方法上加一个debug,可以很明显的看到我们自己定义的MyLifecycleProcessor被扫描到了,并且最后会调用该bean的start方法。

在startBeans方法中,可以看到它首先会获得所有实现了SmartLifeCycle的Bean,然后会循环调用实现了SmartLifeCycle的bean的start方法,代码如下。

private void startBeans(boolean autoStartupOnly) {
      Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
      Map<Integer, LifecycleGroup> phases = new HashMap<>();
      lifecycleBeans.forEach((beanName, bean) -> {
            if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
                  int phase = getPhase(bean);
                  LifecycleGroup group = phases.get(phase);
                  if (group == null) {
                        group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
		        phases.put(phase, group);
            	  }
	          group.add(beanName, bean);
            }
      });
      if (!phases.isEmpty()) {
            List<Integer> keys = new ArrayList<>(phases.keySet());
            Collections.sort(keys);
            for (Integer key : keys) {
                  phases.get(key).start();
            }
      }
}

doStart()

private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
    Lifecycle bean = lifecycleBeans.remove(beanName);
    if (bean != null && bean != this) {
        String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName);
        for (String dependency : dependenciesForBean) {
            doStart(lifecycleBeans, dependency, autoStartupOnly);
        }
        if (!bean.isRunning() &&
            (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
            if (logger.isTraceEnabled()) {
                logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");
            }
            try {
                //此时 Bean的实例是EurekaAutoServiceRegistration
                bean.start();
            }
            catch (Throwable ex) {
                throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Successfully started bean '" + beanName + "'");
            }
        }
    }
}

此时,bean.start(),调用的是EurekaAutoServiceRegistration中的start方法,因为很显然,它实现了SmartLifeCycle接口。

public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener {

    @Override
    public void start() {
        //... 
    }
}

在start方法中,我们可以看到 this.serviceRegistry.register 这个方法,它实际上就是发起服务注册的机制。
此时this.serviceRegistry的实例,应该是 EurekaServiceRegistry , 原因是EurekaAutoServiceRegistration的构造方法中,会有一个赋值操作,而这个构造方法是在EurekaClientAutoConfiguration 这个自动装配类中被装配和初始化的,代码如下。

@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(
    value = "spring.cloud.service-registry.auto-registration.enabled",
    matchIfMissing = true)
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(
    ApplicationContext context, EurekaServiceRegistry registry,
    EurekaRegistration registration) {
    return new EurekaAutoServiceRegistration(context, registry, registration);
}

服务的注册

EurekaAutoServiceRegistration.start

public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener {

    @Override
    public void start() {
        // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
        if (this.port.get() != 0) {
            if (this.registration.getNonSecurePort() == 0) {
                this.registration.setNonSecurePort(this.port.get());
            }

            if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
                this.registration.setSecurePort(this.port.get());
            }
        }

        if (!this.running.get() && this.registration.getNonSecurePort() > 0) {

            this.serviceRegistry.register(this.registration);

            this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
            this.running.set(true);
        }
    }
}

this.serviceRegistry.register(this.registration); 方法最终会调用 EurekaServiceRegistry 类中的 register 方法来实现服务注册

EurekaServiceRegistry.register

@Override
public void register(EurekaRegistration reg) {
    maybeInitializeClient(reg);

    if (log.isInfoEnabled()) {
        log.info("Registering application "
                 + reg.getApplicationInfoManager().getInfo().getAppName()
                 + " with eureka with status "
                 + reg.getInstanceConfig().getInitialStatus());
    }
    //设置当前实例的状态,一旦这个实例的状态发生变化,只要状态不是DOWN,那么就会被监听器监听并且执行服务注册。
    reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
    //设置健康检查的处理
    reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
}

从上述代码来看,注册方法中并没有真正调用Eureka的方法去执行注册,而是仅仅设置了一个状态以及设置健康检查处理器。继续看reg.getApplicationInfoManager().setInstanceStatus方法

public synchronized void setInstanceStatus(InstanceStatus status) {
    InstanceStatus next = instanceStatusMapper.map(status);
    if (next == null) {
        return;
    }

    InstanceStatus prev = instanceInfo.setStatus(next);
    if (prev != null) {
        for (StatusChangeListener listener : listeners.values()) {
            try {
                listener.notify(new StatusChangeEvent(prev, next));
            } catch (Exception e) {
                logger.warn("failed to notify listener: {}", listener.getId(), e);
            }
        }
    }
}

在这个方法中,它会通过监听器来发布一个状态变更事件。那么此时listener的实例是StatusChangeListener ,也就是调用 StatusChangeListener 的notify方法。这个事件是触发一个服务状态变更,应该是有地方会监听这个事件,然后基于这个事件。
这个时候以为找到了方向,然后进去看,发现它是一个接口。而且我们发现它是静态的内部接口,还无法直接看到它的实现类。
猜测一定是在某个地方做了初始化的工作,于是,找到EurekaServiceRegistry.register方法中的 reg.getApplicationInfoManager 这个实例是什么,而且我们发现ApplicationInfoManager是来自于EurekaRegistration这个类中的属性。而
EurekaRegistration又是在EurekaAutoServiceRegistration这个类中实例化的。那么,是不是在自动装配中做了什么东西。于是找到EurekaClientAutoConfiguration这个类,果然看到了Bean的一些自动装配,其中包含 EurekaClient 、ApplicationInfoMangager 、 EurekaRegistration 等。

EurekaClientConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingRefreshScope
protected static class EurekaClientConfiguration {

    @Autowired
    private ApplicationContext context;

    @Autowired
    private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(value = EurekaClient.class,
                              search = SearchStrategy.CURRENT)
    public EurekaClient eurekaClient(ApplicationInfoManager manager,
                                     EurekaClientConfig config) {
        return new CloudEurekaClient(manager, config, this.optionalArgs,
                                     this.context);
    }

    @Bean
    @ConditionalOnMissingBean(value = ApplicationInfoManager.class,
                              search = SearchStrategy.CURRENT)
    public ApplicationInfoManager eurekaApplicationInfoManager(
        EurekaInstanceConfig config) {
        InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
        return new ApplicationInfoManager(config, instanceInfo);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(
        value = "spring.cloud.service-registry.auto-registration.enabled",
        matchIfMissing = true)
    public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
                                                 CloudEurekaInstanceConfig instanceConfig,
                                                 ApplicationInfoManager applicationInfoManager, @Autowired(
                                                     required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
        return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager)
            .with(eurekaClient).with(healthCheckHandler).build();
    }
}

不难发现,在这里看到了一个很重要的Bean在启动的时候做了自动装配,也就是CloudEurekaClient 。从名字上来看,可以很容易的识别并猜测出它是Eureka客户端的一个工具类,用来实现和服务端的通信以及处理。这个很多源码一贯的套路,要么在构造方法里面去做很多的初始化和一些后台执行的程序操作,要么就是通过异步事件的方式来处理。接着,我们看一下CloudEurekaClient的初始化过程,它的构造方法中会通过 super 调用父类的构造方法。也就是DiscoveryClient的构造。

CloudEurekaClient

super(applicationInfoManager, config, args);调用父类的构造方法,而CloudEurekaClient的父类是DiscoveryClient.

public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
                         ApplicationEventPublisher publisher) {
    super(applicationInfoManager, config, args);
    this.applicationInfoManager = applicationInfoManager;
    this.publisher = publisher;
    this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");
    ReflectionUtils.makeAccessible(this.eurekaTransportField);
}

DiscoveryClient

我们可以看到在最终的DiscoveryClient改造方法中,有非常长的代码。其实很多代码可以不需要关心,大部分都是一些初始化工作,比如初始化了几个定时任务

  • scheduler
  • heartbeatExecutor 心跳定时任务
  • cacheRefreshExecutor 定时去同步服务端的实例列表
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    //...
    //是否要从eureka server上获取服务地址信息
    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    //是否要注册到eureka server上
    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

    //如果不需要注册并且不需要更新服务地址
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        //...
        return;  // no need to setup up an network tasks and we are done
    }

    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

    //...
    //如果需要注册到Eureka server并且是开启了初始化的时候强制注册,则调用register()发起服务注册
    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
            if (!register() ) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    initScheduledTasks();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, this.getApplications().size());
}

DiscoveryClient.initScheduledTasks

initScheduledTasks 去启动一个定时任务。如果配置了开启从注册中心刷新服务列表,则会开启cacheRefreshExecutor这个定时任务如果开启了服务注册到Eureka,则通过需要做几个事情.

  • 建立心跳检测机制
  • 通过内部类来实例化StatusChangeListener 实例状态监控接口,这个就是前面在分析启动过程中所看到的,调用notify的方法,实际上会在这里体现.
private void initScheduledTasks() {
    //如果配置了开启从注册中心刷新服务列表,则会开启cacheRefreshExecutor这个定时任务
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        cacheRefreshTask = new TimedSupervisorTask(
            "cacheRefresh",
            scheduler,
            cacheRefreshExecutor,
            registryFetchIntervalSeconds,
            TimeUnit.SECONDS,
            expBackOffBound,
            new CacheRefreshThread()
        );
        scheduler.schedule(
            cacheRefreshTask,
            registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
	//如果开启了服务注册到Eureka,则通过需要做几个事情
    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        // Heartbeat timer
        heartbeatTask = new TimedSupervisorTask(
            "heartbeat",
            scheduler,
            heartbeatExecutor,
            renewalIntervalInSecs,
            TimeUnit.SECONDS,
            expBackOffBound,
            new HeartbeatThread()
        );
        scheduler.schedule(
            heartbeatTask,
            renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator 初始化一个:instanceInfoReplicator
        instanceInfoReplicator = new InstanceInfoReplicator(
            this,
            instanceInfo,
            clientConfig.getInstanceInfoReplicationIntervalSeconds(),
            2); // burstSize

        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                    InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };
		//注册实例状态变化的监听
        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

    //启动一个实例信息复制器,主要就是为了开启一个定时线程,每40秒判断实例信息是否变更,如果变更了则重新注册        
    instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

instanceInfoReplicator.onDemandUpdate()

这个方法的主要作用是根据实例数据是否发生变化,来触发服务注册中心的数据。

public boolean onDemandUpdate() {
    //限流判断(令牌桶算法)
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        if (!scheduler.isShutdown()) {
            //提交一个任务
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");
					//取出之前已经提交的任务,也就是在start方法中提交的更新任务,如果任务还没有执行完成,则取消之前的任务。
                    Future latestPeriodic = scheduledPeriodicRef.get();
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        //取出之前已经提交的任务,也就是在start方法中提交的更新任务,如果任务还没有执行完成,则取消之前的任务。
                        latestPeriodic.cancel(false);
                    }
					//通过调用run方法,令任务在延时后执行,相当于周期性任务中的一次
                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to stopped scheduler");
            return false;
        }
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}

InstanceInfoReplicator.this.run();

run方法实际上和前面自动装配所执行的服务注册方法是一样的,也就是调用 register 方法进行服务注册,并且在finally中,每30s会定时执行一下当前的run 方法进行检查

public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

DiscoveryClient.register

终于找到服务注册的入口了, eurekaTransport.registrationClient.register 最终调用的是 AbstractJerseyEurekaHttpClient#register(...)`, 这里使用了很多的设计模式,比如工厂模式、装饰器模式等。

boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

AbstractJerseyEurekaHttpClient#register

很显然,这里是发起了一次http请求,访问Eureka-Server的apps/${APP_NAME}接口,将当前服务实例的信息发送到Eureka Server进行保存。
至此,我们基本上已经知道Spring Cloud Eureka 是如何在启动的时候把服务信息注册到Eureka Server上的了

@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
            .header("Accept-Encoding", "gzip")
            .type(MediaType.APPLICATION_JSON_TYPE)
            .accept(MediaType.APPLICATION_JSON)
            .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                         response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

但是,似乎最开始的问题还没有解决,也就是Spring Boot应用在启动时,会调用start方法,最终调用StatusChangeListener.notify 去更新服务的一个状态,并没有直接调用register方法注册。所以继续去看 statusChangeListener.notify 方法。

服务总结

至此,我们知道Eureka Client发起服务注册时,有两个地方会执行服务注册的任务
1. 在Spring Boot启动时,由于自动装配机制将CloudEurekaClient注入到了容器,并且执行了构造方法,而在构造方法中有一个定时任务每40s会执行一次判断,判断实例信息是否发生了变化,如果是则会发起服务注册的流程
2. 在Spring Boot启动时,通过refresh方法,最终调用StatusChangeListener.notify进行服务状态变更的监听,而这个监听的方法受到事件之后会去执行服务注册。
原文地址:https://www.cnblogs.com/snail-gao/p/14131314.html