SpringCloud组件Eureka入门解析

前言

微服务需要将大的系统拆分成众多的微服务,每个微服务为了提高高可用性往往又会部署多个节点,所以就会导致微服务实例个数很多,而且各个服务之间又可能会相互调用。所以需要有一个统一管理微服务实例的模块用于注册服务和发现服务。

服务注册指服务提供者实例将自己注册到注册中心,将自身的服务名和IP地址上报给注册中心。

服务发现指服务消费者从注册中心获取指定服务的节点信息

注册中心除了需要实现服务注册和服务发现之外还需要提高服务健康检查的工作,当服务异常断开时需要及时将服务删除,避免服务消费者将请求发送到了不可用的服务提供者,从而导致服务不可用的情况。

一、Eureka简介

Eureka是Netfilx开源的一款用于服务注册和服务发现的组件,Eureka作为一个注册中心,需要满足一个注册基本功能,包括服务注册、服务发现、服务健康检测、服务更新推送等。

Eureka的角色主要分为Eureka服务器和Eureka客户端,而Eureka客户端又可以分成服务提供者和服务消费者。

Eureka服务端主要为Eureka客户端提供了如下功能:

1.1、服务注册(Register)

Eureka客户端向Erueka服务发送自身的元数据,包括IP地址、服务名称、端口号、运行状态等信息。

1.2、服务续约(Renew)

Eureka客户端默认会30秒发送一次心跳给Eureka服务器来进行服务续约,如果Eureka服务器在90秒内没有收到Eureka客户端的心跳就会任务该客户端出现故障,就会从已注册的实例列表中剔除。

1.3、服务发现(Fetch Registies)

Eureka客户端从Eureka服务器获取注册的服务信息,并将服务节点信息缓存在本地。如果注册列表信息更新,会定时30秒更新一次,Eureka客户端需要同步更新本地的注册信息缓存。默认采用JSON格式进行传输

1.4、服务下线(Cancel)

Eureka客户端关闭时可以向Eureka服务器发送下线请求,那么Eureka服务器就会从注册列表中将该Erueka客户端信息删除

1.5、服务剔除(Eviction)

当Eureka客户端90秒内没有向Eureka服务器发送心跳信息,Eureka服务器就会认为该客户端不可用,从而将该实例信息从注册列表中剔除

tips:

1、Eureka Client的注册延迟机制,Eureka客户端启动时并非立即向Eureka服务器注册,而是会默认延迟40秒才会进行注册

2、Eureka Server的缓存机制,Eureka服务器维护客户端实例信息时会采用缓存,默认每30秒更新一次

3、由于Eureka Client的延迟注册和Eureka Server的缓存机制,所以某个服务提供者从启动到被发现最长可能会在70秒时才被发现

4、Eureka Server自我保护机制,如果Eureka Server在默认15分钟内接收Eureka Client的心跳次数低于85%,那么Eureka Server就认为可能是自身网络故障,此时不会将无法续约的客户端剔除,避免由于自身原因将正常的客户端剔除。

二、Eureka实践

2.1、Eureka服务器

1.新建eureka-server项目,添加maven依赖

   <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka-server</artifactId>
            <version>1.4.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

2.添加配置文件application.yml

server:
  port: 8761
eureka:
  instance:
    hostname: localhost
  client:
    registerWithEureka: false
    fetchRegistry: false
    serviceUrl:
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/

默认Eureka服务器会向自己注册,配置registerWithEureka为false和fetchRegistry为false表示不向自己注册并且不需要发现服务。

3.添加springboot项目启动类EurekaServerBootstrap

@EnableEurekaServer
@SpringBootApplication
public class EurekaServerApplication {

    public static void main(String[] args){
        SpringApplication.run(EurekaServerApplication.class);
        System.out.println("Eureka Server starting...");
    }
}

添加@EnableEurekaServer注解表示当前为Eureka服务器,直接启动项目执行main方法,Eureka服务器就启动了,整个过程非常简单。

