SpringCloud--服务治理Eureka

1.前序
     服务治理是微服务架构中最为核心和基础的模块,主要用来实现各个微服务实例的自动化注册与发现。它旨在解决业务增长,系统功能越来越复杂,微服务应用不断增多的情况下,解决我们手动维护的问题。
Eureka服务治理体系主要有三个核心角色:服务注册中心,服务提供者以及服务消费者。
2.服务治理Eureka的基础架构

  • 服务注册中心:Eureka提供的服务端,提供服务注册与发现的功能
  • 服务提供者:提供服务的应用,可是是SpringBoot应用,也可以是其他技术平台且遵循Eureka通信机制的应用。它将自己提供的服务注册到Eureka,以供其他应用发现和使用。
  • 服务消费者:消费者应用从服务注册中心获取服务列表,从而使消费者可以知道去何处调用其所需的服务。

       

3.服务治理的机制


 服务注册中心


 (1)失效剔除:
   失效剔除的主要作用是剔除集群中不能提供服务的实例。服务实例可能由于内存溢出,网络故障灯原因使得服务不能正常工作,服务中心也未收到”服务下线“的相关请求,这时候针对这些服务,Eureka Server在启动的时候会创建一个定时任务,默认每隔一段时间将当前清单中超时没有续约的服务剔除。
参数:eureka.instance.leaseExpirationDurationInSeconds
注释:Eureka 服务端在收到最后一次心跳之后等待时间上限,单位为秒,超过该时间之后服务端会将该服务实例从服务清单中剔除,从而进制服务调用请求被发送到该实例上。

 (2)自我保护
   Eureka Server 在运行期间会去统计心跳失败比例在 15 分钟之内是否低于 85%(renewalPercentThreshold:默认值是0.85),如果低于 85%,Eureka Server 会将这些实例保护起来,让这些实例不会过期,但是在保护期内如果服务刚好这个服务提供者非正常下线了,此时服务消费者就会拿到一个无效的服务实例,此时会调用失败,对于这个问题需要服务消费者端要有一些容错机制,如重试,断路器等。

   关闭自我保护进制的参数:eureka.server.enable-self-preservation=false
   Renews threshold:Eureka Server 期望每分钟收到客户端实例续约的总数。
   Renews (last min):Eureka Server 最后 1 分钟收到客户端实例续约的总数。
   自我保护模式被激活的条件是:在 1 分钟后,Renews (last min) < Renews threshold。
   eureka.server.renewal-percent-threshold:可以设置renewalPercentThreshold的值,默认值为0.85
   protected volatile int numberOfRenewsPerMinThreshold; 期望最小每分钟续租次数。
   protected volatile int expectedNumberOfClientsSendingRenews;期望最大每分钟续租次数。
 触发条件:
   当每次心跳次数(renewLastMin)小于numberOfRenewsPerMinThreshold时,并且开启自动保护模式开关是打开,就会触发自动保护机制,不再自动过期续约。

 1 #   PeerAwareInstanceRegistryImpl.java
 2 
 3 @Override
 4     public boolean isLeaseExpirationEnabled() {
 5         if (!isSelfPreservationModeEnabled()) {
 6             // The self preservation mode is disabled, hence allowing the instances to expire.
 7             return true;
 8         }
 9         return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
10     }

服务提供者

(1)服务注册
“服务提供者”在启动的时候会通过发送REST请求的方式将自己注册到Eureka Server上,同时会带上自身服务的一些元数据信息。Eureka Server在接收到REST请求后,将元数据存储在一个双层的CurrentHashMap中,第一层的key是服务名,第二层的key是具体服务的实例名。
参数:eureka.cli ent.register-with-eureka = true;才会启动注册操作。

服务注册流程源码如下:

 /**
     * 初始化所有的定时任务
     */
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
/**
*服务同步的定时任务,调用DiscoveryClient的
refreshRegistry方法
*/
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // 服务续约
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            //InstanceInfoReplicator的run方法中会调用discoveryClient.register();进行服务注册
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
InstanceInfoReplicator的服务注册相关的代码如下:
public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
               //此方法进行服务注册调用
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

DiscoveryClient中真正实现服务注册的源码如下:
 /**
     * 通过REST请求的方式进行服务注册
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }

(2)服务同步
     两个服务提供者分别注册到了两个不同的服务注册中心上,但是它们的信息分别被两个服务注册中心所维护,当服务中心采用的是集群方案的时候,当服务提供者发送注册请求到一个服务注册中心的时候,它会将该请求转发给集群中相连接的其他的注册中心,从而实现注册中心之间的服务同步。
通过服务同步,两个服务提供者的服务信息可以在集群中任意一个注册中心上获取到。

 服务同步入口的源码:
/**
     * The task that fetches the registry information at specified intervals.
     *
     */
    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

    @VisibleForTesting
    void refreshRegistry() {
        try {
            boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

            boolean remoteRegionsModified = false;
            // This makes sure that a dynamic change to remote regions to fetch is honored.
            String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
            if (null != latestRemoteRegions) {
                String currentRemoteRegions = remoteRegionsToFetch.get();
                if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                    // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                    synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                        if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                            String[] remoteRegions = latestRemoteRegions.split(",");
                            remoteRegionsRef.set(remoteRegions);
                            instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                            remoteRegionsModified = true;
                        } else {
                            logger.info("Remote regions to fetch modified concurrently," +
                                    " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                        }
                    }
                } else {
                    // Just refresh mapping to reflect any DNS/Property change
                    instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                }
            }

            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }

         ............
    }

(3)服务续约
当服务注册完之后,会启动一个定时任务维护一个心跳来持续告诉Eureka Server,我还处于存活状态,以防止被Eureka Server 剔除。
两个重要参数:
eureka.instance.lease-renewal-interval-in-seconds=30 :用于定义服务续约任务的调用间隔时间 默认是30秒
eureka.instance.lease-expiration-duration-in-seconds=90 :用于定义服务失效的时间,默认为90秒。

服务续约源码入口:

 /**
     * The heartbeat task that renews the lease in the given intervals.
     */
    private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }


/**
     * Renew with the eureka service by making the appropriate REST call
     */
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

服务消费者

(1)获取服务
在我们启动服务消费者的实例之后,会发送一个REST请求到服务注册中心,来获取服务注册中心上面已经注册的服务清单,出于性能上的考虑,Eureka Server会维护一份只读的服务清单,同时缓存该清单的时间间隔默认为30s刷新一次。
注意:第一点,服务消费者必须配置保eureka.c巨ent.fetch-registry=true,这是服务消费者获取服务列表的基础。第二点:服务列表缓存刷新时间可以通过该参数修改eureka.c巨ent.registry-fetch-interval-seconds=30,默认是30秒

(2)服务调用
服务消费者在获取到服务清单之后,通过服务名可以获取到具体提供的服务的实例名和该实例的元数据信息(InstanceInfo),客户端会根据自己的情况决定选择调用哪个实例。
在这块有两个概念需要理解下,Region和Zone
//TODO

(3)服务下线
服务实例在进行正常的关闭操作的时候,它会触发一个服务下线的REST请求给Eureka Servcer ,告诉服务注册中心,服务端在接收到请求之后,就会将该服务的状态设置为DOWN,并且把这个下线事件传播出去。

 
原文地址:https://www.cnblogs.com/lovegrace/p/10338442.html