Eureka 系列(08)心跳续约与自动过期

Eureka 系列(08)心跳续约与自动过期

Spring Cloud 系列目录 - Eureka 篇

在上一篇 Eureka 系列(07)服务注册与主动下线 中对服务的注册与下线进行了分析,本文继续分析 Eureka 是如何进行心跳续约的。

1. 心跳续约

心跳续约有两种情况:一是客户端发起的心跳续约(isReplication=false);二是服务器消息广播时发起的心跳续约(isReplication=true)。这两种心跳续约的处理稍有不同。

1.1 心跳续约机制

当服务器收到客户端的心跳续约后,首先在当着服务器上更新租约时间,如果成功,则将心跳广播给其它服务器。

图1:Eureka 心跳续约机制
sequenceDiagram participant InstanceResource participant PeerAwareInstanceRegistryImpl participant AbstractInstanceRegistry participant PeerEurekaNode note over InstanceResource: PUT:/euraka/apps/{appName}/{id}<br/>renewLease InstanceResource ->> PeerAwareInstanceRegistryImpl : 心跳请求:renew(appName,id,isReplication) PeerAwareInstanceRegistryImpl ->> AbstractInstanceRegistry : 1. 本地数据更新:renew(appName,id,isReplication) loop 同步到其它 Eureka Server 节点 PeerAwareInstanceRegistryImpl ->> PeerAwareInstanceRegistryImpl : 2.1 数据同步:replicateToPeers PeerAwareInstanceRegistryImpl ->> PeerEurekaNode : 2.2 heartbeat -> PUT:/euraka/apps/{appName}/{id} alt 3.1 failure: 404则更新对方节点 PeerEurekaNode -->> PeerEurekaNode : register(info) else 3.2 failure: 否则更新自己节点 PeerEurekaNode -->> PeerEurekaNode : syncInstancesIfTimestampDiffers PeerEurekaNode -->> PeerEurekaNode : register(infoFromPeer, true) end end

总结:

  1. renewLease 心跳续约请求是 InstanceResource#renewLease 方法进行处理。isReplication=false 则是客户端请求,true 则是消息广播请求。
  2. renew 本地服务器心跳处理。处理成功则进行心跳消息广播。
  3. heartbeat 心跳消息广播给其它服务器。需要注意心跳广播失败的处理机制:
    • 如果对方服务器不存在该实例或 PK 失败,需要重新注册更新对方服务的实例信息。
    • 如果对方服务器 PK 成功,则需要反过来更新本地服务的注册信息。

1.2 接收心跳续约 - renewLease

InstanceResource#renewLease 处理心跳续约请求,路径是 PUT /apps/{appName}/{id}

  1. 如果本地服务端处理失败(包括实例不存在或实例的状态是UNKNOWN),就返回 NOT_FOUND,也是需要重新注册,更新实例信息。
  2. 服务端和客户端实例的 lastDirtyTimestamp 进行 PK。结果两种情况:一是服务端实例 PK 失败,返回 NOT_FOUND,客户端重新注册,从而更新服务端实例信息;二是服务端实例 PK 成功,返回实例信息给客户端,从而更新客户端实例信息。
@PUT
public Response renewLease(
    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
    @QueryParam("overriddenstatus") String overriddenStatus,
    @QueryParam("status") String status,
    @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    // 1. 心跳处理,本地心跳处理成功后进行消息广播。
    //    由于消息广播是异步的,实际返回的结果是本地心跳处理的结果。
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

    // 2. 心跳处理失败分两种情况:一是本地服务器不存在该服务实例;
    //    二是本地服务实例和lastDirtyTimestamp进行PK失败,则说明本地服务实例信息不是最新的
    if (!isSuccess) {
        return Response.status(Status.NOT_FOUND).build();
    }
    
    // 3. 本地服务实例和请求的lastDirtyTimestamp进行PK失败,则说明本地服务实例信息不是最新的
    //    后面有时间专门介绍一下 OverriddenStatus
    Response response;
    if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
        if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
            && (overriddenStatus != null)
            && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
            && isFromReplicaNode) {
            registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    } else {
        response = Response.ok().build();
    }
    return response;
}

总结: 下面分三部分说明心跳续约的整个流程:

  1. 本地服务器是如何处理续约的?主要是 AbstractInstanceRegistry#renew 方法。
  2. 本地服务器和客户端实例的 lastDirtyTimestamp 如何进行 PK ?主要是 InstanceResource#validateDirtyTimestamp 方法。
  3. Eureka Client 是如何发起心跳续约请求,并处理请求结果?主要是 DiscoveryClient。
  4. 心跳续约消息广播如何处理?主要是 PeerEurekaNode#heartbeat 方法。