访问Eureka管理后台查看 http://localhost:8761/ 结果如下:

其他System Status表示系统状态;DS Replicas 表示已经注册到Eureka的应用信息;General Info表示服务器运行信息;Instance Info表示实例信息

2.2、Eureka客户端

1.新建项目order,添加Eureka相关依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
            <version>1.4.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

2.添加配置文件application.yml

server:
  port: 8083
spring:
  application:
    name: order
eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/

其他spring.application.name表示应用的名称,eureka.client.serviceUrl表示Eureka服务器地址

3.添加项目启动类OrderApplication,并添加@EnableEurekaClient注解表示当前项目是Eureka客户端

@SpringBootApplication
@EnableEurekaClient
public class OrderApplication {

    public static void main(String[] args){
        SpringApplication.run(OrderApplication.class);
        System.out.println("OrderApplication starting...");
    }
}

启动项目,可以发现Eureka控制台可以查看到Order应用信息和客户端实例信息

总结:整体而已Eureka使用比较简单,只需要maven添加Eureka服务器和Eureka客户端的相关依赖,然后application.yml添加Eureka配置,启动类服务器和客户端分别添加@EnableEurekaServer和@EnableEurekaClient注解即可。

三、Eureka实现原理

3.1、Eureka Client 实现原理

了解Eureka客户端实现原理之前,首先要知道Eureka Client有哪些功能,包括如下功能:1、向Eureka Server注册服务;2、向Eureka Server发送心跳;3、向Eureka Server订阅服务;

而Eureka Client使用时只有@EnableEurekaClient一个注解,所以@EnableEurekaClient是Eureka客户端的实现核心。

@EnableEurekaClient注解的作用是创建一个EurekaClient接口的实现类DiscoveryClient对象,DiscoveryClient对象就代表了一个Eureka客户端。DiscoveryClient初始化方法核心逻辑如下:

/** DiscoveryClient 构造函数 */
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {

        /** 1.初始化配置信息*/
        //......
        /** 2.是否需要订阅服务,默认为true*/
        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
        /** 3.是否需要注册到Eureka服务器,默认为true*/
        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

        try {
            /** 4.初始化*/
            scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            /** 5.初始化心跳线程池 */
            heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build()
            );

            /** 6.初始化刷新本地缓存线程池 */
            cacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build()
            );
            //......
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }

        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            /** 从Eureka服务器订阅服务*/
            fetchRegistryFromBackup();
        }

        //......

        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                /** 注册到Eureka服务器*/
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }

        /** 初始化定时任务*/
        initScheduledTasks();

        try {
            //注册监控
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);
        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
    }

初始化方法主要是初始化客户端配置属性,然后初始化了多个线程池分别如下:

/** 定时任务*/ 
private final ScheduledExecutorService scheduler;
/** 用于发送心跳包线程池*/
private final ThreadPoolExecutor heartbeatExecutor;
/** 用于刷新本地缓存线程池*/
private final ThreadPoolExecutor cacheRefreshExecutor;

