Eureka Service Registration Process

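The previous post, "SpringCloud: Eureka Service Registration and Discovery", introduced the basic features of Eureka. This post looks at how Eureka implements registration internally.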

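The figure above is the Eureka architecture diagram. Eureka consists of a Server and a Client; in the figure, the blue boxes are the Server side and the green boxes are Clients.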

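Basic flow:

1. The leftmost client (the service provider) sends a Register request to the Eureka Server in us-east-1c;

2. The registration is then replicated (Replicate) to the other two nodes of the Eureka Server cluster, us-east-1d and us-east-1e;

3. The two clients at the bottom of the figure (the service consumers) fetch the registration information from the Eureka servers, i.e. Get Registry.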


Registration process:


Eureka Client:

1. initScheduledTasks() in DiscoveryClient sets up registration as a scheduled task.

private void initScheduledTasks() {
    instanceInfoReplicator = new InstanceInfoReplicator(
            this,
            instanceInfo,
            clientConfig.getInstanceInfoReplicationIntervalSeconds(),
            2); // burstSize
    instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}

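2. The run() method of InstanceInfoReplicator calls the register method of discoveryClient whenever the instance info is marked dirty, and then reschedules itself.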

	public void run() {
	        try {
	            discoveryClient.refreshInstanceInfo();
	
	            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
	            if (dirtyTimestamp != null) {
	                discoveryClient.register();
	                instanceInfo.unsetIsDirty(dirtyTimestamp);
	            }
	        } catch (Throwable t) {
	            logger.warn("There was a problem with the instance info replicator", t);
	        } finally {
	            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
	            scheduledPeriodicRef.set(next);
	        }
	    }
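
The pattern in run() (check a dirty flag, register if needed, then schedule the next run in the finally block) can be sketched in isolation. This is only a minimal sketch, not Eureka's code: DirtyFlagReplicator, markDirty and registerCalls are hypothetical names, and the real register call is replaced by a counter.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for InstanceInfoReplicator; all names are illustrative.
class DirtyFlagReplicator implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final long intervalMillis;
    // null means "clean"; a timestamp means "changed at that time, not yet sent".
    private final AtomicReference<Long> dirtyTimestamp = new AtomicReference<>(null);
    // Counts how often the (simulated) register call was made.
    final AtomicInteger registerCalls = new AtomicInteger();

    DirtyFlagReplicator(ScheduledExecutorService scheduler, long intervalMillis) {
        this.scheduler = scheduler;
        this.intervalMillis = intervalMillis;
    }

    void markDirty() {
        dirtyTimestamp.set(System.currentTimeMillis());
    }

    @Override
    public void run() {
        try {
            Long dirty = dirtyTimestamp.get();
            if (dirty != null) {
                registerCalls.incrementAndGet(); // stands in for discoveryClient.register()
                // Clear the flag only if it was not set again in the meantime.
                dirtyTimestamp.compareAndSet(dirty, null);
            }
        } finally {
            // Self-rescheduling: each run queues the next one, as in the excerpt above.
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
            }
        }
    }
}
```

With this shape, a run that finds the flag clean does nothing except queue the next run, so registration is only repeated after the instance info has actually changed.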

3. DiscoveryClient.register() sends the registration to the server through a REST call; a 204 response counts as success.

boolean register() throws Throwable {
	        logger.info(PREFIX + appPathIdentifier + ": registering service...");
	        EurekaHttpResponse<Void> httpResponse;
	        try {
	            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
	        } catch (Exception e) {
	            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
	            throw e;
	        }
	        if (logger.isInfoEnabled()) {
	            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
	        }
	        return httpResponse.getStatusCode() == 204;
	    }
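
To make the REST call more concrete, here is a rough sketch of what such a request could look like when built with the JDK's java.net.http API (Java 11+). It only builds the request and does not send it. The class name, the serviceUrl value and the apps/{appName} path are assumptions for illustration; the actual path depends on how the server is deployed, and Eureka's own transport layer is not used here.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Illustrative only: not the Eureka transport client.
class RegisterRequestSketch {

    // Builds (but does not send) a POST request carrying the instance payload.
    // serviceUrl is assumed to look like "http://localhost:8761/eureka".
    static HttpRequest buildRegisterRequest(String serviceUrl, String appName, String instanceJson) {
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        return HttpRequest.newBuilder()
                .uri(URI.create(base + "apps/" + appName)) // assumed path layout
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }

    // Mirrors the check at the end of register(): only 204 counts as success.
    static boolean isRegistered(int statusCode) {
        return statusCode == 204;
    }
}
```

Note that a 200 response would not count as a successful registration under this check; the client compares against 204 specifically.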


Eureka Server:


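1. The REST endpoint, i.e. the server-side interface that the client's REST call reaches (addInstance in ApplicationResource).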

    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

2. The register method in PeerAwareInstanceRegistryImpl registers the instance locally and then replicates the registration to the peer nodes.

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
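
The role of the isReplication flag is easier to see in a toy model. The sketch below is an assumption-laden simplification, not Eureka's implementation: PeerNodeSketch and its fields are hypothetical, replication is a direct method call instead of an HTTP request, and the 90-second default is what I believe Lease.DEFAULT_DURATION_IN_SECS to be.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of one Eureka server node; all names are illustrative.
class PeerNodeSketch {
    static final int DEFAULT_LEASE_SECS = 90; // assumed default lease duration

    final String name;
    final List<PeerNodeSketch> peers = new ArrayList<>();
    // instanceId -> lease duration in seconds
    final Map<String, Integer> leases = new ConcurrentHashMap<>();
    int replicationsSent = 0;

    PeerNodeSketch(String name) {
        this.name = name;
    }

    void register(String instanceId, int requestedLeaseSecs, boolean isReplication) {
        // Fall back to the default when the client did not ask for a lease duration.
        int leaseSecs = requestedLeaseSecs > 0 ? requestedLeaseSecs : DEFAULT_LEASE_SECS;
        leases.put(instanceId, leaseSecs);

        // A replicated registration is stored but never forwarded again;
        // otherwise the peers would keep bouncing it between each other.
        if (isReplication) {
            return;
        }
        for (PeerNodeSketch peer : peers) {
            replicationsSent++;
            peer.register(instanceId, leaseSecs, true);
        }
    }
}
```

With three nodes that list each other as peers, a client registration on one node results in exactly two replication calls, and the receiving nodes send none of their own.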

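Inside the register method of the parent class (AbstractInstanceRegistry, reached via super.register), the case where no lease exists yet is treated as a new registration. The relevant code: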

if (existingLease != null && (existingLease.getHolder() != null)) {
    // A lease already exists (renewal case)
    .......
} else {
    // The lease does not exist and hence it is a new registration
    synchronized (lock) {
        if (this.expectedNumberOfRenewsPerMin > 0) {
            // A new instance is registering, so raise the expected renewals
            // (1 for 30 seconds, 2 for a minute)
            this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
            this.numberOfRenewsPerMinThreshold =
                    (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
        }
    }
    logger.debug("No previous lease information found; it is new registration");
}
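
The threshold arithmetic in the new-registration branch can be checked with a small worked example. The class below is a hypothetical sketch that copies only the two assignments from the excerpt; the value 0.85 passed in the usage note is an assumed renewal percent threshold, not something taken from this post.

```java
// Illustrative sketch of the renewal-threshold bookkeeping; names mirror the excerpt.
class RenewalThresholdSketch {
    int expectedNumberOfRenewsPerMin;
    int numberOfRenewsPerMinThreshold;
    final double renewalPercentThreshold;

    RenewalThresholdSketch(int initialExpected, double renewalPercentThreshold) {
        this.expectedNumberOfRenewsPerMin = initialExpected;
        this.renewalPercentThreshold = renewalPercentThreshold;
        this.numberOfRenewsPerMinThreshold = (int) (initialExpected * renewalPercentThreshold);
    }

    // Called when a brand-new instance registers.
    void onNewRegistration() {
        if (expectedNumberOfRenewsPerMin > 0) {
            // One heartbeat every 30 seconds means 2 more renewals per minute.
            expectedNumberOfRenewsPerMin += 2;
            // The cast truncates, so the threshold is rounded down.
            numberOfRenewsPerMinThreshold =
                    (int) (expectedNumberOfRenewsPerMin * renewalPercentThreshold);
        }
    }
}
```

Starting from 2 registered instances (4 expected renewals per minute) with a factor of 0.85, the threshold is 3; after one more registration the expected count becomes 6 and the threshold becomes 5. If the expected count starts at 0, the guard in the excerpt leaves both values unchanged.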


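Finally, the registration record is filled in, the change is added to the recently-changed queue, and the cached response for this application is invalidated.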

            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);


Original article: https://www.cnblogs.com/saixing/p/6730189.html