Understanding Eureka in Depth

Understanding Eureka in Depth - How the Eureka Client Fetches Registry Information


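The Eureka Client provides a mechanism for fetching registry information on a schedule. All of the client-side logic for fetching registry information lives in the DiscoveryClient class.

During initialization, the Eureka Client checks the fetch-registry switch (enabled by default) to decide whether to start the scheduled registry-fetch task (which syncs every 30 seconds by default):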

    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
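To make the role of the fetch interval and the back-off bound more concrete, here is a minimal sketch of how a supervising task might pick the delay before its next run. It is not Eureka's TimedSupervisorTask: the class name `RefreshScheduleSketch`, the method `nextDelaySeconds`, and the exact doubling rule are assumptions made for illustration. The idea is simply that a failed or timed-out refresh backs off, capped at the interval multiplied by the bound, and a successful refresh returns to the base interval.

```java
class RefreshScheduleSketch {

    /**
     * Hypothetical helper: returns the delay before the next refresh.
     * On success the delay resets to the base interval; on failure it
     * doubles, capped at base * backOffBound.
     */
    static long nextDelaySeconds(long current, long base, int backOffBound, boolean succeeded) {
        if (succeeded) {
            return base;
        }
        return Math.min(base * backOffBound, current * 2);
    }

    public static void main(String[] args) {
        long base = 30;     // mirrors the 30-second default mentioned in the article
        int bound = 10;     // assumed back-off bound, chosen for the demo
        long delay = base;
        boolean[] outcomes = {false, false, true}; // two failed refreshes, then a success
        for (boolean ok : outcomes) {
            delay = nextDelaySeconds(delay, base, bound, ok);
            System.out.println("next fetch in " + delay + "s");
        }
    }
}
```

With these assumed values the demo prints delays of 60, 120, and then 30 seconds.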

The Eureka Client offers two modes for fetching registry information:

  1. Full fetch
  2. Delta (incremental) fetch

Full registry fetch by the Eureka Client

    /**
     * Fetches the registry information.
     *
     * <p>
     * This method tries to get only deltas after the first fetch unless there
     * is an issue in reconciling eureka server and client registry information.
     * </p>
     *
     * @param forceFullRegistryFetch Forces a full registry fetch.
     *
     * @return true if the registry was fetched
     */
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time, get all
            // applications
            Applications applications = getApplications();

            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                getAndStoreFullRegistry();
            } else {
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();

        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;
    }

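Conditions for a full fetch

1. Delta fetching is disabled in the configuration.

2. This is the first request (the local applications object is null, or it contains zero registered applications).

3. A delta sync failed, in which case a full sync is forced.

The fetchRegistry code above also takes the full-fetch path when a single VIP address is configured for registry refresh, or when the cached applications version is -1.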
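The conditions above can be condensed into one predicate. The sketch below is a simplified stand-in for the `if` in fetchRegistry, not the Eureka code itself: the class `FetchModeSketch`, the method `needsFullFetch`, and its parameters are hypothetical, and a `null` app count is used here to stand for "no local registry yet".

```java
class FetchModeSketch {

    /**
     * Hypothetical predicate: true when a full fetch should be used
     * instead of a delta fetch.
     *
     * @param deltaDisabled    delta fetching is switched off in the configuration
     * @param singleVipAddress a single VIP address to refresh, or null/empty if unset
     * @param forceFull        the caller explicitly asked for a full fetch
     * @param cachedAppCount   number of locally cached applications, or null if none yet
     * @param cachedVersion    version of the cached registry; -1 means delta is unsupported
     */
    static boolean needsFullFetch(boolean deltaDisabled,
                                  String singleVipAddress,
                                  boolean forceFull,
                                  Integer cachedAppCount,
                                  long cachedVersion) {
        boolean vipConfigured = singleVipAddress != null && !singleVipAddress.isEmpty();
        boolean firstFetch = cachedAppCount == null || cachedAppCount == 0;
        return deltaDisabled || vipConfigured || forceFull || firstFetch || cachedVersion == -1;
    }

    public static void main(String[] args) {
        // Warm cache, delta enabled, nothing forced: a delta fetch is enough.
        System.out.println(needsFullFetch(false, null, false, 12, 1));
        // Empty cache on startup: a full fetch is required.
        System.out.println(needsFullFetch(false, null, false, null, 1));
    }
}
```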

Full sync flow

During a full sync, the client calls the Eureka Server's endpoint to retrieve the information for all services.

    /**
     * Gets the full registry information from the eureka server and stores it locally.
     * When applying the full registry, the following flow is observed:
     *
     * if (update generation have not advanced (due to another thread))
     *   atomically set the registry to the new registry
     * fi
     *
     * @return the full registry information.
     * @throws Throwable
     *             on error.
     */
    private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        logger.info("Getting all instance registry info from the eureka server");

        Applications apps = null;
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());

        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }
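The generation check in getAndStoreFullRegistry is a compare-and-set guard: a fetched registry is stored only if no other thread has advanced the generation since this thread read it. The following sketch isolates that pattern using only the JDK. The class `GenerationGuardSketch` and its methods are hypothetical, and a list of service names stands in for the real Applications object.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

class GenerationGuardSketch {
    private final AtomicLong generation = new AtomicLong();
    private final AtomicReference<List<String>> registry = new AtomicReference<>(List.of());

    /** Read the generation before starting the (slow) remote fetch. */
    long currentGeneration() {
        return generation.get();
    }

    /**
     * Store the fetched registry only if the generation is still the one
     * observed before the fetch; otherwise another thread got there first
     * and this result is discarded as potentially stale.
     */
    boolean storeIfCurrent(long observedGeneration, List<String> fetched) {
        if (generation.compareAndSet(observedGeneration, observedGeneration + 1)) {
            registry.set(List.copyOf(fetched));
            return true;
        }
        return false;
    }

    List<String> snapshot() {
        return registry.get();
    }
}
```

If two threads both read generation 0 and then both try to store, only the first `storeIfCurrent` call succeeds; the second sees generation 1 and drops its result.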

On the Eureka Server, com.netflix.eureka.resources.ApplicationsResource#getContainers receives the request and returns the service information.

    /**
     * Get information about all {@link com.netflix.discovery.shared.Applications}.
     *
     * @param version the version of the request.
     * @param acceptHeader the accept header to indicate whether to serve JSON or XML data.
     * @param acceptEncoding the accept header to indicate whether to serve compressed or uncompressed data.
     * @param eurekaAccept an eureka accept extension, see {@link com.netflix.appinfo.EurekaAccept}
     * @param uriInfo the {@link java.net.URI} information of the request made.
     * @param regionsStr A comma separated list of remote regions from which the instances will also be returned.
     *                   The applications returned from the remote region can be limited to the applications
     *                   returned by {@link EurekaServerConfig#getRemoteRegionAppWhitelist(String)}
     *
     * @return a response containing information about all {@link com.netflix.discovery.shared.Applications}
     *         from the {@link AbstractInstanceRegistry}.
     */
    @GET
    public Response getContainers(@PathParam("version") String version,
                                  @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                  @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                  @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                  @Context UriInfo uriInfo,
                                  @Nullable @QueryParam("regions") String regionsStr) {

        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
        String[] regions = null;
        if (!isRemoteRegionRequested) {
            EurekaMonitors.GET_ALL.increment();
        } else {
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
        }

        // Check if the server allows the access to the registry. The server can
        // restrict access if it is not
        // ready to serve traffic depending on various reasons.
        if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
            return Response.status(Status.FORBIDDEN).build();
        }
        CurrentRequestVersion.set(Version.toEnum(version));
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }

        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS,
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );

        Response response;
        if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
            response = Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
            response = Response.ok(responseCache.get(cacheKey))
                    .build();
        }
        return response;
    }
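One detail in getContainers is easy to miss: the requested regions are lower-cased and sorted before they become part of the cache key, so the same set of regions always maps to the same cached response regardless of the order in which the client listed them. The sketch below illustrates only that normalization. `RegionKeySketch` and its string-based `cacheKey` are hypothetical simplifications; the real Key class carries more fields than shown here.

```java
import java.util.Arrays;

class RegionKeySketch {

    /** Lower-case, split, and sort the regions; null when none were requested. */
    static String[] normalizeRegions(String regionsStr) {
        if (regionsStr == null || regionsStr.isEmpty()) {
            return null;
        }
        String[] regions = regionsStr.toLowerCase().split(",");
        Arrays.sort(regions);
        return regions;
    }

    /** Hypothetical flat cache key built from entity name, media type, and regions. */
    static String cacheKey(String entityName, String mediaType, String regionsStr) {
        String[] regions = normalizeRegions(regionsStr);
        String regionPart = regions == null ? "" : String.join(",", regions);
        return entityName + ":" + mediaType + ":" + regionPart;
    }

    public static void main(String[] args) {
        // Both requests resolve to the same key, so they share one cache entry.
        System.out.println(cacheKey("ALL_APPS", "JSON", "us-west-2,US-EAST-1"));
        System.out.println(cacheKey("ALL_APPS", "JSON", "us-east-1,us-west-2"));
    }
}
```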

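Delta registry fetch by the Eureka Client

Conditions for a delta fetch

A delta fetch happens only when delta fetching is enabled and this is not the first fetch.

Delta fetch flow

During a delta fetch, the client calls the Eureka Server's delta endpoint to retrieve only the changed service information (note that a failed delta sync falls back to a full sync).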

    /**
     * Get the delta registry information from the eureka server and update it locally.
     * When applying the delta, the following flow is observed:
     *
     * if (update generation have not advanced (due to another thread))
     *   atomically try to: update application with the delta and get reconcileHashCode
     *   abort entire processing otherwise
     *   do reconciliation if reconcileHashCode clash
     * fi
     *
     * @return the client response
     * @throws Throwable on error
     */
    private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        Applications delta = null;
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }

        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // There is a diff in number of instances for some reason
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }
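The reconciliation step in getAndUpdateDelta compares a hash code computed from the client's local registry with the one the server sent along with the delta. If they differ, the local copy has drifted and the client reconciles with a remote call. The sketch below shows one way such a hash could work, assuming it is a summary of instance counts per status. The class `ReconcileHashSketch`, its methods, and the exact `STATUS_count_` format are assumptions for illustration and may not match Eureka's real encoding.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ReconcileHashSketch {

    /**
     * Hypothetical hash: counts instances per status and joins the counts
     * in sorted status order, e.g. "DOWN_1_UP_2_".
     */
    static String reconcileHash(List<String> instanceStatuses) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted for a stable order
        for (String status : instanceStatuses) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder hash = new StringBuilder();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            hash.append(entry.getKey()).append('_').append(entry.getValue()).append('_');
        }
        return hash.toString();
    }

    /** A mismatch means the local registry no longer agrees with the server. */
    static boolean needsReconcile(String localHash, String serverHash) {
        return !localHash.equals(serverHash);
    }

    public static void main(String[] args) {
        String local = reconcileHash(List.of("UP", "DOWN", "UP"));
        System.out.println(local);                                  // DOWN_1_UP_2_
        System.out.println(needsReconcile(local, "DOWN_1_UP_3_"));  // true
    }
}
```

A count-based summary like this is cheap to compute and compare, but it only detects drift in the number of instances per status, which matches the "diff in number of instances" comment in the code above.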

The Eureka Server's endpoint then receives the delta request and returns the changed service information to the Eureka Client.

    /**
     * Get information about all delta changes in {@link com.netflix.discovery.shared.Applications}.
     *
     * <p>
     * The delta changes represent the registry information change for a period
     * as configured by
     * {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}. The
     * changes that can happen in a registry include
     * <em>Registrations,Cancels,Status Changes and Expirations</em>. Normally
     * the changes to the registry are infrequent and hence getting just the
     * delta will be much more efficient than getting the complete registry.
     * </p>
     *
     * <p>
     * Since the delta information is cached over a period of time, the requests
     * may return the same data multiple times within the window configured by
     * {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}.The clients
     * are expected to handle this duplicate information.
     * <p>
     *
     * @param version the version of the request.
     * @param acceptHeader the accept header to indicate whether to serve JSON or XML data.
     * @param acceptEncoding the accept header to indicate whether to serve compressed or uncompressed data.
     * @param eurekaAccept an eureka accept extension, see {@link com.netflix.appinfo.EurekaAccept}
     * @param uriInfo the {@link java.net.URI} information of the request made.
     * @return response containing the delta information of the
     *         {@link AbstractInstanceRegistry}.
     */
    @Path("delta")
    @GET
    public Response getContainerDifferential(
            @PathParam("version") String version,
            @HeaderParam(HEADER_ACCEPT) String acceptHeader,
            @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
            @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
            @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {

        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();

        // If the delta flag is disabled in discovery or if the lease expiration
        // has been disabled, redirect clients to get all instances
        if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
            return Response.status(Status.FORBIDDEN).build();
        }

        String[] regions = null;
        if (!isRemoteRegionRequested) {
            EurekaMonitors.GET_ALL_DELTA.increment();
        } else {
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
        }

        CurrentRequestVersion.set(Version.toEnum(version));
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }

        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS_DELTA,
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );

        if (acceptEncoding != null
                && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
            return Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
            return Response.ok(responseCache.get(cacheKey))
                    .build();
        }
    }
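Both server endpoints shown in this article negotiate the response format the same way: JSON is served only when the Accept header mentions it, XML is the fallback, and the body is gzip-compressed only when the client advertises support for it. The sketch below restates that decision as two small functions. `NegotiationSketch` is hypothetical, and the literal values "json" and "gzip" are assumed stand-ins for the HEADER_JSON_VALUE and HEADER_GZIP_VALUE constants, whose definitions are not shown in the article.

```java
class NegotiationSketch {

    // Assumed values for the constants referenced by the server code.
    static final String JSON_MARKER = "json";
    static final String GZIP_MARKER = "gzip";

    /** JSON only when explicitly requested; XML when the header is missing or says otherwise. */
    static String mediaType(String acceptHeader) {
        if (acceptHeader == null || !acceptHeader.contains(JSON_MARKER)) {
            return "application/xml";
        }
        return "application/json";
    }

    /** Compress the response only when the client advertises gzip support. */
    static boolean useGzip(String acceptEncoding) {
        return acceptEncoding != null && acceptEncoding.contains(GZIP_MARKER);
    }

    public static void main(String[] args) {
        System.out.println(mediaType("application/json")); // application/json
        System.out.println(mediaType(null));               // application/xml
        System.out.println(useGzip("gzip, deflate"));      // true
    }
}
```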

Note: register, renew, cancel, and evict operations all end up updating the recentlyChangedQueue, and the delta information the Eureka Client receives is read from this queue.
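To illustrate the idea of a queue of recent changes that backs delta responses, here is a minimal sketch. It is not the Eureka implementation: `RecentChangesSketch`, the `Change` class, and the choice to evict expired entries when the delta is read are all assumptions made to keep the example short and deterministic. The action names and the retention value used in the usage note are arbitrary as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class RecentChangesSketch {

    /** One registry change; the field names are hypothetical. */
    static final class Change {
        final String instanceId;
        final String action;
        final long timestampMs;

        Change(String instanceId, String action, long timestampMs) {
            this.instanceId = instanceId;
            this.action = action;
            this.timestampMs = timestampMs;
        }
    }

    private final ConcurrentLinkedQueue<Change> queue = new ConcurrentLinkedQueue<>();
    private final long retentionMs;

    RecentChangesSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Every registry mutation appends an entry to the tail of the queue. */
    void record(String instanceId, String action, long nowMs) {
        queue.add(new Change(instanceId, action, nowMs));
    }

    /** Drops entries older than the retention window, then returns what is left. */
    List<Change> delta(long nowMs) {
        Change head;
        while ((head = queue.peek()) != null && head.timestampMs < nowMs - retentionMs) {
            queue.poll();
        }
        return new ArrayList<>(queue);
    }
}
```

For example, with a retention of 180,000 ms, a change recorded at time 0 is still returned by `delta(150_000)` but is gone by `delta(200_000)`. Because entries stay in the queue for the whole window, a client polling more often than the retention period sees the same change more than once, which is why the Javadoc above says clients are expected to handle duplicate information.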

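Jbone, a hands-on Spring Cloud project

GitHub: https://github.com/417511458/jbone

Gitee: https://gitee.com/majunwei2017/jbone

Original article. When reposting, please credit the source: reposted from 小马过河 - Understanding Eureka in Depth - How the Eureka Client Fetches Registry Information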

 


Original URL: https://www.cnblogs.com/tiancai/p/9598485.html