最后执行initScheduleTasks初始化定时任务,initScheduleTasks方法源码如下:

 1 /** DiscoveryClient 初始化定时任务 */
 2     private void initScheduledTasks() {
 3         /** 1.判断是否需要从Eureka服务器获取注册的实例,默认为true,Eureka服务器不需要,Eureka客户端需要*/
 4         if (clientConfig.shouldFetchRegistry()) {
 5             /** 1.1.从Eureka服务器拉取注册信息时间间隔,默认是30秒*/
 6             int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
 7             /** 1.2.回退刷新属性,重试延迟的最大倍数值,默认是10*/
 8             int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
 9             /** 1.3.开启一个30秒执行一次的CacheRefreshThread定时任务*/
10             scheduler.schedule(
11                     new TimedSupervisorTask(
12                             "cacheRefresh",
13                             scheduler,
14                             cacheRefreshExecutor,
15                             registryFetchIntervalSeconds,
16                             TimeUnit.SECONDS,
17                             expBackOffBound,
18                             new CacheRefreshThread()
19                     ),
20                     registryFetchIntervalSeconds, TimeUnit.SECONDS);
21         }
22         /** 2.判断是否需要注册到Eureka服务器,默认为true,Eureka客户端需要;Eureka服务器不需要*/
23         if (clientConfig.shouldRegisterWithEureka()) {
24             /** 2.1.发送心跳间隔,默认是30秒*/
25             int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
26             /** 2.2.发送心跳回退相关属性,重试延迟最大倍数,默认是10*/
27             int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
28             logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
29             /** 2.3.开启心跳定时任务,默认30秒执行一次HeartbeatThread*/
30             scheduler.schedule(
31                     new TimedSupervisorTask(
32                             "heartbeat",
33                             scheduler,
34                             heartbeatExecutor,
35                             renewalIntervalInSecs,
36                             TimeUnit.SECONDS,
37                             expBackOffBound,
38                             new HeartbeatThread()
39                     ),
40                     renewalIntervalInSecs, TimeUnit.SECONDS);
41 
42             // 初始化当前实例信息
43             instanceInfoReplicator = new InstanceInfoReplicator(this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);
44 
45             // 初始化状态变化监听器
46             statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
47                 @Override
48                 public String getId() {
49                     return "statusChangeListener";
50                 }
51 
52                 @Override
53                 public void notify(StatusChangeEvent statusChangeEvent) {
54                     if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
55                             InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
56                         // log at warn level if DOWN was involved
57                         logger.warn("Saw local status change event {}", statusChangeEvent);
58                     } else {
59                         logger.info("Saw local status change event {}", statusChangeEvent);
60                     }
61                     instanceInfoReplicator.onDemandUpdate();
62                 }
63             };
64 
65             /** 注册状态变化监听器*/
66             if (clientConfig.shouldOnDemandUpdateStatusChange()) {
67                 applicationInfoManager.registerStatusChangeListener(statusChangeListener);
68             }
69 
70             /** 延迟初始化当前实例,默认延迟启动40秒*/
71             instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
72         } else {
73             logger.info("Not registering with Eureka server per configuration");
74         }
75     }

代码很多但是逻辑不多,核心逻辑如下:

1、判断是否配置了fetchRegistry用于是否从Eureka服务器获取注册信息,然后使用CacheRefreshThread线程用于刷新本地缓存的注册信息;

2、判断是否配置了registerWithEureka用于是否将当前实例注册到Eureka服务器,然后使用HeartbeatThread线程用于向Eureka服务器发送心跳数据;

3、调用InstanceInfoReplicator的start方法用于注册当前实例到Eureka服务器

所以可以得出客户端的核心功能,订阅服务是通过CacheRefreshThread线程任务实现;心跳是通过HeartbeatThread线程任务实现,注册服务是通过InstanceInfoReplicator对象的start方法实现;

CacheRefreshThread线程实际就是调用Eureka服务器的/apps接口用于获取注册实例信息,并将注册实例信息缓存在本地;缓存信息包括所有应用信息Application,Application包括所有实例InstanceInfo信息;

HeartbeatThread线程实际就是调用Eureka服务器的/apps/${appName}/${instanceId}?status=${status}接口用于服务续约, 其中appName是应用名称,instanceId为ip:应用名称:port

而实际的注册会在延迟40秒启动InstanceInfoReplicator的run方法,实际调用DiscoveryClient的register方法,最终用POST请求方式调用Eureka服务器的/apps/${appName}接口用于服务注册

3.2、Eureka Server 实现原理

作为Eureka服务端需要完成服务订阅、服务注册、处理客户端心跳等功能,并且还需要存储注册的服务实例信息。

Eureka Server实现比较简单,只需要在启动类添加@EnableEurekaServer注解即可,所以可以确定Eureka服务器的核心功能都是通过该注解来实现,那么先看该注解的定义,定义如下:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}

