Eureka 服务端端源码分析

Eureka服务端使用的使用,会引入spring-cloud-starter-rereka-server, 在Application类中引入注解@EnableEurekaServer。

1、@EnableEurekaServer。

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({EurekaServerMarkerConfiguration.class})
public @interface EnableEurekaServer {
}


@Configuration( proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {
    public EurekaServerMarkerConfiguration() {
    }

    @Bean
    public EurekaServerMarkerConfiguration.Marker eurekaServerMarkerBean() {
        return new EurekaServerMarkerConfiguration.Marker();
    }

    class Marker {
        Marker() {
        }
    }
}

 看来找不到什么 。

然后进入spring-cloud-netflix-eureka-server-2.2.2.RELEASE的spring.factories查找

org.springframework.boot.autoconfigure.EnableAutoConfiguration=
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

EnableAutoConfiguration对应的值列表中的类会在SpringBoot项目启动的时候注册到Spring容器中,那么EurekaServerAutoConfiguration会被默认加载到Spring中

2、EurekaServerAutoConfiguration

@ConditionalOnBean({Marker.class})
@EnableConfigurationProperties({EurekaDashboardProperties.class, InstanceRegistryProperties.class})
@PropertySource({"classpath:/eureka/server.properties"})
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
    

 

    @Configuration( proxyBeanMethods = false )
    protected static class EurekaServerConfigBeanConfiguration {

        @Bean
        @ConditionalOnMissingBean
	//1、创建并加载EurekaServerConfig的实现类。主要是Eureka-server的配置信息
        public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
            EurekaServerConfigBean server = new EurekaServerConfigBean();
            if (clientConfig.shouldRegisterWithEureka()) {
                server.setRegistrySyncRetries(5);
            }

            return server;
        }
    }

    @Bean
    @ConditionalOnProperty(prefix = "eureka.dashboard",name = {"enabled"}, matchIfMissing = true)
    // 2.Eureka-server的可视化界面就是通过EurekaController提供的
    public EurekaController eurekaController() {
        return new EurekaController(this.applicationInfoManager);
    }

	
    @Bean
    // 3.接收客户端的注册等请求就是通过InstanceRegistry来处理的
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications();
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient, this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(), this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }

    // 4.初始化Eureka-server,会同步其他注册中心的数据到当前注册中心
    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig, registry, serverContext);
    }

}

  总结:通过以上分析可知,EurekaServer在启动的时候,会加载很多bean到Spring容器中,每个bean都实现了各自的功能,我们分析最重要的一个bean,也就是真正处理客户端请求的类InstanceRegistry.java,

3、org.springframework.cloud.netflix.eureka.server.InstanceRegistry 

// 1.接收客户端注册请求
public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
        this.handleRegistration(info, leaseDuration, isReplication);
        super.register(info, leaseDuration, isReplication); //// 在1)中详细分
    }

 // 2.接收客户端下线请求
    public boolean cancel(String appName, String serverId, boolean isReplication) {
        this.handleCancelation(appName, serverId, isReplication);
        return super.cancel(appName, serverId, isReplication); // 在2)中详细分析
  // 3.接收客户端续约请求  
  public boolean renew(final String appName, final String serverId, boolean isReplication) {
        this.log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication);
        List<Application> applications = this.getSortedApplications();
        Iterator var5 = applications.iterator();

        while(var5.hasNext()) {
            Application input = (Application)var5.next();
            if (input.getName().equals(appName)) {
                InstanceInfo instance = null;
                Iterator var8 = input.getInstances().iterator();

                while(var8.hasNext()) {
                    InstanceInfo info = (InstanceInfo)var8.next();
                    if (info.getId().equals(serverId)) {
                        instance = info;
                        break;
                    }
                }

                this.publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));
                break;
            }
        }

        return super.renew(appName, serverId, isReplication); // 在3)中详细分析

    }

  Eureka Client发送的是HTTP客户端,那Eureka Server应该有一个类接收客户端请求,并将具体业务处理委托给InstanceRegistry,这个类就是com.netflix.eureka.resources包下的ApplicationResource、InstanceResource类

  注意:这种接收请求的方式是采用jax-rs的方式,

