EurekaServer源码分析

Eureka Server功能

  • 接受服务注册
  • 接受服务心跳
  • 服务剔除
  • 服务下线
  • 集群同步
  • 获取注册表中服务实例信息

需要注意的是,Eureka Server同时也是一个Eureka Client,在不禁止Eureka Server的客户端行为时,它会向它配置文件中的其他Eureka Server进行拉取注册表、服务注册和发送心跳等操作

启动server注册相关bean

注册外部的配置类 spring-cloud-netflix-eureka-server-2.1.2.RELEASE.jar 中 META-INF/spring.factories中

  • org.springframework.boot.autoconfigure.EnableAutoConfiguration=
  • org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

启动时会自动加载:EurekaServerAutoConfiguration

功能:向spring的bean工厂添加eureka-server相关功能的bean

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.springframework.cloud.netflix.eureka.server;

import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.EurekaClientConfig;
import com.netflix.discovery.converters.EurekaJacksonCodec;
import com.netflix.discovery.converters.wrappers.CodecWrapper;
import com.netflix.discovery.converters.wrappers.CodecWrappers;
import com.netflix.discovery.converters.wrappers.CodecWrappers.JacksonJsonMini;
import com.netflix.discovery.converters.wrappers.CodecWrappers.JacksonXmlMini;
import com.netflix.discovery.converters.wrappers.CodecWrappers.XStreamXml;
import com.netflix.eureka.DefaultEurekaServerContext;
import com.netflix.eureka.EurekaServerConfig;
import com.netflix.eureka.EurekaServerContext;
import com.netflix.eureka.cluster.PeerEurekaNode;
import com.netflix.eureka.cluster.PeerEurekaNodes;
import com.netflix.eureka.registry.PeerAwareInstanceRegistry;
import com.netflix.eureka.resources.DefaultServerCodecs;
import com.netflix.eureka.resources.ServerCodecs;
import com.netflix.eureka.transport.JerseyReplicationClient;
import com.sun.jersey.api.core.DefaultResourceConfig;
import com.sun.jersey.spi.container.servlet.ServletContainer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.servlet.Filter;
import javax.ws.rs.Path;
import javax.ws.rs.core.Application;
import javax.ws.rs.ext.Provider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.cloud.client.actuator.HasFeatures;
import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.cloud.netflix.eureka.server.EurekaServerMarkerConfiguration.Marker;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.util.ClassUtils;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**
 * Auto-configuration that registers the beans making up an embedded Eureka server.
 *
 * <p>It is only active when a {@link Marker} bean is present in the context
 * ({@code @ConditionalOnBean}), it imports {@link EurekaServerInitializerConfiguration},
 * and it loads {@code classpath:/eureka/server.properties} as a property source.
 */
@Configuration(
    proxyBeanMethods = false
)
@Import({EurekaServerInitializerConfiguration.class})
@ConditionalOnBean({Marker.class})
@EnableConfigurationProperties({EurekaDashboardProperties.class, InstanceRegistryProperties.class})
@PropertySource({"classpath:/eureka/server.properties"})
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
    /** Base packages scanned by {@link #jerseyApplication} for {@code @Path} and {@code @Provider} classes. */
    private static final String[] EUREKA_PACKAGES = new String[]{"com.netflix.discovery", "com.netflix.eureka"};
    @Autowired
    private ApplicationInfoManager applicationInfoManager;
    @Autowired
    private EurekaServerConfig eurekaServerConfig;
    @Autowired
    private EurekaClientConfig eurekaClientConfig;
    @Autowired
    private EurekaClient eurekaClient;
    @Autowired
    private InstanceRegistryProperties instanceRegistryProperties;
    /** Codec wrapper registered with {@link CodecWrappers} by the static initializer of this class. */
    public static final CloudJacksonJson JACKSON_JSON = new CloudJacksonJson();

    public EurekaServerAutoConfiguration() {
    }

    /** Exposes a named "Eureka Server" feature backed by this configuration class. */
    @Bean
    public HasFeatures eurekaServerFeature() {
        return HasFeatures.namedFeature("Eureka Server", EurekaServerAutoConfiguration.class);
    }

    /**
     * Registers the dashboard controller. The condition matches when
     * {@code eureka.dashboard.enabled} is missing ({@code matchIfMissing = true}).
     */
    @Bean
    @ConditionalOnProperty(
        prefix = "eureka.dashboard",
        name = {"enabled"},
        matchIfMissing = true
    )
    public EurekaController eurekaController() {
        return new EurekaController(this.applicationInfoManager);
    }

    /** Creates the server codecs from the injected server configuration. */
    @Bean
    public ServerCodecs serverCodecs() {
        return new EurekaServerAutoConfiguration.CloudServerCodecs(this.eurekaServerConfig);
    }

    /**
     * Resolves the full JSON codec named by the server configuration, falling back to
     * the codec registered under {@link #JACKSON_JSON}'s name when the lookup yields {@code null}.
     */
    private static CodecWrapper getFullJson(EurekaServerConfig serverConfig) {
        CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getJsonCodecName());
        return codec == null ? CodecWrappers.getCodec(JACKSON_JSON.codecName()) : codec;
    }

    /**
     * Resolves the full XML codec named by the server configuration, falling back to
     * the {@link XStreamXml} codec when the lookup yields {@code null}.
     */
    private static CodecWrapper getFullXml(EurekaServerConfig serverConfig) {
        CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getXmlCodecName());
        return codec == null ? CodecWrappers.getCodec(XStreamXml.class) : codec;
    }

    /** Default (empty) set of extra replication-client filters; applications may supply their own bean. */
    @Bean
    @ConditionalOnMissingBean
    public ReplicationClientAdditionalFilters replicationClientAdditionalFilters() {
        return new ReplicationClientAdditionalFilters(Collections.emptySet());
    }

    /**
     * Creates the peer-aware instance registry.
     *
     * @param serverCodecs codecs handed to the registry
     */
    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
        // Return value is intentionally discarded; presumably this call forces the
        // client to initialize before the registry is built -- TODO confirm.
        this.eurekaClient.getApplications();
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient, this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(), this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }

    /** Creates the peer node collection unless the application already defines one. */
    @Bean
    @ConditionalOnMissingBean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs, ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
        return new EurekaServerAutoConfiguration.RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.applicationInfoManager, replicationClientAdditionalFilters);
    }

    /** Creates the server context unless the application already defines one. */
    @Bean
    @ConditionalOnMissingBean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager);
    }

    /** Creates the bootstrap helper that {@link EurekaServerInitializerConfiguration} gets injected. */
    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig, registry, serverContext);
    }

    /**
     * Registers the Jersey {@link ServletContainer} as a servlet filter on {@code /eureka/*}
     * with order {@link Integer#MAX_VALUE}.
     *
     * @param eurekaJerseyApp the Jersey application built by {@link #jerseyApplication}
     */
    @Bean
    public FilterRegistrationBean<?> jerseyFilterRegistration(Application eurekaJerseyApp) {
        FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<>();
        bean.setFilter(new ServletContainer(eurekaJerseyApp));
        bean.setOrder(Integer.MAX_VALUE);
        bean.setUrlPatterns(Collections.singletonList("/eureka/*"));
        return bean;
    }

    /**
     * Builds the Jersey application by scanning {@link #EUREKA_PACKAGES} for classes
     * annotated with {@link Path} or {@link Provider}.
     *
     * @param environment environment passed to the classpath scanner
     * @param resourceLoader supplies the class loader used to resolve the scanned class names
     * @return a resource config containing the discovered classes
     */
    @Bean
    public Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
        // "false": no default filters, only the two annotation filters added below apply.
        ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false, environment);
        provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
        provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));

        Set<Class<?>> classes = new HashSet<>();
        for (String basePackage : EUREKA_PACKAGES) {
            for (BeanDefinition bd : provider.findCandidateComponents(basePackage)) {
                Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader());
                classes.add(cls);
            }
        }

        Map<String, Object> propsAndFeatures = new HashMap<>();
        // Jersey property holding a regex for the dashboard's static asset paths.
        propsAndFeatures.put("com.sun.jersey.config.property.WebPageContentRegex", "/eureka/(fonts|images|css|js)/.*");
        DefaultResourceConfig rc = new DefaultResourceConfig(classes);
        rc.setPropertiesAndFeatures(propsAndFeatures);
        return rc;
    }

    /**
     * Re-registers an existing {@code httpTraceFilter} bean with an order value ten
     * below the one used for the Jersey filter.
     */
    @Bean
    @ConditionalOnBean(
        name = {"httpTraceFilter"}
    )
    public FilterRegistrationBean<?> traceFilterRegistration(@Qualifier("httpTraceFilter") Filter filter) {
        FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<>();
        bean.setFilter(filter);
        bean.setOrder(Integer.MAX_VALUE - 10);
        return bean;
    }

    static {
        // Runs after JACKSON_JSON has been initialized (textual order of static initializers).
        CodecWrappers.registerWrapper(JACKSON_JSON);
        EurekaJacksonCodec.setInstance(JACKSON_JSON.getCodec());
    }

    /** Server codecs using the configured full codecs plus the Jackson "mini" JSON/XML codecs. */
    class CloudServerCodecs extends DefaultServerCodecs {
        CloudServerCodecs(EurekaServerConfig serverConfig) {
            super(EurekaServerAutoConfiguration.getFullJson(serverConfig), CodecWrappers.getCodec(JacksonJsonMini.class), EurekaServerAutoConfiguration.getFullXml(serverConfig), CodecWrappers.getCodec(JacksonXmlMini.class));
        }
    }

    /**
     * {@link PeerEurekaNodes} variant that re-resolves the peer URLs when relevant
     * {@code eureka.client.*} keys change in the environment.
     */
    static class RefreshablePeerEurekaNodes extends PeerEurekaNodes implements ApplicationListener<EnvironmentChangeEvent> {
        private final ReplicationClientAdditionalFilters replicationClientAdditionalFilters;

        RefreshablePeerEurekaNodes(final PeerAwareInstanceRegistry registry, final EurekaServerConfig serverConfig, final EurekaClientConfig clientConfig, final ServerCodecs serverCodecs, final ApplicationInfoManager applicationInfoManager, final ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
            super(registry, serverConfig, clientConfig, serverCodecs, applicationInfoManager);
            this.replicationClientAdditionalFilters = replicationClientAdditionalFilters;
        }

        /**
         * Creates a peer node whose replication client carries all additional filters.
         *
         * @param peerEurekaNodeUrl URL of the peer
         */
        @Override
        protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
            JerseyReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(this.serverConfig, this.serverCodecs, peerEurekaNodeUrl);
            this.replicationClientAdditionalFilters.getFilters().forEach(replicationClient::addReplicationClientFilter);
            String targetHost = hostFromUrl(peerEurekaNodeUrl);
            if (targetHost == null) {
                // Placeholder host name used when none can be extracted from the URL.
                targetHost = "host";
            }

            return new PeerEurekaNode(this.registry, targetHost, peerEurekaNodeUrl, replicationClient, this.serverConfig);
        }

        /** Refreshes the peer list when {@link #shouldUpdate} accepts the changed keys. */
        @Override
        public void onApplicationEvent(final EnvironmentChangeEvent event) {
            if (this.shouldUpdate(event.getKeys())) {
                this.updatePeerEurekaNodes(this.resolvePeerUrls());
            }
        }

        /**
         * Decides whether a set of changed property keys requires re-resolving the peers.
         *
         * @param changedKeys keys reported by the environment change event; must not be {@code null}
         * @return {@code false} when service URLs are fetched through DNS; otherwise {@code true}
         *         if the region key changed or any key starts with the service-url or
         *         availability-zones prefix
         */
        protected boolean shouldUpdate(final Set<String> changedKeys) {
            assert changedKeys != null;

            if (this.clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                return false;
            }
            if (changedKeys.contains("eureka.client.region")) {
                return true;
            }
            for (String key : changedKeys) {
                if (key.startsWith("eureka.client.service-url.") || key.startsWith("eureka.client.availability-zones.")) {
                    return true;
                }
            }
            return false;
        }
    }

    /** Supplies a default {@link EurekaServerConfig} when the application defines none. */
    @Configuration(
        proxyBeanMethods = false
    )
    protected static class EurekaServerConfigBeanConfiguration {
        protected EurekaServerConfigBeanConfiguration() {
        }

        @Bean
        @ConditionalOnMissingBean
        public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
            EurekaServerConfigBean server = new EurekaServerConfigBean();
            if (clientConfig.shouldRegisterWithEureka()) {
                // Only when this node registers with Eureka: use 5 registry sync retries.
                server.setRegistrySyncRetries(5);
            }

            return server;
        }
    }
}

但是EurekaServerAutoConfiguration的生效是有条件的

  • EurekaServerAutoConfiguration上有一个注解:@ConditionalOnBean(Marker)
  • 这个Marker是org.springframework.cloud.netflix.eureka.server.EurekaServerMarkerConfiguration.Marker

只有在Spring容器里有Marker这个类的实例时,才会加载EurekaServerAutoConfiguration,这个就是控制是否开启Eureka Server的关键

开启eureka server

开关

  • @EnableEurekaServer中,@Import(EurekaServerMarkerConfiguration.class)
  • 动态注入此bean到spring 容器,引入了EurekaServerMarkerConfiguration.class
  • 所以开启了Server服务。所以注册了前面说的:EurekaServerAutoConfiguration

查看@EnableEurekaServer注解

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.springframework.cloud.netflix.eureka.server;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.context.annotation.Import;

/**
 * Type-level switch annotation: placing it on a class imports
 * {@link EurekaServerMarkerConfiguration} into the application context.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}

开启注册

  • 在EurekaServerAutoConfiguration上有@Import(EurekaServerInitializerConfiguration.class),导入了EurekaServerInitializerConfiguration,
  • EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle
  • SmartLifecycle的作用是:初始化完之后,执行public void start()方法

查看EurekaServerInitializerConfiguration.class

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.springframework.cloud.netflix.eureka.server;

import com.netflix.eureka.EurekaServerConfig;
import javax.servlet.ServletContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.netflix.eureka.server.event.EurekaRegistryAvailableEvent;
import org.springframework.cloud.netflix.eureka.server.event.EurekaServerStartedEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.web.context.ServletContextAware;

@Configuration(
    proxyBeanMethods = false
)
public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {
    private static final Log log = LogFactory.getLog(EurekaServerInitializerConfiguration.class);
    @Autowired
    private EurekaServerConfig eurekaServerConfig;
    private ServletContext servletContext;
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private EurekaServerBootstrap eurekaServerBootstrap;
    private boolean running;
    private int order = 1;

    public EurekaServerInitializerConfiguration() {
    }

    public void setServletContext(ServletContext servletContext) {
        this.servletContext = servletContext;
    }

    public void start() {
        (new Thread(() -> {
            try {
                this.eurekaServerBootstrap.contextInitialized(this.servletContext);
                log.info("Started Eureka Server");
                this.publish(new EurekaRegistryAvailableEvent(this.getEurekaServerConfig()));
                this.running = true;
                this.publish(new EurekaServerStartedEvent(this.getEurekaServerConfig()));
            } catch (Exception var2) {
                log.error("Could not initialize Eureka servlet context", var2);
            }

        })).start();
    }

    private EurekaServerConfig getEurekaServerConfig() {
        return this.eurekaServerConfig;
    }

    private void publish(ApplicationEvent event) {
        this.applicationContext.publishEvent(event);
    }

    public void stop() {
        this.running = false;
        this.eurekaServerBootstrap.contextDestroyed(this.servletContext);
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return 0;
    }

    public boolean isAutoStartup() {
        return true;
    }

    public void stop(Runnable callback) {
        callback.run();
    }

    public int getOrder() {
        return this.order;
    }
}
  • 在public void start()中,启动一个线程,看日志:log.info("Started Eureka Server");
  • 发布事件:publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()))
  • 告诉client,可以来注册了

上面提到的 log.info("Started Eureka Server") 的上面一行

eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);

点contextInitialized进去,查看EurekaServerBootstrap

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.springframework.cloud.netflix.eureka.server;

import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.appinfo.DataCenterInfo.Name;
import com.netflix.config.ConfigurationManager;
import com.netflix.discovery.EurekaClientConfig;
import com.netflix.discovery.converters.JsonXStream;
import com.netflix.discovery.converters.XmlXStream;
import com.netflix.eureka.EurekaServerConfig;
import com.netflix.eureka.EurekaServerContext;
import com.netflix.eureka.EurekaServerContextHolder;
import com.netflix.eureka.V1AwareInstanceInfoConverter;
import com.netflix.eureka.aws.AwsBinder;
import com.netflix.eureka.aws.AwsBinderDelegate;
import com.netflix.eureka.registry.PeerAwareInstanceRegistry;
import com.netflix.eureka.util.EurekaMonitors;
import javax.servlet.ServletContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Initializes and tears down the Eureka server environment and server context
 * around a {@link ServletContext}.
 */
public class EurekaServerBootstrap {
    private static final Log log = LogFactory.getLog(EurekaServerBootstrap.class);
    /** Environment value applied when {@code eureka.environment} is not set. */
    private static final String TEST = "test";
    private static final String ARCHAIUS_DEPLOYMENT_ENVIRONMENT = "archaius.deployment.environment";
    private static final String EUREKA_ENVIRONMENT = "eureka.environment";
    /** Data center value applied when {@code eureka.datacenter} is not set. */
    private static final String DEFAULT = "default";
    private static final String ARCHAIUS_DEPLOYMENT_DATACENTER = "archaius.deployment.datacenter";
    private static final String EUREKA_DATACENTER = "eureka.datacenter";
    protected EurekaServerConfig eurekaServerConfig;
    protected ApplicationInfoManager applicationInfoManager;
    protected EurekaClientConfig eurekaClientConfig;
    protected PeerAwareInstanceRegistry registry;
    protected volatile EurekaServerContext serverContext;
    protected volatile AwsBinder awsBinder;

    public EurekaServerBootstrap(ApplicationInfoManager applicationInfoManager, EurekaClientConfig eurekaClientConfig, EurekaServerConfig eurekaServerConfig, PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {
        this.applicationInfoManager = applicationInfoManager;
        this.eurekaClientConfig = eurekaClientConfig;
        this.eurekaServerConfig = eurekaServerConfig;
        this.registry = registry;
        this.serverContext = serverContext;
    }

    /**
     * Initializes the environment and the server context, then stores the server
     * context as a servlet-context attribute keyed by the {@link EurekaServerContext} class name.
     *
     * @param context servlet context receiving the attribute
     * @throws RuntimeException wrapping any failure raised during initialization
     */
    public void contextInitialized(ServletContext context) {
        try {
            this.initEurekaEnvironment();
            this.initEurekaServerContext();
            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        } catch (Throwable ex) {
            log.error("Cannot bootstrap eureka server :", ex);
            throw new RuntimeException("Cannot bootstrap eureka server :", ex);
        }
    }

    /**
     * Removes the server-context attribute and shuts the server down.
     * Failures are logged and not rethrown.
     *
     * @param context servlet context the attribute was stored in
     */
    public void contextDestroyed(ServletContext context) {
        try {
            log.info("Shutting down Eureka Server..");
            context.removeAttribute(EurekaServerContext.class.getName());
            this.destroyEurekaServerContext();
            this.destroyEurekaEnvironment();
        } catch (Throwable ex) {
            log.error("Error shutting down eureka", ex);
        }

        log.info("Eureka Service is now shutdown...");
    }

    /**
     * Copies {@code eureka.datacenter} and {@code eureka.environment} into the
     * corresponding {@code archaius.deployment.*} properties, substituting
     * {@code "default"} and {@code "test"} respectively when they are unset.
     */
    protected void initEurekaEnvironment() throws Exception {
        log.info("Setting the eureka configuration..");
        String dataCenter = ConfigurationManager.getConfigInstance().getString(EUREKA_DATACENTER);
        if (dataCenter == null) {
            log.info("Eureka data center value eureka.datacenter is not set, defaulting to default");
            ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
        } else {
            ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
        }

        String environment = ConfigurationManager.getConfigInstance().getString(EUREKA_ENVIRONMENT);
        if (environment == null) {
            ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
            log.info("Eureka environment value eureka.environment is not set, defaulting to test");
        } else {
            ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
        }

    }

    /**
     * Registers the V1-aware converters, starts the AWS binder when running on
     * Amazon, publishes the server context to its holder, syncs the registry and
     * opens it for traffic, and finally registers the monitor statistics.
     */
    protected void initEurekaServerContext() throws Exception {
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
        if (this.isAws(this.applicationInfoManager.getInfo())) {
            this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager);
            this.awsBinder.start();
        }

        EurekaServerContextHolder.initialize(this.serverContext);
        log.info("Initialized server context");
        // The count returned by syncUp() is handed straight to openForTraffic().
        int registryCount = this.registry.syncUp();
        this.registry.openForTraffic(this.applicationInfoManager, registryCount);
        EurekaMonitors.registerAllStats();
    }

    /** Shuts down the monitors, then the AWS binder and server context when present. */
    protected void destroyEurekaServerContext() throws Exception {
        EurekaMonitors.shutdown();
        if (this.awsBinder != null) {
            this.awsBinder.shutdown();
        }

        if (this.serverContext != null) {
            this.serverContext.shutdown();
        }

    }

    /** Extension hook; does nothing here. */
    protected void destroyEurekaEnvironment() throws Exception {
    }

    /**
     * Reports whether the given instance's data center is {@code Name.Amazon}.
     *
     * @param selfInstanceInfo instance info of this server
     */
    protected boolean isAws(InstanceInfo selfInstanceInfo) {
        boolean result = Name.Amazon == selfInstanceInfo.getDataCenterInfo().getName();
        log.info("isAws returned " + result);
        return result;
    }
}

看到initEurekaServerContext方法,初始化eureka 上下文

点initEurekaServerContext查看该方法

/**
 * Initializes the Eureka server context. The statements run strictly in the
 * order shown: converters, optional AWS binder, context holder, registry
 * sync, open for traffic, monitor registration.
 */
protected void initEurekaServerContext() throws Exception {
    // Register the V1-aware converter on both the JSON and XML XStream instances.
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
    // The AWS binder is created and started only when isAws(...) reports true.
    if (this.isAws(this.applicationInfoManager.getInfo())) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager);
        this.awsBinder.start();
    }

    EurekaServerContextHolder.initialize(this.serverContext);
    log.info("Initialized server context");
    // Per the surrounding article, syncUp() copies the registry from neighbouring
    // Eureka nodes; its int result is passed on to openForTraffic().
    int registryCount = this.registry.syncUp();
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    EurekaMonitors.registerAllStats();
}

看到 int registryCount = this.registry.syncUp(); // 从相邻的eureka 节点复制注册表
 下一行,查看 openForTraffic(主要是和client 交换信息,traffic)的实现类

查看 PeerAwareInstanceRegistryImpl

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.netflix.eureka.registry;

import com.netflix.appinfo.AmazonInfo;
import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.appinfo.DataCenterInfo;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.appinfo.LeaseInfo;
import com.netflix.appinfo.AmazonInfo.MetaDataKey;
import com.netflix.appinfo.DataCenterInfo.Name;
import com.netflix.appinfo.InstanceInfo.InstanceStatus;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.EurekaClientConfig;
import com.netflix.discovery.shared.Application;
import com.netflix.discovery.shared.Applications;
import com.netflix.eureka.EurekaServerConfig;
import com.netflix.eureka.Version;
import com.netflix.eureka.cluster.PeerEurekaNode;
import com.netflix.eureka.cluster.PeerEurekaNodes;
import com.netflix.eureka.registry.rule.DownOrStartingRule;
import com.netflix.eureka.registry.rule.FirstMatchWinsCompositeRule;
import com.netflix.eureka.registry.rule.InstanceStatusOverrideRule;
import com.netflix.eureka.registry.rule.LeaseExistsRule;
import com.netflix.eureka.registry.rule.OverrideExistsRule;
import com.netflix.eureka.resources.CurrentRequestVersion;
import com.netflix.eureka.resources.ServerCodecs;
import com.netflix.eureka.resources.ASGResource.ASGStatus;
import com.netflix.eureka.util.MeasuredRate;
import com.netflix.servo.DefaultMonitorRegistry;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.monitor.Stopwatch;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Instance registry that extends {@link AbstractInstanceRegistry} and, after each
 * successful local register / cancel / renew / status-update / delete-status-override,
 * forwards the same action to the peer nodes held in {@code peerEurekaNodes}
 * (see {@code replicateToPeers}).
 */
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
    private static final Logger logger = LoggerFactory.getLogger(PeerAwareInstanceRegistryImpl.class);
    private static final String US_EAST_1 = "us-east-1";
    private static final int PRIME_PEER_NODES_RETRY_MS = 30000;
    // Wall-clock time recorded by openForTraffic(); stays 0 until that method runs.
    private long startupTime = 0L;
    // Cleared by openForTraffic() only when it is given a count greater than zero.
    private boolean peerInstancesTransferEmptyOnStartup = true;
    // Orders applications by name; used by getSortedApplications().
    private static final Comparator<Application> APP_COMPARATOR = new Comparator<Application>() {
        public int compare(Application l, Application r) {
            return l.getName().compareTo(r.getName());
        }
    };
    private final MeasuredRate numberOfReplicationsLastMin;
    protected final EurekaClient eurekaClient;
    protected volatile PeerEurekaNodes peerEurekaNodes;
    private final InstanceStatusOverrideRule instanceStatusOverrideRule;
    // Daemon timer that drives the periodic updateRenewalThreshold() task.
    private Timer timer = new Timer("ReplicaAwareInstanceRegistry - RenewalThresholdUpdater", true);

    /**
     * Stores the client, creates the replication-rate counter, and builds the
     * status override rule as a first-match-wins chain of
     * DownOrStartingRule, OverrideExistsRule and LeaseExistsRule (in that order).
     */
    @Inject
    public PeerAwareInstanceRegistryImpl(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs, EurekaClient eurekaClient) {
        super(serverConfig, clientConfig, serverCodecs);
        this.eurekaClient = eurekaClient;
        this.numberOfReplicationsLastMin = new MeasuredRate(60000L);
        this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new InstanceStatusOverrideRule[]{new DownOrStartingRule(), new OverrideExistsRule(this.overriddenInstanceStatusMap), new LeaseExistsRule()});
    }

    /** Returns the composite override rule built in the constructor. */
    protected InstanceStatusOverrideRule getInstanceInfoOverrideRule() {
        return this.instanceStatusOverrideRule;
    }

    /**
     * Starts the replication-rate counter, remembers the peer node set, then calls
     * initializedResponseCache(), scheduleRenewalThresholdUpdateTask() and
     * initRemoteRegionRegistry(). Failure to register the monitor is only logged.
     */
    public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
        this.numberOfReplicationsLastMin.start();
        this.peerEurekaNodes = peerEurekaNodes;
        this.initializedResponseCache();
        this.scheduleRenewalThresholdUpdateTask();
        this.initRemoteRegionRegistry();

        try {
            Monitors.registerObject(this);
        } catch (Throwable var3) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", var3);
        }

    }

    /**
     * Unregisters the monitor, shuts down the peer nodes, stops the replication
     * counter, cancels the threshold timer and finally delegates to super.shutdown().
     * Failures in the first two steps are logged and do not stop the remaining steps.
     */
    public void shutdown() {
        try {
            DefaultMonitorRegistry.getInstance().unregister(Monitors.newObjectMonitor(this));
        } catch (Throwable var3) {
            logger.error("Cannot shutdown monitor registry", var3);
        }

        try {
            this.peerEurekaNodes.shutdown();
        } catch (Throwable var2) {
            logger.error("Cannot shutdown ReplicaAwareInstanceRegistry", var2);
        }

        this.numberOfReplicationsLastMin.stop();
        this.timer.cancel();
        super.shutdown();
    }

    /**
     * Schedules updateRenewalThreshold() on the timer, using
     * getRenewalThresholdUpdateIntervalMs() as both the initial delay and the period.
     */
    private void scheduleRenewalThresholdUpdateTask() {
        this.timer.schedule(new TimerTask() {
            public void run() {
                PeerAwareInstanceRegistryImpl.this.updateRenewalThreshold();
            }
        }, (long)this.serverConfig.getRenewalThresholdUpdateIntervalMs(), (long)this.serverConfig.getRenewalThresholdUpdateIntervalMs());
    }

    /**
     * Copies the instances known to the local eurekaClient into this registry.
     * Retries up to getRegistrySyncRetries() times while nothing has been copied,
     * sleeping getRegistrySyncRetryWaitMs() before every attempt except the first.
     *
     * @return the number of instances that were registered
     */
    public int syncUp() {
        int count = 0;

        for(int i = 0; i < this.serverConfig.getRegistrySyncRetries() && count == 0; ++i) {
            if (i > 0) {
                try {
                    Thread.sleep(this.serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException var10) {
                    // NOTE(review): the thread's interrupt flag is not restored before leaving the loop.
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }

            Applications apps = this.eurekaClient.getApplications();
            Iterator var4 = apps.getRegisteredApplications().iterator();

            while(var4.hasNext()) {
                Application app = (Application)var4.next();
                Iterator var6 = app.getInstances().iterator();

                while(var6.hasNext()) {
                    InstanceInfo instance = (InstanceInfo)var6.next();

                    try {
                        if (this.isRegisterable(instance)) {
                            // Registered with isReplication = true.
                            this.register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            ++count;
                        }
                    } catch (Throwable var9) {
                        // A failure on one instance is logged and the copy continues with the next one.
                        logger.error("During DS init copy", var9);
                    }
                }
            }
        }

        return count;
    }

    /**
     * Seeds the expected-client count with {@code count}, recomputes the renew
     * threshold, records the startup time, optionally primes AWS replica
     * connections, sets this instance's status to UP and calls super.postInit().
     *
     * @param applicationInfoManager manager for this server's own instance info
     * @param count number of instances obtained from the peer (see syncUp())
     */
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        this.expectedNumberOfClientsSendingRenews = count;
        this.updateRenewsPerMinThreshold();
        logger.info("Got {} instances from neighboring DS node", count);
        logger.info("Renew threshold is: {}", this.numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }

        Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && this.serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            this.primeAwsReplicas(applicationInfoManager);
        }

        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        super.postInit();
    }

    /**
     * Sends a heartbeat to each peer node whose host matches an instance of this
     * server's own application, repeating the whole pass after a 30 s sleep
     * whenever any step throws. Returns immediately if the application is not
     * found in the registry.
     */
    private void primeAwsReplicas(ApplicationInfoManager applicationInfoManager) {
        boolean areAllPeerNodesPrimed = false;

        while(!areAllPeerNodesPrimed) {
            String peerHostName = null;

            try {
                Application eurekaApps = this.getApplication(applicationInfoManager.getInfo().getAppName(), false);
                if (eurekaApps == null) {
                    areAllPeerNodesPrimed = true;
                    logger.info("No peers needed to prime.");
                    return;
                }

                Iterator var5 = this.peerEurekaNodes.getPeerEurekaNodes().iterator();

                while(var5.hasNext()) {
                    PeerEurekaNode node = (PeerEurekaNode)var5.next();
                    Iterator var7 = eurekaApps.getInstances().iterator();

                    while(var7.hasNext()) {
                        InstanceInfo peerInstanceInfo = (InstanceInfo)var7.next();
                        LeaseInfo leaseInfo = peerInstanceInfo.getLeaseInfo();
                        // Only consider instances whose last renewal plus lease duration plus 120 s has not yet passed.
                        if (System.currentTimeMillis() <= leaseInfo.getRenewalTimestamp() + (long)(leaseInfo.getDurationInSecs() * 1000) + 120000L) {
                            peerHostName = peerInstanceInfo.getHostName();
                            logger.info("Trying to send heartbeat for the eureka server at {} to make sure the network channels are open", peerHostName);
                            if (peerHostName.equalsIgnoreCase((new URI(node.getServiceUrl())).getHost())) {
                                node.heartbeat(peerInstanceInfo.getAppName(), peerInstanceInfo.getId(), peerInstanceInfo, (InstanceStatus)null, true);
                            }
                        }
                    }
                }

                areAllPeerNodesPrimed = true;
            } catch (Throwable var11) {
                logger.error("Could not contact {}", peerHostName, var11);

                try {
                    Thread.sleep(30000L);
                } catch (InterruptedException var10) {
                    // NOTE(review): priming is abandoned here without restoring the interrupt flag.
                    logger.warn("Interrupted while priming : ", var10);
                    areAllPeerNodesPrimed = true;
                }
            }
        }

    }

    /**
     * Returns false while the startup transfer was empty and the
     * getWaitTimeInMsWhenSyncEmpty() window since startupTime has not elapsed.
     * Otherwise returns true, unless {@code remoteRegionRequired} is set and some
     * remote region registry is not yet ready to serve data.
     */
    public boolean shouldAllowAccess(boolean remoteRegionRequired) {
        if (this.peerInstancesTransferEmptyOnStartup && System.currentTimeMillis() <= this.startupTime + (long)this.serverConfig.getWaitTimeInMsWhenSyncEmpty()) {
            return false;
        } else {
            if (remoteRegionRequired) {
                Iterator var2 = this.regionNameVSRemoteRegistry.values().iterator();

                while(var2.hasNext()) {
                    RemoteRegionRegistry remoteRegionRegistry = (RemoteRegionRegistry)var2.next();
                    if (!remoteRegionRegistry.isReadyForServingData()) {
                        return false;
                    }
                }
            }

            return true;
        }
    }

    /** Same as {@code shouldAllowAccess(true)}. */
    public boolean shouldAllowAccess() {
        return this.shouldAllowAccess(true);
    }

    /** @deprecated */
    @Deprecated
    public List<PeerEurekaNode> getReplicaNodes() {
        return Collections.unmodifiableList(this.peerEurekaNodes.getPeerEurekaNodes());
    }

    /**
     * Cancels the lease locally and, only if that succeeded, replicates the
     * Cancel action to the peers.
     */
    public boolean cancel(String appName, String id, boolean isReplication) {
        if (super.cancel(appName, id, isReplication)) {
            this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Cancel, appName, id, (InstanceInfo)null, (InstanceStatus)null, isReplication);
            return true;
        } else {
            return false;
        }
    }

    /**
     * Registers the instance locally and then replicates the Register action.
     * The lease duration is taken from the instance's LeaseInfo when it is
     * present and positive; otherwise 90 is used.
     */
    public void register(InstanceInfo info, boolean isReplication) {
        int leaseDuration = 90;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }

        super.register(info, leaseDuration, isReplication);
        this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus)null, isReplication);
    }

    /**
     * Renews the lease locally and, only if that succeeded, replicates the
     * Heartbeat action to the peers.
     */
    public boolean renew(String appName, String id, boolean isReplication) {
        if (super.renew(appName, id, isReplication)) {
            this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Heartbeat, appName, id, (InstanceInfo)null, (InstanceStatus)null, isReplication);
            return true;
        } else {
            return false;
        }
    }

    /**
     * Applies the status update locally and, only if that succeeded, replicates
     * the StatusUpdate action (carrying {@code newStatus}) to the peers.
     */
    public boolean statusUpdate(String appName, String id, InstanceStatus newStatus, String lastDirtyTimestamp, boolean isReplication) {
        if (super.statusUpdate(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
            this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.StatusUpdate, appName, id, (InstanceInfo)null, newStatus, isReplication);
            return true;
        } else {
            return false;
        }
    }

    /**
     * Removes the status override locally and, only if that succeeded,
     * replicates the DeleteStatusOverride action to the peers.
     */
    public boolean deleteStatusOverride(String appName, String id, InstanceStatus newStatus, String lastDirtyTimestamp, boolean isReplication) {
        if (super.deleteStatusOverride(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
            this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.DeleteStatusOverride, appName, id, (InstanceInfo)null, (InstanceStatus)null, isReplication);
            return true;
        } else {
            return false;
        }
    }

    /**
     * Forwards an ASG status change to every peer node. Does nothing when the
     * call is itself a replication; no local state is changed in this method.
     */
    public void statusUpdate(String asgName, ASGStatus newStatus, boolean isReplication) {
        if (!isReplication) {
            Iterator var4 = this.peerEurekaNodes.getPeerEurekaNodes().iterator();

            while(var4.hasNext()) {
                PeerEurekaNode node = (PeerEurekaNode)var4.next();
                this.replicateASGInfoToReplicaNodes(asgName, newStatus, node);
            }

        }
    }

    /**
     * Always true when self-preservation is disabled. Otherwise true only when
     * the renew threshold is positive and the renews counted in the last minute
     * are strictly above it.
     */
    public boolean isLeaseExpirationEnabled() {
        if (!this.isSelfPreservationModeEnabled()) {
            return true;
        } else {
            return this.numberOfRenewsPerMinThreshold > 0 && this.getNumOfRenewsInLastMin() > (long)this.numberOfRenewsPerMinThreshold;
        }
    }

    /** Returns the server configuration's shouldEnableSelfPreservation() flag. */
    public boolean isSelfPreservationModeEnabled() {
        return this.serverConfig.shouldEnableSelfPreservation();
    }

    /** Not implemented in this class: always returns null. */
    public InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure) {
        return null;
    }

    /**
     * Timer task body. Counts the registerable instances reported by
     * eurekaClient and, under {@code lock}, adopts that count as the expected
     * number of renewing clients when it exceeds
     * getRenewalPercentThreshold() * the current expectation, or when
     * self-preservation is disabled. Any failure is logged, not propagated.
     */
    private void updateRenewalThreshold() {
        try {
            Applications apps = this.eurekaClient.getApplications();
            int count = 0;
            Iterator var3 = apps.getRegisteredApplications().iterator();

            while(var3.hasNext()) {
                Application app = (Application)var3.next();
                Iterator var5 = app.getInstances().iterator();

                while(var5.hasNext()) {
                    InstanceInfo instance = (InstanceInfo)var5.next();
                    if (this.isRegisterable(instance)) {
                        ++count;
                    }
                }
            }

            synchronized(this.lock) {
                if ((double)count > this.serverConfig.getRenewalPercentThreshold() * (double)this.expectedNumberOfClientsSendingRenews || !this.isSelfPreservationModeEnabled()) {
                    this.expectedNumberOfClientsSendingRenews = count;
                    this.updateRenewsPerMinThreshold();
                }
            }

            logger.info("Current renewal threshold is : {}", this.numberOfRenewsPerMinThreshold);
        } catch (Throwable var9) {
            logger.error("Cannot update renewal threshold", var9);
        }

    }

    /** Returns a new list of the registered applications sorted by name. */
    public List<Application> getSortedApplications() {
        List<Application> apps = new ArrayList(this.getApplications().getRegisteredApplications());
        Collections.sort(apps, APP_COMPARATOR);
        return apps;
    }

    @Monitor(
        name = "numOfReplicationsInLastMin",
        description = "Number of total replications received in the last minute",
        type = DataSourceType.GAUGE
    )
    public long getNumOfReplicationsInLastMin() {
        return this.numberOfReplicationsLastMin.getCount();
    }

    /**
     * Gauge: 1 when renews in the last minute are at or below the threshold,
     * openForTraffic() has run, and the getWaitTimeInMsWhenSyncEmpty() window
     * since startup has passed; 0 otherwise.
     */
    @Monitor(
        name = "isBelowRenewThreshold",
        description = "0 = false, 1 = true",
        type = DataSourceType.GAUGE
    )
    public int isBelowRenewThresold() {
        return this.getNumOfRenewsInLastMin() <= (long)this.numberOfRenewsPerMinThreshold && this.startupTime > 0L && System.currentTimeMillis() > this.startupTime + (long)this.serverConfig.getWaitTimeInMsWhenSyncEmpty() ? 1 : 0;
    }

    /**
     * Inspects the instance's data-center info and availability zone against
     * this server's region.
     * NOTE(review): every path through this method, including the final
     * fall-through, returns true, so the region checks do not affect the result.
     */
    public boolean isRegisterable(InstanceInfo instanceInfo) {
        DataCenterInfo datacenterInfo = instanceInfo.getDataCenterInfo();
        String serverRegion = this.clientConfig.getRegion();
        if (AmazonInfo.class.isInstance(datacenterInfo)) {
            AmazonInfo info = (AmazonInfo)AmazonInfo.class.cast(instanceInfo.getDataCenterInfo());
            String availabilityZone = info.get(MetaDataKey.availabilityZone);
            if (availabilityZone == null && "us-east-1".equalsIgnoreCase(serverRegion)) {
                return true;
            }

            if (availabilityZone != null && availabilityZone.contains(serverRegion)) {
                return true;
            }
        }

        return true;
    }

    /**
     * Common replication entry point. When the call is itself a replication it
     * only bumps the replication counter; otherwise it forwards the action to
     * every peer node whose URL is not this server's own. The action's timer is
     * stopped in all cases.
     */
    private void replicateToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();

        try {
            if (isReplication) {
                this.numberOfReplicationsLastMin.increment();
            }

            // NOTE(review): peerEurekaNodes is a PeerEurekaNodes, compared by reference with a List constant;
            // as written this comparison looks like it can never be false. Confirm intent.
            if (this.peerEurekaNodes != Collections.EMPTY_LIST && !isReplication) {
                Iterator var8 = this.peerEurekaNodes.getPeerEurekaNodes().iterator();

                while(var8.hasNext()) {
                    PeerEurekaNode node = (PeerEurekaNode)var8.next();
                    if (!this.peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                        this.replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
                    }
                }

                return;
            }
        } finally {
            tracer.stop();
        }

    }

    /**
     * Sends one action to one peer node, with the current request version set
     * to V2 for the duration of the call. Heartbeat, StatusUpdate and
     * DeleteStatusOverride look the instance up in the local registry first.
     * Any failure is logged and not propagated.
     */
    private void replicateInstanceActionsToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
        try {
            CurrentRequestVersion.set(Version.V2);
            InstanceInfo infoFromRegistry;
            switch(action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = (InstanceStatus)this.overriddenInstanceStatusMap.get(id);
                infoFromRegistry = this.getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = this.getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = this.getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
            }
        } catch (Throwable var12) {
            logger.error("Cannot replicate information to {} for action {}", new Object[]{node.getServiceUrl(), action.name(), var12});
        } finally {
            CurrentRequestVersion.remove();
        }

    }

    /**
     * Sends an ASG status update to one peer node with the request version set
     * to V2; failures are logged and the version is always cleared afterwards.
     */
    private void replicateASGInfoToReplicaNodes(String asgName, ASGStatus newStatus, PeerEurekaNode node) {
        CurrentRequestVersion.set(Version.V2);

        try {
            node.statusUpdate(asgName, newStatus);
        } catch (Throwable var8) {
            logger.error("Cannot replicate ASG status information to {}", node.getServiceUrl(), var8);
        } finally {
            CurrentRequestVersion.remove();
        }

    }

    @Monitor(
        name = "localRegistrySize",
        description = "Current registry size",
        type = DataSourceType.GAUGE
    )
    public long getLocalRegistrySize() {
        return super.getLocalRegistrySize();
    }

    /**
     * The kinds of instance actions that are replicated to peers. Each constant
     * owns a servo timer named after the constant, used by replicateToPeers().
     */
    public static enum Action {
        Heartbeat,
        Register,
        Cancel,
        StatusUpdate,
        DeleteStatusOverride;

        private com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());

        private Action() {
        }

        public com.netflix.servo.monitor.Timer getTimer() {
            return this.timer;
        }
    }
}

openForTraffic开启任务postInit,进去之后发现剔除功能(剔除 没有续约的服务)

postInit,点进去,AbstractInstanceRegistry.class

  /**
   * Starts the renews-per-minute counter and (re)schedules the eviction task:
   * a previously stored task is cancelled, a new one is stored in
   * {@code evictionTaskRef} and scheduled on {@code evictionTimer} with
   * getEvictionIntervalTimerInMs() as both initial delay and period.
   */
  protected void postInit() {
        this.renewsLastMin.start();

        AbstractInstanceRegistry.EvictionTask previousTask = (AbstractInstanceRegistry.EvictionTask)this.evictionTaskRef.get();
        if (previousTask != null) {
            previousTask.cancel();
        }

        AbstractInstanceRegistry.EvictionTask freshTask = new AbstractInstanceRegistry.EvictionTask();
        this.evictionTaskRef.set(freshTask);
        this.evictionTimer.schedule(
                freshTask,
                this.serverConfig.getEvictionIntervalTimerInMs(),
                this.serverConfig.getEvictionIntervalTimerInMs());
    }

发现new EvictionTask()点进去

class EvictionTask extends TimerTask {
    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);

    EvictionTask() {
    }

    public void run() {
        try {
            long compensationTimeMs = this.getCompensationTimeMs();
            AbstractInstanceRegistry.logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            AbstractInstanceRegistry.this.evict(compensationTimeMs);
        } catch (Throwable var3) {
            AbstractInstanceRegistry.logger.error("Could not run the evict task", var3);
        }

    }

    long getCompensationTimeMs() {
        long currNanos = this.getCurrentTimeNano();
        long lastNanos = this.lastExecutionNanosRef.getAndSet(currNanos);
        if (lastNanos == 0L) {
            return 0L;
        } else {
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
            long compensationTime = elapsedMs - AbstractInstanceRegistry.this.serverConfig.getEvictionIntervalTimerInMs();
            return compensationTime <= 0L ? 0L : compensationTime;
        }
    }

    long getCurrentTimeNano() {
        return System.nanoTime();
    }
}

看到run方法中,evict(compensationTimeMs),点进去就到了,具体剔除逻辑

/** Runs an eviction pass with no additional lease time; same as {@code evict(0L)}. */
public void evict() {
    this.evict(0L);
}

/**
 * Collects every lease that reports itself expired (given the extra
 * {@code additionalLeaseMs}) and has a holder, then cancels a random subset.
 * At most {@code registrySize - (int)(registrySize * renewalPercentThreshold)}
 * leases are cancelled in one pass. Does nothing when lease expiration is
 * currently disabled.
 *
 * @param additionalLeaseMs extra milliseconds passed to {@code Lease.isExpired}
 */
public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    if (!this.isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // Pass 1: gather all expired leases across every application group.
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<Lease<InstanceInfo>>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : this.registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leasesOfApp = groupEntry.getValue();
        if (leasesOfApp == null) {
            continue;
        }
        for (Entry<String, Lease<InstanceInfo>> leaseEntry : leasesOfApp.entrySet()) {
            Lease<InstanceInfo> candidate = leaseEntry.getValue();
            if (candidate.isExpired(additionalLeaseMs) && candidate.getHolder() != null) {
                expiredLeases.add(candidate);
            }
        }
    }

    // Pass 2: cap the number of evictions relative to the current registry size.
    int registrySize = (int)this.getLocalRegistrySize();
    int registrySizeThreshold = (int)((double)registrySize * this.serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict <= 0) {
        return;
    }

    logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
    Random random = new Random(System.currentTimeMillis());
    for (int slot = 0; slot < toEvict; ++slot) {
        // Swap a randomly chosen not-yet-picked lease into the current slot, then evict it.
        int picked = slot + random.nextInt(expiredLeases.size() - slot);
        Collections.swap(expiredLeases, slot, picked);
        Lease<InstanceInfo> victim = expiredLeases.get(slot);
        String appName = victim.getHolder().getAppName();
        String id = victim.getHolder().getId();
        EurekaMonitors.EXPIRED.increment();
        logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
        this.internalCancel(appName, id, false);
    }
}

PeerAwareInstanceRegistry接口

在EurekaServerAutoConfiguration中 有 public EurekaServerContext eurekaServerContext

/**
 * Declares the {@link EurekaServerContext} bean, used only when no other bean
 * of that type is already defined. It is a {@link DefaultEurekaServerContext}
 * built from this configuration's server config and application info manager
 * plus the injected codecs, registry and peer node set.
 */
@Bean
@ConditionalOnMissingBean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
    return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager);
}

查看DefaultEurekaServerContext

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.netflix.eureka;

import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.eureka.cluster.PeerEurekaNodes;
import com.netflix.eureka.registry.PeerAwareInstanceRegistry;
import com.netflix.eureka.resources.ServerCodecs;
import com.netflix.eureka.util.EurekaMonitors;
import com.netflix.eureka.util.ServoControl;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holder for the server-side collaborators (config, codecs, registry, peer
 * nodes, application info manager) that also drives their start-up and
 * shutdown through {@code @PostConstruct} / {@code @PreDestroy} callbacks.
 */
@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {
    private static final Logger logger = LoggerFactory.getLogger(DefaultEurekaServerContext.class);

    private final EurekaServerConfig serverConfig;
    private final ServerCodecs serverCodecs;
    private final PeerAwareInstanceRegistry registry;
    private final PeerEurekaNodes peerEurekaNodes;
    private final ApplicationInfoManager applicationInfoManager;

    /** Stores the supplied collaborators; nothing is started here. */
    @Inject
    public DefaultEurekaServerContext(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes, ApplicationInfoManager applicationInfoManager) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.registry = registry;
        this.peerEurekaNodes = peerEurekaNodes;
        this.applicationInfoManager = applicationInfoManager;
    }

    /**
     * Starts the peer node set first, then initialises the registry with it.
     * A checked exception from the registry is rethrown wrapped in a
     * {@link RuntimeException}.
     */
    @PostConstruct
    public void initialize() {
        logger.info("Initializing ...");
        peerEurekaNodes.start();

        try {
            registry.init(peerEurekaNodes);
        } catch (Exception initFailure) {
            throw new RuntimeException(initFailure);
        }

        logger.info("Initialized");
    }

    /** Shuts down, in order: registry, peer nodes, ServoControl, EurekaMonitors. */
    @PreDestroy
    public void shutdown() {
        logger.info("Shutting down ...");
        registry.shutdown();
        peerEurekaNodes.shutdown();
        ServoControl.shutdown();
        EurekaMonitors.shutdown();
        logger.info("Shut down");
    }

    public EurekaServerConfig getServerConfig() {
        return serverConfig;
    }

    public PeerEurekaNodes getPeerEurekaNodes() {
        return peerEurekaNodes;
    }

    public ServerCodecs getServerCodecs() {
        return serverCodecs;
    }

    public PeerAwareInstanceRegistry getRegistry() {
        return registry;
    }

    public ApplicationInfoManager getApplicationInfoManager() {
        return applicationInfoManager;
    }
}

其中peerEurekaNodes.start();

启动一个只拥有一个线程的线程池,第一次进去会更新一下集群其他节点信息

registry.init(peerEurekaNodes);

鼠标放在registry上,发现实际类型是注册信息管理类PeerAwareInstanceRegistryImpl,这里调用的是它的init方法

/**
 * Prepares the registry for use: starts the replication-rate counter, keeps a
 * reference to the peer node set, then calls initializedResponseCache(),
 * scheduleRenewalThresholdUpdateTask() and initRemoteRegionRegistry().
 * A failure to register the monitor is logged as a warning and otherwise ignored.
 *
 * @param peerEurekaNodes the peer node set this registry replicates to
 */
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    numberOfReplicationsLastMin.start();
    this.peerEurekaNodes = peerEurekaNodes;
    initializedResponseCache();
    scheduleRenewalThresholdUpdateTask();
    initRemoteRegionRegistry();

    try {
        Monitors.registerObject(this);
    } catch (Throwable monitorFailure) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", monitorFailure);
    }
}

PeerAwareInstanceRegistry是个接口,实现类是:PeerAwareInstanceRegistryImpl

PeerAwareInstanceRegistry接口,继承了com.netflix.eureka.registry.InstanceRegistry接口。

服务实例注册表 

Server是围绕注册表管理的

有两个InstanceRegistry

  • com.netflix.eureka.registry.InstanceRegistry是eureka server中注册表管理的核心接口,职责是在内存中管理注册到Eureka Server中的服务实例信息
  • 实现类有PeerAwareInstanceRegistryImpl

org.springframework.cloud.netflix.eureka.server.InstanceRegistry对PeerAwareInstanceRegistryImpl进行了继承和扩展

使其适配Spring cloud的使用环境,主要的实现由PeerAwareInstanceRegistryImpl提供

com.netflix.eureka.registry.InstanceRegistry extends LeaseManager<InstanceInfo>, LookupService<String>

  • LeaseManager<InstanceInfo> 是对注册到server中的服务实例租约进行管理
  • LookupService<String>           是提供服务实例的检索查询功能
  • LeaseManager<InstanceInfo> 接口的作用是对注册到Eureka Server中的服务实例租约进行管理

方法有

  • 服务注册
  • 下线
  • 续约
  • 剔除

此接口管理的类目前是InstanceInfo,InstanceInfo代表服务实例信息

PeerAwareInstanceRegistryImpl 增加了对peer节点的同步复制操作

使得eureka server集群中注册表信息保持一致

接受服务注册

我们学过Eureka Client在发起服务注册时会将自身的服务实例元数据封装在InstanceInfo中,然后将InstanceInfo发送到Eureka Server

Eureka Server在接收到Eureka Client发送的InstanceInfo后将会尝试将其放到本地注册表中以供其他Eureka Client进行服务发现

通过 eureka/apps/{服务名}注册 

  • 在EurekaServerAutoConfiguration中定义了 public FilterRegistrationBean jerseyFilterRegistration ,表明eureka-server使用了Jersey实现 对外的restful接口
  • 注册一个 Jersey 的 filter ,配置好相应的Filter 和 url映射
 /**
  * Builds the Jersey resource configuration. Every package in
  * {@code EUREKA_PACKAGES} is scanned for classes annotated with
  * {@code @Path} or {@code @Provider}; the classes found are handed to a
  * {@link DefaultResourceConfig}, which also gets the
  * {@code WebPageContentRegex} property for the static /eureka/ assets.
  */
 @Bean
public Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
    ClassPathScanningCandidateComponentProvider scanner = new ClassPathScanningCandidateComponentProvider(false, environment);
    scanner.addIncludeFilter(new AnnotationTypeFilter(Path.class));
    scanner.addIncludeFilter(new AnnotationTypeFilter(Provider.class));

    // Resolve every matching bean definition to its Class object.
    Set<Class<?>> resourceClasses = new HashSet<Class<?>>();
    for (String basePackage : EUREKA_PACKAGES) {
        for (BeanDefinition candidate : scanner.findCandidateComponents(basePackage)) {
            Class<?> resolved = ClassUtils.resolveClassName(candidate.getBeanClassName(), resourceLoader.getClassLoader());
            resourceClasses.add(resolved);
        }
    }

    Map<String, Object> propsAndFeatures = new HashMap<String, Object>();
    propsAndFeatures.put("com.sun.jersey.config.property.WebPageContentRegex", "/eureka/(fonts|images|css|js)/.*");

    DefaultResourceConfig resourceConfig = new DefaultResourceConfig(resourceClasses);
    resourceConfig.setPropertiesAndFeatures(propsAndFeatures);
    return resourceConfig;
}

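这里给类路径扫描器添加了两个包含过滤器(include filter),只扫描EUREKA_PACKAGES下带有@Path或@Provider注解的类:@Path类似于Spring MVC的@RequestMapping,用来声明资源类的请求路径;@Provider标注的是JAX-RS的扩展组件(例如消息体读写器、异常映射器),并不等同于@Controller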

在com.netflix.eureka.resources包下,是Eureka Server对于Eureka client的REST请求的定义

看ApplicationResource类(这是一类请求,应用类的请求),它的作用类似于Spring MVC中的@Controller;类上标注了@Produces({"application/xml", "application/json"}),表示响应可以以xml或json格式返回

见名知意 public Response addInstance,添加实例instanceinfo

方法中,有一句:

registry.register(info, "true".equals(isReplication))

鼠标放在registry上PeerAwareInstanceRegistry接口,点击void register方法

发现 是PeerAwareInstanceRegistryImpl 的方法:public void register(final InstanceInfo info, final boolean isReplication) ,中有一句:super.register(info, leaseDuration, isReplication);

com.netflix.eureka.registry.AbstractInstanceRegistry

register方法

在register中,服务实例的InstanceInfo保存在Lease中,Lease在AbstractInstanceRegistry中统一通过ConcurrentHashMap保存在内存中

  • 在服务注册过程中,会先获取一个读锁,防止其他线程对registry注册表进行数据操作,避免数据的不一致
  • 然后从registry查询对应的InstanceInfo租约是否已经存在注册表中,根据appName划分服务集群,使用InstanceId唯一标记服务实例
  • 如果租约存在,比较两个租约中的InstanceInfo的最后更新时间lastDirtyTimestamp,保留时间戳大的服务实例信息InstanceInfo
  • 如果租约不存在,意味这是一次全新的服务注册,将会进行自我保护的统计,创建新的租约保存InstanceInfo
  • 接着将租约放到registry注册表中
  • 之后将进行一系列缓存操作并根据覆盖状态规则设置服务实例的状态
  • 缓存操作包括将InstanceInfo加入用于统计Eureka Client增量式获取注册表信息的recentlyChangedQueue和失效responseCache中对应的缓存
  • 最后设置服务实例租约的上线时间用于计算租约的有效时间,释放读锁并完成服务注册

接受心跳 续租,renew

在Eureka Client完成服务注册之后,它需要定时向Eureka Server发送心跳请求(默认30秒一次),维持自己在Eureka Server中租约的有效性

看另一类请求com.netflix.eureka.resources.InstanceResource

public Response renewLease()方法

/**
 * Handles a lease-renewal (heartbeat) PUT for this instance.
 * Responds 404 when the registry could not renew the lease. Otherwise, when a
 * lastDirtyTimestamp is supplied and the server is configured to sync on
 * timestamp differences, the response comes from validateDirtyTimestamp();
 * in every other case it is a plain 200.
 */
@PUT
public Response renewLease(@HeaderParam("x-netflix-discovery-replication") String isReplication, @QueryParam("overriddenstatus") String overriddenStatus, @QueryParam("status") String status, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    boolean renewed = this.registry.renew(this.app.getName(), this.id, isFromReplicaNode);
    if (!renewed) {
        logger.warn("Not Found (Renew): {} - {}", this.app.getName(), this.id);
        return Response.status(Status.NOT_FOUND).build();
    }

    Response response;
    boolean mustValidateTimestamp = lastDirtyTimestamp != null && this.serverConfig.shouldSyncWhenTimestampDiffers();
    if (!mustValidateTimestamp) {
        response = Response.ok().build();
    } else {
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);

        // A replica reported a concrete override while validation answered 404: keep that override.
        boolean shouldStoreOverride = response.getStatus() == Status.NOT_FOUND.getStatusCode()
                && overriddenStatus != null
                && !InstanceStatus.UNKNOWN.name().equals(overriddenStatus)
                && isFromReplicaNode;
        if (shouldStoreOverride) {
            this.registry.storeOverriddenStatusIfRequired(this.app.getAppName(), this.id, InstanceStatus.valueOf(overriddenStatus));
        }
    }

    logger.debug("Found (Renew): {} - {}; reply status={}", this.app.getName(), this.id, response.getStatus());
    return response;
}

看到一行boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

点击renew的实现

AbstractInstanceRegistry#renew方法

/**
 * Renews the lease of the instance identified by application name and instance id.
 *
 * @param appName       application name used as the registry key
 * @param id            instance id inside that application's lease map
 * @param isReplication whether the call originates from a peer replication
 * @return false when no lease exists or the resolved overridden status is UNKNOWN; true once the
 *         lease has been renewed
 */
public boolean renew(String appName, String id, boolean isReplication) {
    EurekaMonitors.RENEW.increment(isReplication);
    Map<String, Lease<InstanceInfo>> leasesOfApp = this.registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = leasesOfApp == null ? null : leasesOfApp.get(id);
    if (leaseToRenew == null) {
        EurekaMonitors.RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    }

    InstanceInfo holder = leaseToRenew.getHolder();
    if (holder != null) {
        InstanceStatus effectiveStatus = this.getOverriddenInstanceStatus(holder, leaseToRenew, isReplication);
        if (effectiveStatus == InstanceStatus.UNKNOWN) {
            logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}; re-register required", holder.getId());
            EurekaMonitors.RENEW_NOT_FOUND.increment(isReplication);
            return false;
        }

        if (!holder.getStatus().equals(effectiveStatus)) {
            // NOTE(review): the second argument logs holder.getOverriddenStatus(), not the
            // effectiveStatus that is actually applied below - confirm whether that is intended.
            logger.info("The instance status {} is different from overridden instance status {} for instance {}. Hence setting the status to overridden status", new Object[]{holder.getStatus().name(), holder.getOverriddenStatus().name(), holder.getId()});
            holder.setStatusWithoutDirty(effectiveStatus);
        }
    }

    // Count the renewal, then refresh the lease itself.
    this.renewsLastMin.increment();
    leaseToRenew.renew();
    return true;
}

Eureka Server处理心跳请求的核心逻辑位于AbstractInstanceRegistry#renew方法中

renew方法是对Eureka Client位于注册表中的租约的续租操作,不像register方法需要服务实例信息,仅根据服务实例的服务名和服务实例id即可更新对应租约的有效时间

// Look up the lease map of the service cluster by appName
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
// Resolve the (overridden) status of the service instance
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);
// renew() returns false when the status is UNKNOWN; the lease is renewed only otherwise
if (overriddenInstanceStatus != InstanceStatus.UNKNOWN) {
    // Count this renewal in the per-minute renewal statistics
    renewsLastMin.increment();
    // Renew the lease
    leaseToRenew.renew();
}

此方法中不关注InstanceInfo,仅关注于租约本身以及租约的服务实例状态

  • 如果根据服务实例的appName和instanceInfoId查询出服务实例的租约
  • 并且根据#getOverriddenInstanceStatus方法得到的instanceStatus不为InstanceStatus.UNKNOWN
  • 那么更新租约中的有效时间,即更新租约Lease中的lastUpdateTimestamp
  • 达到续约的目的
  • 如果租约不存在,那么返回续租失败的结果

服务剔除

  • 如果Eureka Client在注册后,既没有续约,也没有下线(服务崩溃或者网络异常等原因)
  • 那么服务的状态就处于不可知的状态,不能保证能够从该服务实例中获取到回馈
  • 所以需要通过服务剔除定时清理这些不稳定的服务,该方法会批量将注册表中所有过期租约剔除

剔除是定时任务,默认60秒执行一次,延时60秒,间隔60秒

/**
 * Starts the renewal counter and (re)schedules the eviction task: a previously registered task
 * is cancelled, a fresh one is stored and scheduled with the configured eviction interval used
 * both as initial delay and as period.
 */
protected void postInit() {
    this.renewsLastMin.start();

    AbstractInstanceRegistry.EvictionTask previousTask = (AbstractInstanceRegistry.EvictionTask)this.evictionTaskRef.get();
    if (previousTask != null) {
        previousTask.cancel();
    }

    this.evictionTaskRef.set(new AbstractInstanceRegistry.EvictionTask());
    TimerTask currentTask = (TimerTask)this.evictionTaskRef.get();
    long intervalMs = this.serverConfig.getEvictionIntervalTimerInMs();
    this.evictionTimer.schedule(currentTask, intervalMs, intervalMs);
}

从上面eureka server启动来看,剔除的任务,是线程启动的,执行的是下面的方法。
com.netflix.eureka.registry.AbstractInstanceRegistry#evict

/**
 * Runs an eviction pass with no additional lease time.
 */
public void evict() {
    final long noAdditionalLeaseMs = 0L;
    this.evict(noAdditionalLeaseMs);
}

/**
 * Collects all expired leases and cancels a randomly chosen subset of them.
 *
 * The number of cancellations per pass is capped at
 * {@code registrySize - registrySize * renewalPercentThreshold}.
 *
 * @param additionalLeaseMs extra time passed to {@code Lease.isExpired} for each lease
 */
public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    // Nothing is evicted while lease expiration is disabled (see isLeaseExpirationEnabled).
    if (!this.isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // Pass 1: gather every expired lease that still has a holder.
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : this.registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap == null) {
            continue;
        }
        for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
            Lease<InstanceInfo> candidate = leaseEntry.getValue();
            if (candidate.isExpired(additionalLeaseMs) && candidate.getHolder() != null) {
                expiredLeases.add(candidate);
            }
        }
    }

    // Pass 2: work out how many leases may be removed in this run.
    int registrySize = (int)this.getLocalRegistrySize();
    int registrySizeThreshold = (int)((double)registrySize * this.serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict <= 0) {
        return;
    }

    logger.info("Evicting {} items (expired={}, evictionLimit={})", new Object[]{toEvict, expiredLeases.size(), evictionLimit});
    Random random = new Random(System.currentTimeMillis());
    // Partial shuffle: swap a random not-yet-chosen lease into slot i, then cancel it.
    for (int i = 0; i < toEvict; ++i) {
        int pick = i + random.nextInt(expiredLeases.size() - i);
        Collections.swap(expiredLeases, i, pick);
        Lease<InstanceInfo> victim = expiredLeases.get(i);
        String appName = victim.getHolder().getAppName();
        String id = victim.getHolder().getId();
        EurekaMonitors.EXPIRED.increment();
        logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
        this.internalCancel(appName, id, false);
    }
}

如果开启了自我保护,并且最近一分钟的续约数没有超过阈值(即isLeaseExpirationEnabled返回false),则不剔除

点进去isLeaseExpirationEnabled,查看实现类

/**
 * Tells whether expired leases may currently be evicted.
 *
 * @return true when self-preservation mode is switched off; otherwise true only if the renewal
 *         threshold is positive and last minute's renewals exceed it
 */
public boolean isLeaseExpirationEnabled() {
    if (!this.isSelfPreservationModeEnabled()) {
        return true;
    }
    if (this.numberOfRenewsPerMinThreshold <= 0) {
        return false;
    }
    return this.getNumOfRenewsInLastMin() > (long)this.numberOfRenewsPerMinThreshold;
}

有一个isSelfPreservationModeEnabled,点进去 

/**
 * @return the self-preservation switch as reported by the server configuration
 */
public boolean isSelfPreservationModeEnabled() {
    boolean enabled = this.serverConfig.shouldEnableSelfPreservation();
    return enabled;
}

发现EurekaServerConfig的方法shouldEnableSelfPreservation,其实现类中有EurekaServerConfigBean

/**
 * @return the value of the {@code enableSelfPreservation} property
 */
public boolean shouldEnableSelfPreservation() {
    boolean selfPreservation = this.enableSelfPreservation;
    return selfPreservation;
}

发现属性:enableSelfPreservation

回到com.netflix.eureka.registry.AbstractInstanceRegistry#evict方法

  • 紧接着一个大的循环,遍历注册表registry
  • 依次判断租约是否过期
  • 一次性获取所有的过期租约
// Excerpt of evict(long): the outer loop walks the registry (iterator var4); once the iterator is
// exhausted the collected expiredLeases are evicted and the method returns.
while(true) {
    Map leaseMap;
    do {
        if (!var4.hasNext()) {
            // Total number of leases in the local registry
            int registrySize = (int)this.getLocalRegistrySize();
            // Registry size * renewal percent threshold = number of leases that must be kept
            int registrySizeThreshold = (int)((double)registrySize * this.serverConfig.getRenewalPercentThreshold());
            // Total minus the number to keep = maximum number that may be evicted
            int evictionLimit = registrySize - registrySizeThreshold;
            // The smaller of that limit and the number of expired leases is what actually gets evicted
            int toEvict = Math.min(expiredLeases.size(), evictionLimit);
            if (toEvict > 0) {
                logger.info("Evicting {} items (expired={}, evictionLimit={})", new Object[]{toEvict, expiredLeases.size(), evictionLimit});
                Random random = new Random(System.currentTimeMillis());

                // Pick the leases to evict at random by swapping a random remaining element into slot i
                for(int i = 0; i < toEvict; ++i) {
                    int next = i + random.nextInt(expiredLeases.size() - i);
                    Collections.swap(expiredLeases, i, next);
                    Lease<InstanceInfo> lease = (Lease)expiredLeases.get(i);
                    String appName = ((InstanceInfo)lease.getHolder()).getAppName();
                    String id = ((InstanceInfo)lease.getHolder()).getId();
                    EurekaMonitors.EXPIRED.increment();
                    logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                    // Reuse the cancel (service offline) logic to remove the instance from the registry
                    this.internalCancel(appName, id, false);
                }
            }

            return;
        }

        Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry = (Entry)var4.next();
        leaseMap = (Map)groupEntry.getValue();
    } while(leaseMap == null);

    Iterator var7 = leaseMap.entrySet().iterator();

    // Collect every lease of this application that is expired and still has a holder
    while(var7.hasNext()) {
        Entry<String, Lease<InstanceInfo>> leaseEntry = (Entry)var7.next();
        Lease<InstanceInfo> lease = (Lease)leaseEntry.getValue();
        if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
            expiredLeases.add(lease);
        }
    }
}    

剔除的限制  

  • 自我保护期间不清除
  • 分批次清除

服务

  • 逐个随机剔除
  • 剔除均匀分布在所有应用中
  • 防止在同一时间内同一服务集群中的服务全部过期被剔除
  • 以免在大量剔除服务时,还没来得及进入自我保护,就促使程序崩溃

剔除服务是个定时任务

查看EurekaServerInitializerConfiguration

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.springframework.cloud.netflix.eureka.server;

import com.netflix.eureka.EurekaServerConfig;
import javax.servlet.ServletContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.netflix.eureka.server.event.EurekaRegistryAvailableEvent;
import org.springframework.cloud.netflix.eureka.server.event.EurekaServerStartedEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.web.context.ServletContextAware;

@Configuration(
    proxyBeanMethods = false
)
public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {
    private static final Log log = LogFactory.getLog(EurekaServerInitializerConfiguration.class);
    @Autowired
    private EurekaServerConfig eurekaServerConfig;
    private ServletContext servletContext;
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private EurekaServerBootstrap eurekaServerBootstrap;
    private boolean running;
    private int order = 1;

    public EurekaServerInitializerConfiguration() {
    }

    public void setServletContext(ServletContext servletContext) {
        this.servletContext = servletContext;
    }

    public void start() {
        (new Thread(() -> {
            try {
                this.eurekaServerBootstrap.contextInitialized(this.servletContext);
                log.info("Started Eureka Server");
                this.publish(new EurekaRegistryAvailableEvent(this.getEurekaServerConfig()));
                this.running = true;
                this.publish(new EurekaServerStartedEvent(this.getEurekaServerConfig()));
            } catch (Exception var2) {
                log.error("Could not initialize Eureka servlet context", var2);
            }

        })).start();
    }

    private EurekaServerConfig getEurekaServerConfig() {
        return this.eurekaServerConfig;
    }

    private void publish(ApplicationEvent event) {
        this.applicationContext.publishEvent(event);
    }

    public void stop() {
        this.running = false;
        this.eurekaServerBootstrap.contextDestroyed(this.servletContext);
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return 0;
    }

    public boolean isAutoStartup() {
        return true;
    }

    public void stop(Runnable callback) {
        callback.run();
    }

    public int getOrder() {
        return this.order;
    }
}

在eurekaServerBootstrap.contextInitialized()方法中调用了initEurekaServerContext(),点进去;其中的this.registry.openForTraffic(this.applicationInfoManager, registryCount)点进去;再点进super.postInit()

/**
 * Starts the renewal counter and (re)schedules the eviction task: a previously registered task
 * is cancelled, a fresh one is stored and scheduled with the configured eviction interval used
 * both as initial delay and as period.
 */
protected void postInit() {
    this.renewsLastMin.start();

    AbstractInstanceRegistry.EvictionTask previousTask = (AbstractInstanceRegistry.EvictionTask)this.evictionTaskRef.get();
    if (previousTask != null) {
        previousTask.cancel();
    }

    this.evictionTaskRef.set(new AbstractInstanceRegistry.EvictionTask());
    TimerTask currentTask = (TimerTask)this.evictionTaskRef.get();
    long intervalMs = this.serverConfig.getEvictionIntervalTimerInMs();
    this.evictionTimer.schedule(currentTask, intervalMs, intervalMs);
}

EvictionTask是定时任务

class EvictionTask extends TimerTask {
    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);

    EvictionTask() {
    }

    public void run() {
        try {
            long compensationTimeMs = this.getCompensationTimeMs();
            AbstractInstanceRegistry.logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            AbstractInstanceRegistry.this.evict(compensationTimeMs);
        } catch (Throwable var3) {
            AbstractInstanceRegistry.logger.error("Could not run the evict task", var3);
        }

    }

    long getCompensationTimeMs() {
        long currNanos = this.getCurrentTimeNano();
        long lastNanos = this.lastExecutionNanosRef.getAndSet(currNanos);
        if (lastNanos == 0L) {
            return 0L;
        } else {
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
            long compensationTime = elapsedMs - AbstractInstanceRegistry.this.serverConfig.getEvictionIntervalTimerInMs();
            return compensationTime <= 0L ? 0L : compensationTime;
        }
    }

    long getCurrentTimeNano() {
        return System.nanoTime();
    }
}

剔除服务是个定时任务,用EvictionTask执行,默认60秒执行一次,延时60秒执行

定时剔除过期服务

  • 服务剔除将会遍历registry注册表,找出其中所有的过期租约
  • 然后根据配置文件中续租百分比阈值和当前注册表的租约总数量计算出最大允许的剔除租约的数量
  • (当前注册表中租约总数量减去当前注册表租约阈值)
  • 分批次剔除过期的服务实例租约
  • 对过期的服务实例租约调用AbstractInstanceRegistry#internalCancel服务下线的方法将其从注册表中清除掉​     

自我保护机制主要在Eureka Client和Eureka Server之间存在网络分区的情况下发挥保护作用

在服务器端和客户端都有对应实现

  • 假设在某种特定的情况下(如网络故障),Eureka Client和Eureka Server无法进行通信
  • 此时Eureka Client无法向Eureka Server发起注册和续约请求,Eureka Server中就可能因注册表中的服务实例租约出现大量过期而面临被剔除的危险
  • 然而此时的Eureka Client可能是处于健康状态的(可接受服务访问),如果直接将注册表中大量过期的服务实例租约剔除显然是不合理的

针对这种情况,Eureka设计了“自我保护机制”

  • 在Eureka Server处,如果出现大量的服务实例过期被剔除的现象
  • 那么该Server节点将进入自我保护模式,保护注册表中的信息不再被剔除,在通信稳定后再退出该模式
  • 在Eureka Client处,如果向Eureka Server注册失败,将快速超时并尝试与其他的Eureka Server进行通信
  • “自我保护机制”的设计大大提高了Eureka的可用性

服务下线

Eureka Client在应用销毁时

会向Eureka Server发送服务下线请求,清除注册表中关于本应用的租约,避免无效的服务调用

在服务剔除的过程中,也是通过服务下线的逻辑完成对单个服务实例过期租约的清除工作

在InstanceResource中

/**
 * Handles a DELETE (cancel lease) request for the instance addressed by this resource.
 *
 * @param isReplication value of the replication header; "true" marks a request coming from a peer node
 * @return 200 when the registry cancelled the lease, 404 when it did not, 500 on any failure
 */
@DELETE
public Response cancelLease(@HeaderParam("x-netflix-discovery-replication") String isReplication) {
    try {
        boolean fromReplica = "true".equals(isReplication);
        if (!this.registry.cancel(this.app.getName(), this.id, fromReplica)) {
            logger.info("Not Found (Cancel): {} - {}", this.app.getName(), this.id);
            return Response.status(Status.NOT_FOUND).build();
        }

        logger.debug("Found (Cancel): {} - {}", this.app.getName(), this.id);
        return Response.ok().build();
    } catch (Throwable failure) {
        logger.error("Error (cancel): {} - {}", new Object[]{this.app.getName(), this.id, failure});
        return Response.serverError().build();
    }
}

查看registry.cancel,实现该方法的类是PeerAwareInstanceRegistryImpl.class

/**
 * Cancels the lease locally and, when that succeeds, hands the cancel action to
 * {@code replicateToPeers}.
 *
 * @param appName       application name of the instance
 * @param id            instance id
 * @param isReplication whether this call is itself a replication from a peer
 * @return the result of the local cancel
 */
public boolean cancel(String appName, String id, boolean isReplication) {
    boolean cancelledLocally = super.cancel(appName, id, isReplication);
    if (!cancelledLocally) {
        return false;
    }

    this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Cancel, appName, id, (InstanceInfo)null, (InstanceStatus)null, isReplication);
    return true;
}

点super.cancel进去发现internalCancel

/**
 * Cancels the lease of the given instance by delegating to {@code internalCancel}.
 *
 * @return whatever {@code internalCancel} reports
 */
public boolean cancel(String appName, String id, boolean isReplication) {
    boolean cancelled = this.internalCancel(appName, id, isReplication);
    return cancelled;
}

查看internalCancel实现

/**
 * Removes the lease of one instance from the registry (used for explicit cancel and for eviction).
 *
 * @param appName       application name used as the registry key
 * @param id            instance id inside that application's lease map
 * @param isReplication whether the call originates from a peer replication (used for metrics and logging here)
 * @return false when no lease was registered for the instance, true otherwise
 */
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    try {
        // Take the read lock; it is released in the finally block below
        this.read.lock();
        EurekaMonitors.CANCEL.increment(isReplication);
        // Look up the lease map of the service cluster by appName
        Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        // Remove the lease of this instance id from the in-memory map
        if (gMap != null) {
            leaseToCancel = (Lease)gMap.remove(id);
        }
        // Record the request in the queue of recently cancelled instances
        this.recentCanceledQueue.add(new Pair(System.currentTimeMillis(), appName + "(" + id + ")"));
        // Drop any overridden status stored for this instance id
        InstanceStatus instanceStatus = (InstanceStatus)this.overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        // No lease was registered for this instance: count it as not-found and return false
        if (leaseToCancel == null) {
            EurekaMonitors.CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            boolean var16 = false;
            return var16;
        }

        // The lease exists: mark it as cancelled (presumably this records its offline time - confirm in Lease)
        leaseToCancel.cancel();
        InstanceInfo instanceInfo = (InstanceInfo)leaseToCancel.getHolder();
        String vip = null;
        String svip = null;
        if (instanceInfo != null) {
            // Flag the instance held by the lease as DELETED
            instanceInfo.setActionType(ActionType.DELETED);
            // Add it to the recently-changed queue (per these notes, used for clients' incremental registry fetch)
            this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(leaseToCancel));
            instanceInfo.setLastUpdatedTimestamp();
            vip = instanceInfo.getVIPAddress();
            svip = instanceInfo.getSecureVipAddress();
        }

        // Invalidate cached entries for this application and its VIP addresses
        this.invalidateCache(appName, vip, svip);
        logger.info("Cancelled instance {}/{} (replication={})", new Object[]{appName, id, isReplication});
    } finally {
        // Release the read lock
        this.read.unlock();
    }

    // One fewer client is expected to send renewals: lower the count and recompute the threshold
    synchronized(this.lock) {
        if (this.expectedNumberOfClientsSendingRenews > 0) {
            --this.expectedNumberOfClientsSendingRenews;
            this.updateRenewsPerMinThreshold();
        }

        return true;
    }
}
  • 首先通过registry根据服务名和服务实例id查询关于服务实例的租约Lease是否存在
  • 统计最近请求下线的服务实例用于Eureka Server主页展示
  • 如果租约不存在,返回下线失败
  • 如果租约存在,从registry注册表中移除,设置租约的下线时间
  • 同时在最近租约变更记录队列中添加新的下线记录,以用于Eureka Client的增量式获取注册表信息

集群同步

如果Eureka Server是通过集群的方式进行部署,那么为了维护整个集群中Eureka Server注册表数据的一致性,势必需要一个机制同步Eureka Server集群中的注册表数据

Eureka Server集群同步包含两个部分

  • 一部分是Eureka Server在启动过程中从它的peer节点中拉取注册表信息,并将这些服务实例的信息注册到本地注册表中
  • 另一部分是Eureka Server每次对本地注册表进行操作时,同时会将操作同步到它的peer节点中,达到集群注册表数据统一的目的

启动拉取别的peer   

在Eureka Server启动类中

EurekaServerInitializerConfiguration位于EurekaServerAutoConfiguration 的import注解中。

eurekaServerBootstrap.contextInitialized

点进去:initEurekaServerContext()

 /**
  * Initialises the Eureka server context: registers the XStream converters, starts the AWS binder
  * when applicable, publishes the server context, syncs the registry and opens for traffic.
  * The statements run in a fixed order; each step relies on the previous ones having completed.
  */
 protected void initEurekaServerContext() throws Exception {
    // Register the V1-aware InstanceInfo converter (priority 10000) for both JSON and XML XStream
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
    // Only when this instance is detected as running on AWS: create and start the AWS binder
    if (this.isAws(this.applicationInfoManager.getInfo())) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager);
        this.awsBinder.start();
    }

    EurekaServerContextHolder.initialize(this.serverContext);
    log.info("Initialized server context");
    // Copy the registry via syncUp(); the return value is the number of instances registered locally
    int registryCount = this.registry.syncUp();
    // Hand that count to openForTraffic, which sets the renewal threshold and marks this server UP
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    EurekaMonitors.registerAllStats();
}

看注释:拉取注册表从邻近节点。点击syncUp()的实现方法进去

/**
 * Copies the registry obtained through the local Eureka client into this server's own registry.
 *
 * Retries up to the configured number of times while nothing has been registered yet, sleeping
 * the configured wait time before every attempt except the first.
 *
 * @return number of instances registered locally
 */
public int syncUp() {
    int registered = 0;

    for (int attempt = 0; attempt < this.serverConfig.getRegistrySyncRetries() && registered == 0; ++attempt) {
        // The first attempt runs immediately; later attempts wait first.
        if (attempt > 0) {
            try {
                Thread.sleep(this.serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException interrupted) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }

        Applications apps = this.eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (!this.isRegisterable(instance)) {
                        continue;
                    }
                    // Registered as a replication (isReplication = true).
                    this.register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                    ++registered;
                } catch (Throwable failure) {
                    // A failure on one instance is logged and does not abort the copy.
                    logger.error("During DS init copy", failure);
                }
            }
        }
    }

    return registered;
}
  • 看循环:意思是,如果是第一次进来,i为0,不会执行等待的代码,直接执行下面的拉取服务实例
  • 将自己作为一个eureka client,拉取注册表
  • 并通过register(instance, instance.getLeaseInfo().getDurationInSecs(), true)注册到自身的注册表中

Eureka Server也是一个Eureka Client

  • 启动的时候也会进行DiscoveryClient的初始化,会从其对应的Eureka Server中拉取全量的注册表信息
  • 在Eureka Server集群部署的情况下,Eureka Server从它的peer节点中拉取到注册表信息后
  • 将遍历这个Applications,将所有的服务实例通过AbstractInstanceRegistry#register方法注册到自身注册表中

回到initEurekaServerContext

// syncUp() returns how many instances were registered locally; that count is passed to openForTraffic
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);        

当执行完上面的syncUp逻辑后,在下面的openForTraffic,开启此server接受别的client注册,拉取注册表等操作

而在它首次拉取其他peer节点时,是不允许client的通信请求的

openForTraffic

/**
 * Marks this server as ready: seeds the expected-client count and renewal threshold, records the
 * startup time, optionally primes AWS replica connections, sets the instance status to UP and
 * runs {@code postInit()}.
 *
 * @param applicationInfoManager manager of this server's own instance info
 * @param count                  number of instances obtained from the neighbouring node
 */
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Seed the expected number of renewing clients with the synced instance count.
    this.expectedNumberOfClientsSendingRenews = count;
    this.updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", this.numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }

    boolean runningOnAws = Name.Amazon == applicationInfoManager.getInfo().getDataCenterInfo().getName();
    if (runningOnAws && this.serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        this.primeAwsReplicas(applicationInfoManager);
    }

    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    super.postInit();
}

updateRenewsPerMinThreshold点进去,是计算自我保护的统计参数

/**
 * Recomputes {@code numberOfRenewsPerMinThreshold} as
 * expected clients * (60 / expected renewal interval in seconds) * renewal percent threshold,
 * truncated to an int.
 */
protected void updateRenewsPerMinThreshold() {
    double expectedClients = (double)this.expectedNumberOfClientsSendingRenews;
    double renewsPerClientPerMin = 60.0D / (double)this.serverConfig.getExpectedClientRenewalIntervalSeconds();
    double expectedRenewsPerMin = expectedClients * renewsPerClientPerMin;
    this.numberOfRenewsPerMinThreshold = (int)(expectedRenewsPerMin * this.serverConfig.getRenewalPercentThreshold());
}

服务数*(每个服务每分钟续约次数)* 阈值

回到openForTraffic

// Only when at least one instance was synced is the "peer transfer was empty on startup" flag cleared
if (count > 0) {
    this.peerInstancesTransferEmptyOnStartup = false;
}
  • 如果count=0,没有拉取到注册表信息,将此值设为true,表示其他peer来取空的实例信息
  • 意味着,将不允许client从此server获取注册表信息
  • 如果count>0,将此值设置为false,允许client来获取注册表

后面将服务置为上线,并开启剔除的定时任务

  • 当Server的状态不为UP时,将拒绝所有的请求
  • 在Client请求获取注册表信息时,Server会判断此时是否允许获取注册表中的信息
  • 上述做法是为了避免Eureka Server在#syncUp方法中没有获取到任何服务实例信息时(Eureka Server集群部署的情况下)
  • Eureka Server注册表中的信息影响到Eureka Client缓存的注册表中的信息
  • 因为是全量同步,如果server什么也没同步过来,会导致client清空注册表,服务调用出问题

Server之间注册表信息的同步复制

为了保证Eureka Server集群运行时注册表信息的一致性,每个Eureka Server在对本地注册表进行管理操作时,会将相应的操作同步到所有peer节点中

在外部调用server的restful方法时,在com.netflix.eureka.resources包下的ApplicationResource资源中,查看每个服务的操作

在PeerAwareInstanceRegistryImpl类中,看其他操作,cancel,renew等中都有replicateToPeers,

此方法中有个peerEurekaNodes,代表一个可同步数据的eureka Server的集合,如果注册表有变化,向此中的peer节点同步

replicateToPeers方法

/**
 * Forwards a registry action to every peer node except this server itself.
 *
 * Nothing is forwarded when the call is itself a replication; in that case only the
 * replication counter is incremented. The action's timer is always stopped.
 */
private void replicateToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();

    try {
        if (isReplication) {
            this.numberOfReplicationsLastMin.increment();
        }

        if (this.peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        for (PeerEurekaNode node : this.peerEurekaNodes.getPeerEurekaNodes()) {
            // Skip the entry whose URL is this server's own.
            if (this.peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            this.replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }

}

它将遍历Eureka Server中peer节点,向每个peer节点发送同步请求

此replicateInstanceActionsToPeers方法

/**
 * Sends one registry action to one peer node by calling the matching method on the node.
 *
 * Any failure is logged and swallowed; the request version set at the start is always removed.
 */
private void replicateInstanceActionsToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
    try {
        CurrentRequestVersion.set(Version.V2);
        switch(action) {
        case Cancel: {
            node.cancel(appName, id);
            break;
        }
        case Heartbeat: {
            InstanceStatus overriddenStatus = (InstanceStatus)this.overriddenInstanceStatusMap.get(id);
            InstanceInfo current = this.getInstanceByAppAndId(appName, id, false);
            node.heartbeat(appName, id, current, overriddenStatus, false);
            break;
        }
        case Register: {
            node.register(info);
            break;
        }
        case StatusUpdate: {
            InstanceInfo current = this.getInstanceByAppAndId(appName, id, false);
            node.statusUpdate(appName, id, newStatus, current);
            break;
        }
        case DeleteStatusOverride: {
            InstanceInfo current = this.getInstanceByAppAndId(appName, id, false);
            node.deleteStatusOverride(appName, id, current);
            break;
        }
        }
    } catch (Throwable failure) {
        logger.error("Cannot replicate information to {} for action {}", new Object[]{node.getServiceUrl(), action.name(), failure});
    } finally {
        CurrentRequestVersion.remove();
    }

}

类PeerEurekaNode的实例node的各种方法,cancel,register

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.netflix.eureka.cluster;

import com.netflix.appinfo.InstanceInfo;
import com.netflix.appinfo.InstanceInfo.InstanceStatus;
import com.netflix.discovery.shared.transport.EurekaHttpResponse;
import com.netflix.eureka.EurekaServerConfig;
import com.netflix.eureka.registry.PeerAwareInstanceRegistry;
import com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl.Action;
import com.netflix.eureka.resources.ASGResource.ASGStatus;
import com.netflix.eureka.util.batcher.TaskDispatcher;
import com.netflix.eureka.util.batcher.TaskDispatchers;
import java.net.MalformedURLException;
import java.net.URL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Represents one peer Eureka server that local registry changes are replicated to.
 *
 * <p>Every replication call is wrapped in a task and submitted to one of two
 * {@link TaskDispatcher}s created in the constructor: instance-level actions
 * (register, cancel, heartbeat, status update, delete status override) go through
 * {@code batchingDispatcher}, while ASG status updates go through
 * {@code nonBatchingDispatcher}. Each task is submitted together with an expiry
 * timestamp computed at submission time.
 *
 * <p>Two nodes are equal when their {@code serviceUrl} values are equal.
 */
public class PeerEurekaNode {
    /** Default retry sleep time handed to the task dispatchers, in milliseconds. */
    private static final long RETRY_SLEEP_TIME_MS = 100L;
    /** Default "server unavailable" sleep time handed to the task dispatchers, in milliseconds. */
    private static final long SERVER_UNAVAILABLE_SLEEP_TIME_MS = 1000L;
    /** Default maximum batching delay handed to the task dispatchers, in milliseconds. */
    private static final long MAX_BATCHING_DELAY_MS = 500L;
    /** Default batch size handed to the batching dispatcher. */
    private static final int BATCH_SIZE = 250;
    private static final Logger logger = LoggerFactory.getLogger(PeerEurekaNode.class);
    public static final String BATCH_URL_PATH = "peerreplication/batch/";
    public static final String HEADER_REPLICATION = "x-netflix-discovery-replication";
    private final String serviceUrl;
    private final EurekaServerConfig config;
    /** Expiry window for cancel / status tasks, taken from {@code config.getMaxTimeForReplication()}. */
    private final long maxProcessingDelayMs;
    private final PeerAwareInstanceRegistry registry;
    private final String targetHost;
    private final HttpReplicationClient replicationClient;
    private final TaskDispatcher<String, ReplicationTask> batchingDispatcher;
    private final TaskDispatcher<String, ReplicationTask> nonBatchingDispatcher;

    /**
     * Creates a peer node using the default batch size and delay constants of this class.
     *
     * @param registry          local registry, used when instance data is taken over from the peer
     * @param targetHost        host name of the peer
     * @param serviceUrl        service URL of the peer; also the identity used by equals/hashCode
     * @param replicationClient HTTP client that performs the actual replication calls
     * @param config            server configuration
     */
    public PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl, HttpReplicationClient replicationClient, EurekaServerConfig config) {
        // Use the named constants instead of repeating their literal values.
        this(registry, targetHost, serviceUrl, replicationClient, config, BATCH_SIZE, MAX_BATCHING_DELAY_MS, RETRY_SLEEP_TIME_MS, SERVER_UNAVAILABLE_SLEEP_TIME_MS);
    }

    /**
     * Package-private constructor that allows the batching parameters to be overridden.
     *
     * @param batchSize                    batch size for the batching dispatcher
     * @param maxBatchingDelayMs           maximum batching delay, in milliseconds
     * @param retrySleepTimeMs             retry sleep time, in milliseconds
     * @param serverUnavailableSleepTimeMs sleep time when the peer is unavailable, in milliseconds
     */
    PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl, HttpReplicationClient replicationClient, EurekaServerConfig config, int batchSize, long maxBatchingDelayMs, long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
        this.registry = registry;
        this.targetHost = targetHost;
        this.replicationClient = replicationClient;
        this.serviceUrl = serviceUrl;
        this.config = config;
        this.maxProcessingDelayMs = (long)config.getMaxTimeForReplication();
        String batcherName = this.getBatcherName();
        // Both dispatchers share one processor bound to this peer's host and client.
        ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
        this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(batcherName, config.getMaxElementsInPeerReplicationPool(), batchSize, config.getMaxThreadsForPeerReplication(), maxBatchingDelayMs, serverUnavailableSleepTimeMs, retrySleepTimeMs, taskProcessor);
        this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(targetHost, config.getMaxElementsInStatusReplicationPool(), config.getMaxThreadsForStatusReplication(), maxBatchingDelayMs, serverUnavailableSleepTimeMs, retrySleepTimeMs, taskProcessor);
    }

    /**
     * Queues a register replication for {@code info} on the batching dispatcher.
     * The task expires after the value returned by {@link #getLeaseRenewalOf(InstanceInfo)}.
     *
     * @param info instance to register on the peer
     * @throws Exception declared for callers; nothing is thrown explicitly here
     */
    public void register(final InstanceInfo info) throws Exception {
        long expiryTime = System.currentTimeMillis() + (long)getLeaseRenewalOf(info);
        this.batchingDispatcher.process(taskId("register", info), new InstanceReplicationTask(this.targetHost, Action.Register, info, (InstanceStatus)null, true) {
            @Override
            public EurekaHttpResponse<Void> execute() {
                return PeerEurekaNode.this.replicationClient.register(info);
            }
        }, expiryTime);
    }

    /**
     * Queues a cancel replication on the batching dispatcher. The task expires after
     * {@code maxProcessingDelayMs}. A 404 from the peer is only logged.
     *
     * @param appName application name of the instance
     * @param id      instance id
     * @throws Exception declared for callers; nothing is thrown explicitly here
     */
    public void cancel(final String appName, final String id) throws Exception {
        long expiryTime = System.currentTimeMillis() + this.maxProcessingDelayMs;
        this.batchingDispatcher.process(taskId("cancel", appName, id), new InstanceReplicationTask(this.targetHost, Action.Cancel, appName, id) {
            @Override
            public EurekaHttpResponse<Void> execute() {
                return PeerEurekaNode.this.replicationClient.cancel(appName, id);
            }

            @Override
            public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                super.handleFailure(statusCode, responseEntity);
                if (statusCode == 404) {
                    PeerEurekaNode.logger.warn("{}: missing entry.", this.getTaskName());
                }
            }
        }, expiryTime);
    }

    /**
     * Replicates a heartbeat to the peer.
     *
     * <p>With {@code primeConnection} set, the heartbeat is sent directly through the
     * replication client and nothing is queued. Otherwise a task is queued on the
     * batching dispatcher; its failure handler re-registers {@code info} on a 404 and,
     * for other failures, takes over the peer's copy of the instance when
     * {@code config.shouldSyncWhenTimestampDiffers()} is true and the peer returned one.
     *
     * @param appName          application name of the instance
     * @param id               instance id
     * @param info             local instance information; dereferenced on the queued path
     * @param overriddenStatus overridden status sent along with the heartbeat
     * @param primeConnection  when true, send synchronously instead of queueing
     * @throws Throwable whatever the direct {@code sendHeartBeat} call throws
     */
    public void heartbeat(final String appName, final String id, final InstanceInfo info, final InstanceStatus overriddenStatus, boolean primeConnection) throws Throwable {
        if (primeConnection) {
            this.replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
            return;
        }

        ReplicationTask replicationTask = new InstanceReplicationTask(this.targetHost, Action.Heartbeat, info, overriddenStatus, false) {
            @Override
            public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
                return PeerEurekaNode.this.replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
            }

            @Override
            public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                super.handleFailure(statusCode, responseEntity);
                if (statusCode == 404) {
                    // The peer does not know the instance: push our copy to it.
                    PeerEurekaNode.logger.warn("{}: missing entry.", this.getTaskName());
                    if (info != null) {
                        PeerEurekaNode.logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}", this.getTaskName(), info.getId(), info.getStatus());
                        PeerEurekaNode.this.register(info);
                    }
                } else if (PeerEurekaNode.this.config.shouldSyncWhenTimestampDiffers()) {
                    InstanceInfo peerInstanceInfo = (InstanceInfo)responseEntity;
                    if (peerInstanceInfo != null) {
                        PeerEurekaNode.this.syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
                    }
                }
            }
        };
        long expiryTime = System.currentTimeMillis() + (long)getLeaseRenewalOf(info);
        this.batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
    }

    /**
     * Queues an ASG status update on the non-batching dispatcher, keyed by {@code asgName}.
     *
     * @param asgName   name of the ASG
     * @param newStatus status to replicate
     */
    public void statusUpdate(final String asgName, final ASGStatus newStatus) {
        long expiryTime = System.currentTimeMillis() + this.maxProcessingDelayMs;
        this.nonBatchingDispatcher.process(asgName, new AsgReplicationTask(this.targetHost, Action.StatusUpdate, asgName, newStatus) {
            @Override
            public EurekaHttpResponse<?> execute() {
                return PeerEurekaNode.this.replicationClient.statusUpdate(asgName, newStatus);
            }
        }, expiryTime);
    }

    /**
     * Queues an instance status update on the batching dispatcher.
     *
     * @param appName   application name of the instance
     * @param id        instance id
     * @param newStatus status to replicate
     * @param info      instance information passed along to the peer
     */
    public void statusUpdate(final String appName, final String id, final InstanceStatus newStatus, final InstanceInfo info) {
        long expiryTime = System.currentTimeMillis() + this.maxProcessingDelayMs;
        this.batchingDispatcher.process(taskId("statusUpdate", appName, id), new InstanceReplicationTask(this.targetHost, Action.StatusUpdate, info, (InstanceStatus)null, false) {
            @Override
            public EurekaHttpResponse<Void> execute() {
                return PeerEurekaNode.this.replicationClient.statusUpdate(appName, id, newStatus, info);
            }
        }, expiryTime);
    }

    /**
     * Queues removal of an instance's status override on the batching dispatcher.
     *
     * @param appName application name of the instance
     * @param id      instance id
     * @param info    instance information passed along to the peer
     */
    public void deleteStatusOverride(final String appName, final String id, final InstanceInfo info) {
        long expiryTime = System.currentTimeMillis() + this.maxProcessingDelayMs;
        this.batchingDispatcher.process(taskId("deleteStatusOverride", appName, id), new InstanceReplicationTask(this.targetHost, Action.DeleteStatusOverride, info, (InstanceStatus)null, false) {
            @Override
            public EurekaHttpResponse<Void> execute() {
                return PeerEurekaNode.this.replicationClient.deleteStatusOverride(appName, id, info);
            }
        }, expiryTime);
    }

    /** @return the service URL of this peer */
    public String getServiceUrl() {
        return this.serviceUrl;
    }

    /** Hash code derived from {@code serviceUrl} only, consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + (this.serviceUrl == null ? 0 : this.serviceUrl.hashCode());
        return result;
    }

    /** Two nodes are equal when they are of the same class and have equal {@code serviceUrl}s. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || this.getClass() != obj.getClass()) {
            return false;
        }
        PeerEurekaNode other = (PeerEurekaNode)obj;
        return this.serviceUrl == null ? other.serviceUrl == null : this.serviceUrl.equals(other.serviceUrl);
    }

    /** Shuts down both dispatchers and then the replication client. */
    public void shutDown() {
        this.batchingDispatcher.shutdown();
        this.nonBatchingDispatcher.shutdown();
        this.replicationClient.shutdown();
    }

    /**
     * Replaces the local copy of an instance with the peer's copy, first storing the
     * peer's overridden status when it is set and not {@code UNKNOWN}. Any failure is
     * logged and swallowed so that it does not propagate out of the failure handler.
     */
    private void syncInstancesIfTimestampDiffers(String appName, String id, InstanceInfo info, InstanceInfo infoFromPeer) {
        try {
            if (infoFromPeer != null) {
                logger.warn("Peer wants us to take the instance information from it, since the timestamp differs,Id : {} My Timestamp : {}, Peer's timestamp: {}", id, info.getLastDirtyTimestamp(), infoFromPeer.getLastDirtyTimestamp());
                if (infoFromPeer.getOverriddenStatus() != null && !InstanceStatus.UNKNOWN.equals(infoFromPeer.getOverriddenStatus())) {
                    logger.warn("Overridden Status info -id {}, mine {}, peer's {}", id, info.getOverriddenStatus(), infoFromPeer.getOverriddenStatus());
                    this.registry.storeOverriddenStatusIfRequired(appName, id, infoFromPeer.getOverriddenStatus());
                }

                this.registry.register(infoFromPeer, true);
            }
        } catch (Throwable e) {
            logger.warn("Exception when trying to set information from peer :", e);
        }
    }

    /**
     * @return {@code "target_"} followed by the host part of {@code serviceUrl}, or by the
     *         whole {@code serviceUrl} when it cannot be parsed as a URL
     */
    public String getBatcherName() {
        String batcherName;
        try {
            batcherName = (new URL(this.serviceUrl)).getHost();
        } catch (MalformedURLException e) {
            // Fall back to the raw string so a name is always produced.
            batcherName = this.serviceUrl;
        }

        return "target_" + batcherName;
    }

    /** Builds a task id of the form {@code requestType#appName/id}. */
    private static String taskId(String requestType, String appName, String id) {
        return requestType + '#' + appName + '/' + id;
    }

    /** Builds a task id from the app name and id carried by {@code info}. */
    private static String taskId(String requestType, InstanceInfo info) {
        return taskId(requestType, info.getAppName(), info.getId());
    }

    /**
     * @return the renewal interval of {@code info}'s lease in milliseconds, or 90 seconds
     *         (in milliseconds) when the instance carries no lease information
     */
    private static int getLeaseRenewalOf(InstanceInfo info) {
        return (info.getLeaseInfo() == null ? 90 : info.getLeaseInfo().getRenewalIntervalInSecs()) * 1000;
    }
}
  • 用了batchingDispatcher.process
  • 作用是:同一时间段内,相同服务实例的相同操作将使用相同的任务编号
  • 在进行同步复制的时候,将根据任务编号合并操作,减少同步操作的数量和网络消耗
  • 但是同时也造成了同步复制的延时性,不满足CAP中的C(强一致性)
  • 所以Eureka,只满足AP

通过Eureka Server在启动过程中初始化本地注册表信息和Eureka Server集群间的同步复制操作,最终达到了集群中Eureka Server注册表信息一致的目的

获取注册表中服务实例信息

Eureka Server中获取注册表的服务实例信息主要通过两个方法实现

  • AbstractInstanceRegistry#getApplicationsFromMultipleRegions从多地区获取全量注册表数据
  • AbstractInstanceRegistry#getApplicationDeltasFromMultipleRegions从多地区获取增量式注册表数据。

全量

上面讲到从节点复制注册信息的时候,用方法public int syncUp()

Applications apps = eurekaClient.getApplications(); 实现类

/**
 * Returns the registry's applications, restricted to the local region when
 * {@code serverConfig.disableTransparentFallbackToOtherRegion()} is true and
 * including all remote regions otherwise.
 */
public Applications getApplications() {
    if (this.serverConfig.disableTransparentFallbackToOtherRegion()) {
        return this.getApplicationsFromLocalRegionOnly();
    }
    return this.getApplicationsFromAllRemoteRegions();
}

有一行getApplicationsFromAllRemoteRegions()

getApplicationsFromMultipleRegions

/**
 * Builds the full set of applications from the local registry and, optionally,
 * from the given remote regions.
 *
 * <p>Local instances are added first, one {@code Application} per registry entry
 * that holds at least one lease. When {@code remoteRegions} is non-empty, the
 * applications of every region that has a remote registry are merged in, subject
 * to {@code shouldFetchFromRemoteRegistry}. Finally the reconcile hash code of the
 * result is stored on it.
 *
 * @param remoteRegions names of the remote regions to include; {@code null} or an
 *                      empty array means local region only
 * @return the assembled applications, never {@code null}
 */
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
    boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
    logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}", includeRemoteRegion, remoteRegions);
    if (includeRemoteRegion) {
        EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
    } else {
        EurekaMonitors.GET_ALL_CACHE_MISS.increment();
    }

    Applications apps = new Applications();
    apps.setVersion(1L);

    // Local region: one Application per registry entry that has at least one lease.
    for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : this.registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leases = entry.getValue();
        if (leases == null) {
            continue;
        }

        Application app = null;
        for (Lease<InstanceInfo> lease : leases.values()) {
            if (app == null) {
                // The application name is taken from the first lease holder seen.
                app = new Application(lease.getHolder().getAppName());
            }
            app.addInstance(this.decorateInstanceInfo(lease));
        }

        if (app != null) {
            apps.addApplication(app);
        }
    }

    if (includeRemoteRegion) {
        for (String remoteRegion : remoteRegions) {
            RemoteRegionRegistry remoteRegistry = this.regionNameVSRemoteRegistry.get(remoteRegion);
            if (null == remoteRegistry) {
                logger.warn("No remote registry available for the remote region {}", remoteRegion);
                continue;
            }

            Applications remoteApps = remoteRegistry.getApplications();
            for (Application application : remoteApps.getRegisteredApplications()) {
                if (!this.shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                    logger.debug("Application {} not fetched from the remote region {} as there exists a whitelist and this app is not in the whitelist.", application.getName(), remoteRegion);
                    continue;
                }

                logger.info("Application {}  fetched from the remote region {}", application.getName(), remoteRegion);
                // Merge into the application already collected under this name, if any.
                Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                if (appInstanceTillNow == null) {
                    appInstanceTillNow = new Application(application.getName());
                    apps.addApplication(appInstanceTillNow);
                }

                for (InstanceInfo instanceInfo : application.getInstances()) {
                    appInstanceTillNow.addInstance(instanceInfo);
                }
            }
        }
    }

    apps.setAppsHashCode(apps.getReconcileHashCode());
    return apps;
}
  • 作用从多个地区中获取全量注册表信息,并封装成Applications返回
  • 它首先会将本地注册表registry中的所有服务实例信息提取出来封装到Applications中
  • 再根据是否需要拉取远程Region的注册信息,将从远程Region拉取过来的Application合并到上面的Applications中
  • 最后得到一个全量的Applications

增量

在前面提到接受服务注册,接受心跳等方法中,都有recentlyChangedQueue.add(new RecentlyChangedItem(lease));

作用是将新变动的服务实例放到最近变化的服务实例信息队列中,用于记录增量式注册表信息

getApplicationDeltasFromMultipleRegions,实现了从远程Eureka Server中获取增量式注册表信息的能力

/**
 * Builds the delta view of the registry: the instances currently held in
 * {@code recentlyChangedQueue} plus, optionally, the deltas of the given remote regions.
 *
 * <p>The whole assembly runs under the {@code write} lock. The hash code stored on
 * the result is the reconcile hash code of the <em>full</em> registry obtained from
 * {@code getApplicationsFromMultipleRegions(remoteRegions)}, not of the delta itself.
 *
 * @param remoteRegions names of the remote regions to include; {@code null} falls
 *                      back to {@code allKnownRemoteRegions}
 * @return the assembled delta, never {@code null}
 */
public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
    if (null == remoteRegions) {
        remoteRegions = this.allKnownRemoteRegions;
    }

    boolean includeRemoteRegion = remoteRegions.length != 0;
    if (includeRemoteRegion) {
        EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
    } else {
        EurekaMonitors.GET_ALL_CACHE_MISS_DELTA.increment();
    }

    Applications apps = new Applications();
    apps.setVersion(this.responseCache.getVersionDeltaWithRegions().get());
    Map<String, Application> applicationInstancesMap = new HashMap<>();

    // Acquire the lock before entering try so that unlock() in finally only runs
    // when the lock is actually held.
    this.write.lock();
    try {
        logger.debug("The number of elements in the delta queue is :{}", this.recentlyChangedQueue.size());

        // Local deltas: group recently changed instances by application name.
        for (AbstractInstanceRegistry.RecentlyChangedItem changedItem : this.recentlyChangedQueue) {
            Lease<InstanceInfo> lease = changedItem.getLeaseInfo();
            InstanceInfo instanceInfo = lease.getHolder();
            logger.debug("The instance id {} is found with status {} and actiontype {}", instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
            Application app = applicationInstancesMap.get(instanceInfo.getAppName());
            if (app == null) {
                app = new Application(instanceInfo.getAppName());
                applicationInstancesMap.put(instanceInfo.getAppName(), app);
                apps.addApplication(app);
            }
            // A copy of the decorated instance is added, not the registry's own object.
            app.addInstance(new InstanceInfo(this.decorateInstanceInfo(lease)));
        }

        if (includeRemoteRegion) {
            for (String remoteRegion : remoteRegions) {
                RemoteRegionRegistry remoteRegistry = this.regionNameVSRemoteRegistry.get(remoteRegion);
                if (null == remoteRegistry) {
                    continue;
                }

                Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
                if (null == remoteAppsDelta) {
                    continue;
                }

                for (Application application : remoteAppsDelta.getRegisteredApplications()) {
                    if (!this.shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                        continue;
                    }

                    Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                    if (appInstanceTillNow == null) {
                        appInstanceTillNow = new Application(application.getName());
                        apps.addApplication(appInstanceTillNow);
                    }

                    for (InstanceInfo instanceInfo : application.getInstances()) {
                        appInstanceTillNow.addInstance(new InstanceInfo(instanceInfo));
                    }
                }
            }
        }

        // The hash code comes from the full registry, computed under the same lock.
        Applications allApps = this.getApplicationsFromMultipleRegions(remoteRegions);
        apps.setAppsHashCode(allApps.getReconcileHashCode());
        return apps;
    } finally {
        this.write.unlock();
    }
}

在EurekaServer对外restful中,在com.netflix.eureka.resources下

/**
 * Sub-resource locator for a single application.
 *
 * <p>The request version is set on {@code CurrentRequestVersion} while the
 * resource is constructed and removed again afterwards, even if construction fails.
 *
 * @param version request version path parameter
 * @param appId   application id path parameter
 * @return a new {@code ApplicationResource} for {@code appId}
 */
@Path("{appId}")
public ApplicationResource getApplicationResource(@PathParam("version") String version, @PathParam("appId") String appId) {
    CurrentRequestVersion.set(Version.toEnum(version));
    try {
        return new ApplicationResource(appId, this.serverConfig, this.registry);
    } finally {
        CurrentRequestVersion.remove();
    }
}
/**
 * Returns the cached payload for this resource's application.
 *
 * <p>Responds 403 when the registry does not allow access, 200 with the payload
 * when the response cache has an entry for the application, and 404 otherwise.
 * The payload format is JSON when the {@code Accept} header contains "json" and
 * XML in every other case, including a missing header.
 *
 * @param version      request version path parameter
 * @param acceptHeader value of the {@code Accept} header, may be {@code null}
 * @param eurekaAccept value of the {@code X-Eureka-Accept} header
 * @return the HTTP response described above
 */
@GET
public Response getApplication(@PathParam("version") String version, @HeaderParam("Accept") String acceptHeader, @HeaderParam("X-Eureka-Accept") String eurekaAccept) {
    if (!this.registry.shouldAllowAccess(false)) {
        return Response.status(Status.FORBIDDEN).build();
    }

    EurekaMonitors.GET_APPLICATION.increment();
    CurrentRequestVersion.set(Version.toEnum(version));
    String payLoad;
    try {
        KeyType keyType = KeyType.JSON;
        if (acceptHeader == null || !acceptHeader.contains("json")) {
            keyType = KeyType.XML;
        }

        Key cacheKey = new Key(EntityType.Application, this.appName, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept));
        payLoad = this.responseCache.get(cacheKey);
    } finally {
        // Always clear the request version, even when building the key or the
        // cache lookup throws, so it cannot linger past this request.
        CurrentRequestVersion.remove();
    }

    if (payLoad != null) {
        logger.debug("Found: {}", this.appName);
        return Response.ok(payLoad).build();
    }
    logger.debug("Not Found: {}", this.appName);
    return Response.status(Status.NOT_FOUND).build();
}

其中有一句:

String payLoad = responseCache.get(cacheKey);

查看responseCache其实现类的构造函数

点进去有一句:registry.getApplicationDeltasFromMultipleRegions(key.getRegions()),用于获取包含远程Region的delta增量注册信息

/**
 * Produces the serialized payload for a cache key.
 *
 * <p>For {@code Application} keys the name selects the data set: "ALL_APPS" for
 * the full registry, "ALL_APPS_DELTA" for the delta (which also increments the
 * matching delta version counters), anything else for a single application. When
 * the key carries regions, the multi-region registry methods are used. {@code VIP}
 * and {@code SVIP} keys are resolved through {@code getApplicationsForVip}. An
 * unknown entity type is logged and yields an empty payload.
 *
 * <p>Whichever timer was started is stopped before returning, also on failure.
 *
 * @param key cache key describing the requested entity
 * @return the payload wrapped in a {@code Value}
 */
private ResponseCacheImpl.Value generatePayload(Key key) {
    Stopwatch tracer = null;
    try {
        String payload;
        switch (key.getEntityType()) {
        case Application:
            boolean withRegions = key.hasRegions();
            if ("ALL_APPS".equals(key.getName())) {
                // Full registry.
                if (withRegions) {
                    tracer = this.serializeAllAppsWithRemoteRegionTimer.start();
                    payload = this.getPayLoad(key, this.registry.getApplicationsFromMultipleRegions(key.getRegions()));
                } else {
                    tracer = this.serializeAllAppsTimer.start();
                    payload = this.getPayLoad(key, this.registry.getApplications());
                }
            } else if ("ALL_APPS_DELTA".equals(key.getName())) {
                // Delta: bump the version counters before building the payload.
                if (withRegions) {
                    tracer = this.serializeDeltaAppsWithRemoteRegionTimer.start();
                    this.versionDeltaWithRegions.incrementAndGet();
                    versionDeltaWithRegionsLegacy.incrementAndGet();
                    payload = this.getPayLoad(key, this.registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                } else {
                    tracer = this.serializeDeltaAppsTimer.start();
                    this.versionDelta.incrementAndGet();
                    versionDeltaLegacy.incrementAndGet();
                    payload = this.getPayLoad(key, this.registry.getApplicationDeltas());
                }
            } else {
                // Single application looked up by name.
                tracer = this.serializeOneApptimer.start();
                payload = this.getPayLoad(key, this.registry.getApplication(key.getName()));
            }
            break;
        case VIP:
        case SVIP:
            tracer = this.serializeViptimer.start();
            payload = this.getPayLoad(key, getApplicationsForVip(key, this.registry));
            break;
        default:
            logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
            payload = "";
            break;
        }

        return new ResponseCacheImpl.Value(payload);
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
}

但是这个只是向client提供,不向server提供,因为server可以通过每次变更自动同步到peer

获取增量式注册表信息将会从recentlyChangedQueue中获取最近变化的服务实例信息

recentlyChangedQueue中统计了近3分钟内进行注册、修改和剔除的服务实例信息

在服务注册AbstractInstanceRegistry#register

接受心跳请求AbstractInstanceRegistry#renew

服务下线AbstractInstanceRegistry#internalCancel等方法中均可见到recentlyChangedQueue对这些服务实例进行登记,用于记录增量式注册表信息

#getApplicationDeltasFromMultipleRegions方法同样提供了从远程Region的Eureka Server获取增量式注册表信息的能力

论读书
睁开眼,书在面前
闭上眼,书在心里
原文地址:https://www.cnblogs.com/YC-L/p/14479888.html