该注解的作用仅仅是引入了EurekaServerMarkerConfiguration类的实例,而EurekaServerMarkerConfiguration类的定义如下:

@Configuration
public class EurekaServerMarkerConfiguration {

    @Bean
    public Marker eurekaServerMarkerBean() {
        return new Marker();
    }

    class Marker {
    }
}

 可以发现该类的作用就是定义了一个Marker对象,而Marker对象没有任何属性和方法,仅仅是用于标记的作用,也就是说如果Spring容器了加载了Marker实例就表示当前应用是Eureka服务器。该标记用于EurekaServerAutoConfiguration类中,该类定义如下:

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
        InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter

 也就是说如果Spring容器中存在Marker对象,那么就会加载EurekaServerAutoConfiguration对象,从而触发Eureka Server相关的自动配置工作。另外该类中还import了EurekaServerInitializerConfiguration类,该类就完成了Eureka服务器的初始化工作。

EurekaServerInitializerConfiguration类实现了SmartLifecycle接口,所以当Spring容器启动后会执行start方法,逻辑如下:

public void start() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    /** Eureka服务器上下文初始化 */
                    eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                    log.info("Started Eureka Server");

                    /** 发布Eureka服务器启动事件*/
                    publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                    EurekaServerInitializerConfiguration.this.running = true;
                    publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
                }
                catch (Exception ex) {
                    // Help!
                    log.error("Could not initialize Eureka servlet context", ex);
                }
            }
        }).start();
    }

    public void contextInitialized(ServletContext context) {
        try {
            /** 初始化Eureka服务器环境 */
            initEurekaEnvironment();
            /** 初始化服务器上下文*/
            initEurekaServerContext();
            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        }
        catch (Throwable e) {
            log.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }

核心逻辑是执行了initEurekaEnvironment方法初始化Eureka服务器环境和秩序initEurekaServerContext方法初始化Eureka服务器上下文

Eureka服务器使用了Jersey框架,提供了RESTFul风格的API,所以提供的接口都位于com.netfilx.eureka.resources包中。

服务注册接口位于ApplicationResource中,方法为addInstance方法,最终调用了PeerAwareInstanceRegistry的register方法进行实例注册,代码如下:

/** 服务注册*/
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        /** 调用父类注册方法进行实例注册 */
        super.register(info, leaseDuration, isReplication);
        /** 将注册信息广播给其他Eureka节点 */
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

首先是调用父类AbstractInstanceRegisstry的注册方法进行注册,然后调用replicateToPeers将新加入的实例广播通知给其他Eureka节点。

广播的逻辑比较简单,就是遍历所有Eureka服务器节点,依次调用服务注册方法即可。其他功能的广播逻辑类似,都是遍历调用其他Eureka服务器节点的对应接口实现广播效果。

而具体的注册是调用父类AbstractInstanceRegistry的register方法进行实例注册,父类的register方法逻辑如下:

/**
     * AbstractInstanceRegistry类: 注册服务实例方法
     * @param registrant: 服务实例
     * @param leaseDuration: 最大空闲时间
     * @param isReplication: 是否为副本节点
     * */
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            /** 1.添加读锁*/
            read.lock();
            //根据应用名称获取所有服务实例信息
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);//注册数量自增
            if (gMap == null) {//如果应用集合为空则初始化
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            /** 2.根据服务实例ID查询是否存在*/
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                /** 如果实例更新时间小于已经存在的实例更新时间,那么就注册已经存在的实例 */
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                /** 3.如果服务实例不存在,那么就注册新服务实例*/
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            /** 将注册的服务实例封装成Lease对象 */
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            /** 通过gMap缓存服务实例信息 */
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                        + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }

            /** 实例覆盖状态*/
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            /** 更新服务实例启动的up时间戳*/
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }

AbstractInstanceRegistry类内部有一个Map用于存储服务实例信息,定义如下:

/** <应用名称, <服务实例ID, 服务实例>>*/
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry  = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

