Eureka系列(八)服务剔除具体实现

服务下线的大致流程图

  下面这张图很简单地描述了服务剔除的大致流程:服务剔除.jpg

服务剔除实现源码分析

  首先我们得了解下服务剔除这个定时任务是什么被初始化启动的,在百度搜索中,在我们Eureka Server端启用的时执行的EurekaBootStrap类中initEurekaServerContext方法找到了服务剔除任务的初始化。接下来我们就看一看源码:

protected void initEurekaServerContext() throws Exception {
        ...省略其他代码
        registry.openForTraffic(applicationInfoManager, registryCount);
        // Register all monitoring statistics.
        EurekaMonitors.registerAllStats();
    }

  在initEurekaServerContext()方法中, registry.openForTraffic(applicationInfoManager, registryCount)这个方法来初始化我们的服务剔除任务。我们看源码验证下:

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
   super.openForTraffic(applicationInfoManager,
         count == 0 ? this.defaultOpenForTrafficCount : count);
}
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    this.expectedNumberOfRenewsPerMin = count * 2;
    this.numberOfRenewsPerMinThreshold =
            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    super.postInit();
}

  在openForTraffic方法中最后我们看到调用了父类postInit()方法,我们接着看postInit这个方法:

protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    // 开启定时任务,默认60秒执行一次,用于清理60秒之内没有续约的实例
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}

  由上面可见,Eureka通过evictionTimer.schedule初始化了一个定时60s的定时任务。
  接下来我们来看看EvictionTask这个类的具体实现EvictionTask这个类实现了服务剔除的具体操作。

@Override
public void run() {
    try {
        long compensationTimeMs = getCompensationTimeMs();
        logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
        evict(compensationTimeMs);
    } catch (Throwable e) {
        logger.error("Could not run the evict task", e);
    }
}

  我们接着看evict()方法的实现:

public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }
    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    // 先收集过期的实例信息,然后再剔除掉
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }
    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    // 为了补偿GC暂停或本地时间漂移,我们需要使用当前注册表大小作为触发自我保护的基础。没有它,我们就会把整个注册表都抹掉。
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;

    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            internalCancel(appName, id, false);
        }
    }
}

  由此可见,evict()方法最终实现了服务的剔除。

(color{red}{注意:})
  (color{red}{Eureka的服务剔除会因为Eureka的自我保护机制而受到影响,导致不会剔除掉已经认为下线的服务}),这一点,会在下一节中做下解Eureka自我保护机制的讲解。

   不知道有没有小伙伴对Eureka是如何判断这个实例是否不可用呢,有很大的疑惑呢?我们接下来去看一看lease.isExpired(additionalLeaseMs)这个方法,这个方法就是拿来判断实例是否可用。


    /**
     * Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not.
     *
     * Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than
     * what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect
     * instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will
     * not be fixed.
     *
     * @param additionalLeaseMs any additional lease time to add to the lease evaluation in ms.
     */
    public boolean isExpired(long additionalLeaseMs) {
        return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }

  右上可见,我们可以发现Eureka是通过lastUpdateTimestamp这个上次更新时间来判断我们的服务是否可用,不知道小伙伴对服务续约哪里有影响,每当我们Client调用一次Server端服务续约接口时,Server端就会更新下服务的lastUpdateTimestamp。我们来回一下服务续约更新上次更新时间的方法,更新lastUpdateTimestamp代码如下:

   /**
    * Renew the lease, use renewal duration if it was specified by the
    * associated {@link T} during registration, otherwise default duration is
    * {@link #DEFAULT_DURATION_IN_SECS}.
    */
   public void renew() {
       lastUpdateTimestamp = System.currentTimeMillis() + duration;

   }

   不知道小伙伴有没有注意一个事情,在isExpired这个方法的注释里,好像有一个很大的“彩蛋”,注释如下:Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will not be fixed. 翻译过来就是:注意,由于renew()做了“错误”的事情,并将lastUpdateTimestamp设置为+duration,超过了它应该的值,因此到期实际上是2 * duration。这是一个小错误,应该只影响那些不正常关闭的实例。由于可能对现有的使用产生广泛的影响,这个问题将不会得到解决。
   简单来说,就是在服务续约执行renew()方法时,不应该加上duration这个值,但是呢,因为这个问题只会出现在检测不正常关闭的服务才会有影响,Eureka 官方怕其他正在运行的服务有影响,就没有修正这个小error。
  看到这儿,小伙伴是不是觉得,eureka的RD也是很神奇,明明知道这是一个bug,但是却不改(其实人家也想改,但是怕一改影响了其他的正常使用,然后考虑这个bug对Eureka正常使用没有太大影响,也就没有去修正了,但是人家RD还是很贴心的,在注释中还是说明这个问题,以及为什么不修正的原因)。


题外

  可能有小伙伴会问,我们有服务下线接口,为什么还需要EurekaServer服务端自己启用一个服务剔除任务呢?
  其实很简单,因为如果我们是直接强制性停止任务,例如机器停电之类的,肯定Client就不会去调用服务下线接口,来通知Server端自己下线。其次如果我们Client正常停止,在调用服务下线接口中,发现网络出现问题,没法调用Server提供的接口,那样也没法让Server知道自己这个服务下线了。所以Server端需要自己启动一个服务剔除任务,来剔除掉哪些已经down掉的服务。(该观点为博主自己的主观观点,小伙伴也可以自行思考

原文地址:https://www.cnblogs.com/liujunj/p/13401812.html