1)AbstractInstanceRegistry.register(InstanceInfo registrant, int leaseDuration, boolean isReplication)注册

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            this.read.lock();
	    //  // 1.所有的服务信息都添加到registry这个map中,
            // registry 格式为:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>()
            Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
            EurekaMonitors.REGISTER.increment(isReplication);
    	    // // 1.如果没有该服务的信息,则新建,并添加到registry中
            if (gMap == null) {
                ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap();
                gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
	    // 2.existingLease信息即服务的一些注册时间等信息,主要是为了校验该服务是否过期,如果已过期,则剔除

            Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId());
            if (existingLease != null && existingLease.getHolder() != null) {
                Long existingLastDirtyTimestamp = ((InstanceInfo)existingLease.getHolder()).getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = (InstanceInfo)existingLease.getHolder();
                }
            } else {
                synchronized(this.lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        ++this.expectedNumberOfClientsSendingRenews;
                        this.updateRenewsPerMinThreshold();
                    }
                }

                logger.debug("No previous lease information found; it is new registration");
            }

            Lease<InstanceInfo> lease = new Lease(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }

            ((Map)gMap).put(registrant.getId(), lease);
            ...
        }

    }

  

总结1):通过上述分析可知,服务注册信息最终存放到

// 外层map的key即为应用的服务名,内层map的key为我们设置的eureka.instance.instance-id
// 设置成这种格式,当多个应用提供相同服务时,那么外层map的key都相同,内层map的key不同

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
 
所有的操作都是针对这个map进行操作

2)   AbstractInstanceRegistry.renew(String appName, String id, boolean isReplication)续约

    public boolean renew(String appName, String id, boolean isReplication) {
        EurekaMonitors.RENEW.increment(isReplication);
	// // 1.获取对应map
        Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
	    // 2.主要是为了获取当前服务的一些过期信息
            leaseToRenew = (Lease)gMap.get(id);
        }

        if (leaseToRenew == null) {
           ...
        } else {
           ...

            this.renewsLastMin.increment();
	    // // 主要操作在这里,将最新更新时间重置,剔除任务检查的也就是这个最新更新时间
            // lastUpdateTimestamp = System.currentTimeMillis() + duration;

            leaseToRenew.renew();
            return true;
        }
    }

  

3)AbstractInstanceRegistry.cancel(String appName, String id, boolean isReplication)下线

public boolean cancel(String appName, String id, boolean isReplication) {
        return this.internalCancel(appName, id, isReplication);
    }

    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        try {
            this.read.lock();
            EurekaMonitors.CANCEL.increment(isReplication);
	     // 1.获取gmap
            Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
	   	  // 2.删除gmap中该服务i
                leaseToCancel = (Lease)gMap.remove(id);
            }

	    // 3.将当前服务的剔除时间置为当前时间
            //evictionTimestamp = System.currentTimeMillis();
            leaseToCancel.cancel();
	    //// 4.获取服务信息
            InstanceInfo instanceInfo = (InstanceInfo)leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
		 // 5.将服务信息置为已删除
                instanceInfo.setActionType(ActionType.DELETED);
                this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }

            
        } finally {
            this.read.unlock();
        }

       
    } 

总结: 

* 服务的注册实际上是将服务信息添加到一个map中,map的key是服务名称,value也是一个map,是提供该服务的所有客户端信息;

* 服务的续约实际上是获取map中该服务的客户端信息,然后修改其最新更新时间

* 服务的下线实际上是删除该map中该服务信息,然后修改服务状态

参考: Eureka源码深度解析(下)

原文地址:https://www.cnblogs.com/linlf03/p/12601621.html