当服务注册成功后,就通过registry来缓存服务实例信息,服务下线和续约等接口也就是更新集合中服务实例的状态。

那么还有一个问题,但是Eureka客户端没有及时发送心跳续约时,服务器是如何将这些客户端下线的呢?猜想肯定有一个定时任务来处理这个逻辑。

在EurekaServerBootstrap的initEurekaServerContext方法中,调用了registry的openForTraffic方法,实际执行了实现类PeerAwareInstanceRegistryImpl的openForTraffic方法,内部调用父类AbstractRegistry的postInit方法,方法如下:

protected void postInit() {
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        /** 初始化过期清除任务EvictionTask */
        evictionTaskRef.set(new EvictionTask());
        /** 提交定时任务*/
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }

postInit方法中创建了EvictionTask清除过期实例任务,然后提交给定时器evictionTimer,间隔时间为配置的evivtionIntervalTimerInMs的值,默认是60 * 1000毫秒也就是一分钟。EvictionTask执行逻辑如下:

class EvictionTask extends TimerTask {

        private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);

        @Override
        public void run() {
            try {
                long compensationTimeMs = getCompensationTimeMs();
                logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
                /** 执行evict方法*/
                evict(compensationTimeMs);
            } catch (Throwable e) {
                logger.error("Could not run the evict task", e);
            }
        }
    }

    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }
        /** 遍历所有注册的服务实例*/
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    /** 判断服务实例是否已经过期,根据服务实例上一次心跳时间+最大空闲时间和当前时间进行比较*/
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);//将已经过期的实例添加到列表中
                    }
                }
            }
        }

        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;

        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);
                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                /** 服务实例下线*/
                internalCancel(appName, id, false);
            }
        }
    }

每次客户端发送心跳进行续约时,服务器会记录客户端的最新更新时间。同时Eureka服务器会启动定时任务默认每分钟执行一次,判断所有服务实例上一次更新时间到现在是否超过配置的最大空闲时间,如果超过就表示长时间没有续约则做下线处理,如果客户端

未超过最大空闲时间,那么无需做任何处理。

四、Eureka自我保护机制

Eureka服务端有一个自我保护机制,默认时开启的,可以通过配置 eureka.server.enable-self-preservation=false 关闭自我保护机制。

4.1、自我保护机制的作用

Eureka客户端向Eureka服务器注册服务之后,需要定时向Eureka服务器发送心跳进行服务续约,只有定时续约成功的服务实例Eureka服务器才会认为是可用的,如果Eureka客户端没有定时续约,那么Eureka服务器就会认为该Eureka客户端不可用,那么就会强制

从服务列表中剔除,从而保证Eureka服务器维护的服务都是可用的。

但是会存在这样的问题,当Eureka客户端都定时向服务器续约时,Eureka服务器本身由于网络原因导致没有接收到时,可能就会误认为Eureka客户端没有续约,从而将Eureka客户端强制下线,显然这样就需要Eureka服务器高可用和高稳定,一旦Eureka服务器网络

不稳定可能就会将正常的服务强制下线,从而使的整个系统无法运转。

于是Eureka就有了自我保护机制,一旦开启了自我保护机制,当Eureka接收到的心跳数量低于一定数量时,Eureka会优先认为是服务器本身的原因,从而将所有服务实例保护起来,无论服务实例是否发送心跳都不会将服务实例下线,这样就避免了Eureka服务器因为

自身原因而将正常的服务下线。Eureka自我保护机制原则是:宁可放过少数不可用服务,绝不错下线所有服务。

当Eureka服务器处于自我保护状态时,Eureka控制后台会显示如下红字警告:

4.2、自我保护机制的弊端

一旦自我保护机制触发之后,真的不可用的服务还会保留在可用服务列表中,此时服务消费者还是可以拉取到不可用服务实例列表,从而出现服务消费者无法正常消费服务的情况,此时就需要服务消费者采用熔断器来提供系统的容错能力。