1.3 本地续约处理 - renew

本地服务端续约,如果实例不存在或实例状态是 UNKNOWN 时返回 false,表示需要客户端重新注册,更新服务端实例信息。当然返回 true 时,也不意味着数据是最新的,需要在下一步继续校验脏数据。

public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    // 1. 获取服务端注册的实例
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = gMap.get(id);
    }
    // 2.1 服务实例不存在,返回404
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        return false;
    // 2.2 服务实例存在,
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // 实例的状态是 UNKNOWN 时返回 false,否则返回 true
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }
        renewsLastMin.increment();
        // 3. 更新最后一次的心跳时间(核心)
        leaseToRenew.renew();
        return true;
    }
}

总结: 如果实例存在且状态不是 UNKNOWN 时就需要在下一步继续校验脏数据。其中最核心的一名代码就是 leaseToRenew.renew() 更新最后一次的心跳时间,Eureka 的租约管理都是在 Lease 完成的。

1.4 脏数据校验 - validateDirtyTimestamp

validateDirtyTimestamp 方法主要是将客户端实例和服务端本地实例进行 PK。PK 的原则就是:服务实例 lastDirtyTimestamp 大的代表是最新的注册信息。 其实原因也很简单,每次服务实例更新时都会更新时间戳,这样时间戳大的就代表最后更新的实例,其它服务节点的实例信息都要这个服务实例进行同步。

private Response validateDirtyTimestamp(Long lastDirtyTimestamp,
                                        boolean isReplication) {
    // 1. 获取本地注册的实例,和客户端的实例进行 PK
    InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
    if (appInfo != null) {
        // 2. 客户端和服务端的实例更新的时间戳发生了变化,说明实例信息不同步了,进行PK
        if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
			// 3.1 客户端 PK 成功,客户端需要重新将实例注册一次,更新服务端的实例信息
            if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
                return Response.status(Status.NOT_FOUND).build();
			// 3.2 服务端 PK 成功,将实例信息返回给客户端,更新客户端的实例信息
            } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
                // ture表示Eureka内部之间同步数据,需要更新实例信息
                // 集群内部数据要一致,肯定要同步数据
                if (isReplication) {
                    return Response.status(Status.CONFLICT).entity(appInfo).build();
                // false表示EurekaClient的心跳,不需要同步实例信息给EurekaClient?
                } else {
                    return Response.ok().build();
                }
            }
        }
    }
    return Response.ok().build();
}

总结: 就一句话,lastDirtyTimestamp 大代表是最新的注册信息。

注意: 集群内部消息广播和 EurekaClient 心跳续约的处理不一样(3.2):

  • 集群内部消息广播:如果数据不一致,肯定要进行数据同步处理,达到最终一致性。
  • EurekaClient 心跳续约,如果服务端是最新的数据,不需要同步给客户端。

1.5 客户端处理 - renew

EurekaClient 心跳续约时,如果客户端的实例信息是最新的,需要发起重新注册,更新服务端的实例信息,但服务端的实例信息是最新的,不会更新客户端的实例信息。

// DiscoveryClient
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        // 404时重新发起注册,更新服务端的实例信息
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        return false;
    }
}

1.6 心跳广播 - heartbeat

心跳广播重点需要关注失败时的处理逻辑:一是返回 404,也就是客户端的实例信息是最新的,重新发起注册,更新服务端的实例信息;二是其它异常,则需要根据服务端返回的实例更新客户端的注册信息。其中第二点是和 EurekaClient 心跳续约不同的地方。

public void heartbeat(final String appName, final String id,
                      final InstanceInfo info, final InstanceStatus overriddenStatus,
                      boolean primeConnection) throws Throwable {
    // 1. primeConnection时不关心心跳续约的结果,发送请求后直接返回
    if (primeConnection) {
        replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
        return;
    }
    // 2. 关注请求结果,A -> B 发送心跳,成功就不说了
    // 3. 心跳续约失败有两种情况:一是 B 节点不存在该实例或 PK 失败,A -> B 重新发起注册请求;
    //    二是 B 节点存在该实例且 PK 成功,则反过来需要更新 A 节点该实例的注册信息。
    ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
        @Override
        public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
            return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
        }

        @Override
        public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
            super.handleFailure(statusCode, responseEntity);
            // 一是 B 节点不存在该实例,A -> B 重新发起注册请求
            if (statusCode == 404) {
                if (info != null) {
                    register(info);
                }
    		// 二是 B 节点存在该实例且 PK 赢了,则反过来需要更新 A 节点该实例的注册信息
            } else if (config.shouldSyncWhenTimestampDiffers()) {
                InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
                if (peerInstanceInfo != null) {
                    syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
                }
            }
        }
    };
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
}

