Eureka Server Startup: Source Code Analysis

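I. Introduction

  When building microservices with Spring Cloud we usually work on top of Spring Boot, and a Eureka server is enabled by adding the @EnableEurekaServer annotation. This article uses that annotation as the starting point for reading the Eureka server source code.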

II. @EnableEurekaServer

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}

The annotation imports a single configuration class, EurekaServerMarkerConfiguration:

@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {

    @Bean
    public Marker eurekaServerMarkerBean() {
        return new Marker();
    }

    class Marker {

    }

}

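All it does is register an empty Marker bean. The bean is not null, but it has no fields or methods: only its presence in the context matters.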

III. Auto-configuration

  The annotation by itself reveals little, so next look at Spring Boot's auto-configuration file, spring.factories:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

Spring Boot loads this configuration class automatically at startup:

@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
        InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
}
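
The @ConditionalOnBean line in the annotations above can be pictured as a presence check against the bean registry. The following is a dependency-free sketch of that idea only; MiniContext and applyServerAutoConfiguration are hypothetical names invented for illustration and say nothing about how Spring evaluates conditions internally.

```java
import java.util.HashMap;
import java.util.Map;

public class MarkerGateSketch {

    /** Empty marker type: it carries no state, only its presence matters. */
    static final class Marker {
    }

    /** Hypothetical, minimal stand-in for a bean registry keyed by type. */
    static final class MiniContext {
        private final Map<Class<?>, Object> beans = new HashMap<>();

        <T> void register(Class<T> type, T bean) {
            beans.put(type, bean);
        }

        boolean hasBean(Class<?> type) {
            return beans.containsKey(type);
        }
    }

    /** Applies the (pretend) server configuration only if a Marker bean is present. */
    static boolean applyServerAutoConfiguration(MiniContext context) {
        if (!context.hasBean(Marker.class)) {
            return false; // condition not met: the configuration is skipped
        }
        // Placeholder for the beans the real auto-configuration would contribute.
        context.register(String.class, "server beans would be registered here");
        return true;
    }

    public static void main(String[] args) {
        MiniContext withoutMarker = new MiniContext();
        System.out.println("applied without marker: "
                + applyServerAutoConfiguration(withoutMarker));

        MiniContext withMarker = new MiniContext();
        withMarker.register(Marker.class, new Marker()); // the role @EnableEurekaServer plays
        System.out.println("applied with marker: "
                + applyServerAutoConfiguration(withMarker));
    }
}
```

The point of the pattern is that the starter can sit on the classpath without doing anything until the annotation contributes the marker.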

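Note the @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) condition: this auto-configuration takes effect only when a bean of type Marker exists in the context, which is exactly what @EnableEurekaServer provides. The class also imports another configuration class, EurekaServerInitializerConfiguration: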

@Configuration(proxyBeanMethods = false)
public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {

    private static final Log log = LogFactory
            .getLog(EurekaServerInitializerConfiguration.class);

    @Autowired
    private EurekaServerConfig eurekaServerConfig;

    private ServletContext servletContext;

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private EurekaServerBootstrap eurekaServerBootstrap;

    private boolean running;

    private int order = 1;

    @Override
    public void setServletContext(ServletContext servletContext) {
        this.servletContext = servletContext;
    }

    @Override
    public void start() {
        new Thread(() -> {
            try {
                // Start the Eureka server
                eurekaServerBootstrap.contextInitialized(
                        EurekaServerInitializerConfiguration.this.servletContext);
                log.info("Started Eureka Server");
                // Publish the "registry available" event
                publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                EurekaServerInitializerConfiguration.this.running = true;
                // Publish the "server started" event
                publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
            }
            catch (Exception ex) {
                // Help!
                log.error("Could not initialize Eureka servlet context", ex);
            }
        }).start();
    }

    private EurekaServerConfig getEurekaServerConfig() {
        return this.eurekaServerConfig;
    }

    private void publish(ApplicationEvent event) {
        this.applicationContext.publishEvent(event);
    }

    @Override
    public void stop() {
        this.running = false;
        eurekaServerBootstrap.contextDestroyed(this.servletContext);
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }
    
    // Phase: components with a lower phase are started earlier (and stopped later)
    @Override
    public int getPhase() {
        return 0;
    }

    @Override
    public boolean isAutoStartup() {
        return true;
    }

    @Override
    public void stop(Runnable callback) {
        callback.run();
    }

    @Override
    public int getOrder() {
        return this.order;
    }

}

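The class implements the SmartLifecycle interface, so Spring calls its start() method once the application context has been refreshed and its stop() method when the context is closed. Note that start() hands the work to a new thread, so Eureka initialization does not block the rest of the startup.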
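
To make the role of getPhase() concrete, here is a small dependency-free sketch of phase ordering: components start in ascending phase order and stop in descending order. Component, startAll and stopAll are hypothetical names used only for illustration; this is not Spring's lifecycle processor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class LifecyclePhaseSketch {

    /** Hypothetical stand-in for a bean that exposes a lifecycle phase. */
    static final class Component {
        final String name;
        final int phase;
        boolean running;

        Component(String name, int phase) {
            this.name = name;
            this.phase = phase;
        }
    }

    /** Starts components in ascending phase order and returns the order used. */
    static List<String> startAll(List<Component> components) {
        List<Component> sorted = new ArrayList<>(components);
        sorted.sort(Comparator.comparingInt((Component c) -> c.phase));
        List<String> order = new ArrayList<>();
        for (Component c : sorted) {
            c.running = true;
            order.add(c.name);
        }
        return order;
    }

    /** Stops components in descending phase order and returns the order used. */
    static List<String> stopAll(List<Component> components) {
        List<Component> sorted = new ArrayList<>(components);
        sorted.sort(Comparator.comparingInt((Component c) -> c.phase).reversed());
        List<String> order = new ArrayList<>();
        for (Component c : sorted) {
            c.running = false;
            order.add(c.name);
        }
        return order;
    }

    public static void main(String[] args) {
        List<Component> components = Arrays.asList(
                new Component("laterComponent", 10),
                new Component("eurekaInitializer", 0)); // phase 0, as in getPhase() above
        System.out.println("start order: " + startAll(components));
        System.out.println("stop order:  " + stopAll(components));
    }
}
```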

1. Starting Eureka

    public void contextInitialized(ServletContext context) {
        try {
            // Initialize the runtime environment
            initEurekaEnvironment();
            // Initialize the server context
            initEurekaServerContext();
            
            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        }
        catch (Throwable e) {
            ···
        }
    }

The core method is the one that initializes the server context:

    protected void initEurekaServerContext() throws Exception {
        // Register the converters used for JSON and XML serialization
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                XStream.PRIORITY_VERY_HIGH);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                XStream.PRIORITY_VERY_HIGH);

        if (isAws(this.applicationInfoManager.getInfo())) {
            this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                    this.eurekaClientConfig, this.registry, this.applicationInfoManager);
            this.awsBinder.start();
        }
        // Initialize the holder that wraps the Eureka server context
        EurekaServerContextHolder.initialize(this.serverContext);

        log.info("Initialized server context");

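        // Sync registry data from the Eureka cluster; every Eureka server acts as a client of its peers
        int registryCount = this.registry.syncUp();
        // Change the instance status so it can serve traffic, and start a timer that
        // checks every 60 seconds whether any instances should be evicted
        this.registry.openForTraffic(this.applicationInfoManager, registryCount);

        // Register monitoring statistics
        EurekaMonitors.registerAllStats();
    }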

Syncing registry information from the cluster:

    public int syncUp() {

        int count = 0;
        // serverConfig.getRegistrySyncRetries() returns the maximum number of sync attempts
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            // Fetch the registry information obtained from the other servers
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            // Register the remotely fetched instance in the local registry (a map)
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }
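
The control flow of the loop above is easy to misread: it keeps trying only while nothing has been registered, and it sleeps before every attempt except the first. A minimal JDK-only sketch of that retry shape follows; syncWithRetries and the fetchAndRegister supplier are hypothetical stand-ins for the registry calls and are not part of Eureka.

```java
import java.util.function.IntSupplier;

public class SyncRetrySketch {

    /**
     * Calls fetchAndRegister up to maxRetries times, stopping as soon as at
     * least one instance has been registered. Sleeps waitMs between attempts.
     */
    static int syncWithRetries(int maxRetries, long waitMs, IntSupplier fetchAndRegister) {
        int count = 0;
        for (int i = 0; i < maxRetries && count == 0; i++) {
            if (i > 0) {
                try {
                    Thread.sleep(waitMs); // give the peers time to come up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    break;
                }
            }
            // Stand-in for "fetch applications and register each registerable instance".
            count += fetchAndRegister.getAsInt();
        }
        return count;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Pretend the peers return nothing on the first two attempts, then 3 instances.
        int registered = syncWithRetries(5, 10L, () -> {
            attempts[0]++;
            return attempts[0] < 3 ? 0 : 3;
        });
        System.out.println("registered=" + registered + ", attempts=" + attempts[0]);
    }
}
```

One small difference from the listing: the sketch restores the interrupt flag before breaking out of the loop, which is the usual convention when swallowing InterruptedException.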

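2. Publishing the EurekaRegistryAvailableEvent

  We can implement our own listener for this "registry available" event. The event carries the EurekaServerConfig object, so custom logic can be built on top of it.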

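3. Publishing the EurekaServerStartedEvent

  Likewise, we can implement a listener for this "server started" event. It also carries the EurekaServerConfig object, so custom logic can run once the server is up.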

IV. Imported configuration

1. @EnableConfigurationProperties({EurekaDashboardProperties.class, InstanceRegistryProperties.class})

EurekaDashboardProperties is a simple class that holds the settings for the Eureka dashboard:

// Default path of the dashboard
private String path = "/";
// Whether the dashboard is enabled
private boolean enabled = true;

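InstanceRegistryProperties holds the settings that control the Eureka instance registry:

    // Expected number of renewals per minute. The field name differs between versions:
    // the peerAwareInstanceRegistry() bean method reads it through getExpectedNumberOfClientsSendingRenews()
    @Value("${eureka.server.expectedNumberOfRenewsPerMin:1}")
    private int expectedNumberOfRenewsPerMin = 1;
    // Default count used when the registry opens for traffic
    @Value("${eureka.server.defaultOpenForTrafficCount:1}")
    private int defaultOpenForTrafficCount = 1;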

2. @PropertySource("classpath:/eureka/server.properties")

This loads a properties file with the following content:

spring.http.encoding.force=false

V. The @Bean methods of EurekaServerAutoConfiguration

The controller behind the Eureka dashboard. Setting eureka.dashboard.enabled=false removes the dashboard:

    @Bean
    @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled",
            matchIfMissing = true)
    public EurekaController eurekaController() {
        return new EurekaController(this.applicationInfoManager);
    }

Sets up the codecs Eureka uses for serialization:

    @Bean
    public ServerCodecs serverCodecs() {
        return new CloudServerCodecs(this.eurekaServerConfig);
    }

Instantiates the core class of the service registry:

    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
            ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications(); // force initialization
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                serverCodecs, this.eurekaClient,
                this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
                this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }

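Instantiates RefreshablePeerEurekaNodes, a subclass of PeerEurekaNodes that encapsulates the information about, and the operations on, peer nodes. It implements ApplicationListener<EnvironmentChangeEvent> and listens for EnvironmentChangeEvent, for example to update the peer nodes of the cluster when the configuration changes: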

    @Bean
    @ConditionalOnMissingBean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
            ServerCodecs serverCodecs,
            ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
        return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
                this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
                replicationClientAdditionalFilters);
    }

Registers the Eureka server context, DefaultEurekaServerContext, whose initialize() method runs right after construction:

    @Bean
    @ConditionalOnMissingBean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
            PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
                registry, peerEurekaNodes, this.applicationInfoManager);
    }

public class DefaultEurekaServerContext implements EurekaServerContext {
    @PostConstruct
    @Override
    public void initialize() {
        // Start a scheduled thread that keeps the peer node list up to date
        peerEurekaNodes.start();
        try {
            registry.init(peerEurekaNodes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger.info("Initialized");
    }
}

    // PeerEurekaNodes
    public void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
            // Update the cluster's peer node information
            updatePeerEurekaNodes(resolvePeerUrls());
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  {}", node.getServiceUrl());
        }
    }
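
The start() method above combines three JDK building blocks: a single-threaded scheduler with a named daemon thread, one synchronous refresh up front, and a fixed-delay task whose body catches every Throwable. The sketch below isolates that pattern with plain java.util.concurrent; startRefresher and observeRefreshes are hypothetical helper names, and the refresh action is a placeholder for updatePeerEurekaNodes(resolvePeerUrls()).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeerRefreshSketch {

    /** Runs refresh once immediately, then repeatedly with a fixed delay. */
    static ScheduledExecutorService startRefresher(Runnable refresh, long intervalMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "peer-refresh-sketch");
            thread.setDaemon(true); // do not keep the JVM alive just for refreshing
            return thread;
        });
        refresh.run(); // initial refresh on the calling thread
        executor.scheduleWithFixedDelay(() -> {
            try {
                refresh.run();
            } catch (Throwable t) {
                // An exception escaping a periodic task cancels all later runs,
                // so it is logged and swallowed here.
                System.err.println("Cannot refresh peers: " + t);
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        return executor;
    }

    /** Returns true if at least `expected` refreshes happen within the timeout. */
    static boolean observeRefreshes(int expected, long intervalMs, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(expected);
        ScheduledExecutorService executor = startRefresher(latch::countDown, intervalMs);
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("three refreshes observed: " + observeRefreshes(3, 50L, 2000L));
    }
}
```

The try/catch inside the scheduled task is the important detail: without it, a single failed refresh would silently stop all future refreshes.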

Registers EurekaServerBootstrap, which the startup step (contextInitialized) relies on:

    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
            EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager,
                this.eurekaClientConfig, this.eurekaServerConfig, registry,
                serverContext);
    }

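Registers the Jersey filter, which intercepts client requests such as registrations. Jersey is a REST framework that helps publish RESTful endpoints (similar in role to Spring MVC):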

    @Bean
    public FilterRegistrationBean<?> jerseyFilterRegistration(
            javax.ws.rs.core.Application eurekaJerseyApp) {
        FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
        bean.setFilter(new ServletContainer(eurekaJerseyApp));
        bean.setOrder(Ordered.LOWEST_PRECEDENCE);
        bean.setUrlPatterns(
                Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));

        return bean;
    }
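
The URL pattern passed to setUrlPatterns is a servlet-style prefix mapping. As a rough illustration of which request paths such a pattern covers, here is a simplified matcher; matchesPrefixPattern is a hypothetical helper written for this article, not the servlet container's code, and the "/eureka" prefix is assumed to be the value of EurekaConstants.DEFAULT_PREFIX.

```java
public class FilterPatternSketch {

    /**
     * Simplified prefix matching: "/eureka/*" covers "/eureka" itself and
     * everything below "/eureka/". Patterns without "/*" must match exactly.
     */
    static boolean matchesPrefixPattern(String pattern, String path) {
        if (!pattern.endsWith("/*")) {
            return pattern.equals(path);
        }
        String prefix = pattern.substring(0, pattern.length() - 2);
        return path.equals(prefix) || path.startsWith(prefix + "/");
    }

    public static void main(String[] args) {
        String pattern = "/eureka" + "/*"; // assumed DEFAULT_PREFIX + "/*"
        String[] paths = {"/eureka/apps/ORDER-SERVICE", "/eureka", "/", "/eurekax/apps"};
        for (String path : paths) {
            System.out.println(path + " handled by Jersey filter: "
                    + matchesPrefixPattern(pattern, path));
        }
    }
}
```

Under this assumption, REST calls from clients go through Jersey, while the dashboard at "/" is left to the Spring MVC controller shown earlier.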

Original article: https://www.cnblogs.com/sglx/p/15702318.html