所以Eureka的自我保护机制触发的规则就需要很平衡,既要保证有自我保护机制,又需要提供容错率,不能轻易触发也不可以很难触发。

4.3、自我保护机制的实现原理

自我保护机制和两个关键参数有关,分别是Renews threshold 和 Renews(last min) , 分别表示心跳阈值,Renews(last min)表示上一分钟接收到的心跳次数

当Renews(last min)小于Renews threshold时表示Eureka服务器接收到的心跳数量少于期待的阈值,那么就会触发自我保护机制,

Renews threshold 计算公式为:2 * n * 0.85,其中n为客户端数量,0.85为阈值比例,可以通过配置 eureka.server.renewal-percent-threshold 来进行配置

而Renews(last min)正常情况下应该等于 2 * n,因为默认情况下客户端需要30秒发送一次心跳,所以每分钟需要接收每个客户端两次心跳。

另外当Eureka服务器启动时,此时还没有客户端注册时,n值为默认值1,所以Renews threshold值为 2 * 0.85 = 1

当有一个Eureka客户端注册时,n值为2,Renews threshold值为 2 * 2 * 0.85 = 3

当有两个Eureka客户端注册时,n值为3,Renews threshold值为 3 * 2 * 0.85 = 5

当自我保护机制触发后,Lease expiration enabled就会变为false,表示不会强制下线服务实例,当自我保护机制解除后,Lease expiration enabled会变为true。

自我保护机制触发是在续约过期定时任务EvictionTask中触发,EvictionTask任务中会先执行isLeaseExpirationEnabled()方法判断是否开启,如果为false那么就不需要进行续约过期判断,方法逻辑如下:

/**
     * 判断是否开启续约过期处理,返回true则表示需要处理续约过期; 返回false则表示不需要续约过期处理
     * */
    public boolean isLeaseExpirationEnabled() {
        /** 1.判断服务器是否自我保护机制,没有开启则始终返回true表示需要处理续约过期*/
        if (!isSelfPreservationModeEnabled()) {
            return true;
        }
        /** 2.判断上一次分钟收到的续约心跳数 是否大于 续约心跳阈值,
         *  如果大于则表示续约正常,那么就解除自我保护机制;
         *  如果小于则表示续约不正常,那么就开启自我保护机制 */
        return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
    }

五、注册中心比较

目前热门的注册中心除了Eureka,还有zookeeper、nacos、consul、redis等,不同注册中心各有优缺点,对比如下:

  Eureka zookeeper nacos consul redis
开源
集群部署 支持 支持 支持 支持 支持
通信协议 HTTP TCP HTTP HTTP TCP
监听更新 支持 支持 支持 支持 支持
健康检测 支持 支持 支持 支持 支持
负载均衡 支持 不支持 支持 支持 不支持
雪崩保护 支持 不支持 支持 不支持 不支持
一致性协议 AP CP CP+AP CP CP
dubbo集成 不支持 支持 支持 支持 支持
springcloud集成 支持 支持 支持 支持 不支持
k8s集成 不支持 不支持 支持 支持 支持

从功能支持来讲,Eureka和Nacos执行的功能较多,但是Eureka主要是配合Springcloud集成使用,而Nacos开放性较好,既能支持Springcloud也可以支持dubbo;

从稳定性来讲,各个注册中心都能保证一定的稳定性,都支持集群部署保证高可用性;

从分布式一致性来讲,Eureka支持AP,可以保证高可用性,但是不保证同一时间点的数据一致性,因为每一个节点都自行维护一份数据,节点之前数据同步通过异步调用HTTP接口的方式同步,所以可能会存在同一时间数据不一致的情况。

Nacos同时支持AP和CP,Nacos的服务信息管理可以采用数据库存储也可以采用内存存储,当采用数据库存储时可以保证数据一致,但是可能会出现短时间不可用,效果同ZK;当采用内存存储数据时,可以保证高可用但是可能短时间内数据不一致,效果同Eureka。

原文地址:https://www.cnblogs.com/jackion5/p/15737884.html