总结: 心跳广播是保证 Eureka 数据最终一致性的重要一环,只要集群内部一直发送心跳广播,如果数据出现不一致的情况就会进行数据同步,从而保证数据的最终一致性。

// 更新本地实例注册信息
private void syncInstancesIfTimestampDiffers(
    String appName, String id, InstanceInfo info, InstanceInfo infoFromPeer) {
    try {
        if (infoFromPeer != null) {
            // 1. 更新overriddenStatus状态
            if (infoFromPeer.getOverriddenStatus() != null && !InstanceStatus.UNKNOWN.equals(infoFromPeer.getOverriddenStatus())) {
                registry.storeOverriddenStatusIfRequired(appName, id, infoFromPeer.getOverriddenStatus());
            }
            // 2. 更新本地实例注册信息
            registry.register(infoFromPeer, true);
        }
    } catch (Throwable e) {
    }
}

2. 自动过期

还记得在 Eureka 系列(03)Spring Cloud 自动装配原理 中分析EurekaServerBootstrap 启动时会调用 registry.openForTraffic() 方法启动自动过期的定时任务 EvictionTask 吗?本文就从 EvictionTask 开始分析起。

2.1 启动 EvictionTask 定时任务

图2:启动自动过期定时任务
graph LR EurekaServerBootstrap -- openForTraffic --> PeerAwareInstanceRegistryImpl PeerAwareInstanceRegistryImpl -- postInit --> AbstractInstanceRegistry AbstractInstanceRegistry -- start --> EvictionTask
// 启动自动过期定时任务 EvictionTask,默认每 60s 执行一次
protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
                           serverConfig.getEvictionIntervalTimerInMs(),
                           serverConfig.getEvictionIntervalTimerInMs());
}

总结: 默认 EvictionTask 每 60s 执行一次,客户端每 30s 进行一次心跳续约,如果心跳续约超过 90s 则下线。

2.2 EvictionTask执行原理

图2:EvictionTask执行原理
sequenceDiagram participant EvictionTask participant AbstractInstanceRegistry participant Lease note left of EvictionTask : 60s定时任务 EvictionTask ->> AbstractInstanceRegistry : evict loop 自动过期 AbstractInstanceRegistry ->> Lease : isExpired AbstractInstanceRegistry ->> AbstractInstanceRegistry : internalCancel end

2.2.1 如何判断是否过期

首先对 Lease 几个重要属性进行说明:

private long evictionTimestamp;		// 服务下线时间
private long registrationTimestamp;	// 服务注册时间
private long serviceUpTimestamp;	// 服务UP时间
private volatile long lastUpdateTimestamp;	// 最后一次心跳续约时间
private long duration;				// 心跳过期时间,默认 90s

Lease 每次心跳续约时都会更新最后一次续约时间 lastUpdateTimestamp。如果服务下线则会更新下线时间 evictionTimestamp,这样 evictionTimestamp > 0 就表示服务已经下线了。默认心跳续约时间超过 90s 服务就自动过期。

public boolean isExpired(long additionalLeaseMs) {
    return (evictionTimestamp > 0 || System.currentTimeMillis() > 
            (lastUpdateTimestamp + duration + additionalLeaseMs));
}

总结: additionalLeaseMs 是一种补偿机制,可以当成默认值 0ms。

2.2.2 服务下线

服务下线时首先判断是否开启了自我保护机制,再计算出一次最多下线的实例个数,最后调用 internalCancel 将实例下线。

public void evict(long additionalLeaseMs) {
	// 1. 是否开启自我保护机制
    if (!isLeaseExpirationEnabled()) {
        return;
    }

    // 2. 调用 lease.isExpired 筛选出所有过期的实例
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }

   	// 3. 计算一次最多下线的实例个数 toEvict
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;

    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            // 4. 和自动下线一样,调用internalCancel进行下线
            internalCancel(appName, id, false);
        }
    }
}

总结: 自动过期和主动下线的区别是自动过期会考虑服务的自我保护,计算一次最多下线的实例个数,其余的都一样。


每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/binarylei/p/11621403.html