Spring Cloud Ribbon服务列表的缓存与更新(二)

前面讲到RestTemplate的应用也分析了他的实现,接着通过RestTemplate引出了负载均衡,上个篇幅分析了拦截器、负载均衡器的获取及解析,下面我们接着上次内容讲解。

首先看源码一定要学会找入口

 点击getForObject

 点击doExecute后到达下图所示位置

@Nullable
    protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
            @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

        Assert.notNull(url, "URI is required");
        Assert.notNull(method, "HttpMethod is required");
        ClientHttpResponse response = null;
        try {
            ClientHttpRequest request = createRequest(url, method);
            if (requestCallback != null) {
                requestCallback.doWithRequest(request);
            }
//发起了一次网络通信请求,我们向上看可以发现这个发起请求的request是由上面createRequest(url,method)创建的 response
= request.execute(); handleResponse(url, method, response); return (responseExtractor != null ? responseExtractor.extractData(response) : null); } catch (IOException ex) { String resource = url.toString(); String query = url.getRawQuery(); resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource); throw new ResourceAccessException("I/O error on " + method.name() + " request for "" + resource + "": " + ex.getMessage(), ex); } finally { if (response != null) { response.close(); } } }

因为request.execute();的实现有很多,所以我们为了搞清楚它的实现到底是哪一个那我们就一定要进入createRequest()方法看下,发现它是调用了HttpAccessor的createRequest方法,它返回的request对像是什么对像取绝于

ClientHttpRequest request = getRequestFactory().createRequest(url, method);其中getRequestFactory()就是一个工厂模式,这个工厂调用的是什么我们可以看下

发现这个工厂有两个方法,一个是本地方法一个子类方法,我们这里面调用的是它的子类的实现InterceptingHttpAccessor,点击进入

 

至于为啥是调用子类不是调用本地这跟他的接口有关,如果它有子类它一定会被他的子类覆盖掉,所以这里面看到有两个直接找它的子类实现,下面的子类也比较简单,它就是覆盖父类的方法

 

下面不看具体业务流程,就看下关键的几个点

@Override
    public ClientHttpRequestFactory getRequestFactory() {
//得到一个拦截器的集合 List
<ClientHttpRequestInterceptor> interceptors = getInterceptors();
//判空
if (!CollectionUtils.isEmpty(interceptors)) { ClientHttpRequestFactory factory = this.interceptingRequestFactory; if (factory == null) {
//有了上一篇幅的了解,应该很清楚在Bean就已经加载了一个拦截器,所以这里不可能为空 factory
= new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors); this.interceptingRequestFactory = factory; } return factory; } else { return super.getRequestFactory(); } }

都搞到了这里那就再看下getInterceptors()方法是怎么来的,点进去看下

 我们一路找,找到了下图,看到了private final List<ClientHttpRequestInterceptor> interceptors = new ArrayList<>();但目前为止这个interceptors是空的,前面我们说过拦截器集合不为空,所以接下来我们要看下这个集合是在哪里被赋值的。

 

 要想找到List<ClientHttpRequestInterceptor> interceptors赋值的地方就要去重新看上一篇幅看过的类LoadBalancerAutoConfiguration

 

 所以前面说的List<ClientHttpRequestInterceptor> interceptors = getInterceptors();结果肯定不会为空,我们再回退到上一级,退到

ClientHttpRequest request = getRequestFactory().createRequest(url, method);方法

 我们来分析下这个ClientHttpRequest类的调用关系图,至于为什么分析这个类关系是因为前面说过request.execute();的实现有很多,为了搞清楚它的实现到底是哪一个那我们就一定要进入createRequest()方法看下,我们一直跟进跟到了这个类方法了。在下面众多的实现中我发现了一个拦截器的实现InterceptingClientHttpRequest

 那接下来我肯定是要去找到那拦截器的实现看他搞了啥,但很可惜,下面众多的实现中没有找到我想要的拦截器实现,这时一脸懵逼,这是不是逗我,上面关系图跟我说你有但找你代码的实现确找不到??这时候我们就要换个思路,找不到我们想要的实现类那去找他爹问下他儿子去哪了呗

 所以接下来我们选择他爹,AbstractBufferingClientHttpRequest看下,会发现他又调用了createRequest方法,他这里面是用到了设计模式中的模板模式,如果对模板模式不太清楚的可以自己补下,他定义了一个抽象的方法,他这个抽象的方法通过我们的抽像类去调用自己的方法,因为抽像的方法必须由子类去实现所以他又会去调用子类

 所以我们点击看下createRequest有没有调用子类,点击后一看果然和我猜想的一样

点击进入后发现他返回的是InterceptingClientHttpRequest,所以跟了这么久终于知道了ClientHttpRequest request = getRequestFactory().createRequest(url, method);得到的InterceptingClientHttpRequest

 

现在搞清楚了这一点我们再回退到RestTemplate方法的doExecute中去,我们跟进response = request.execute();方法,会发现它很多实现,如果他没有真正的实现时会找到父类,所以我们进入他的父类抽像方法,

 走到这一步我们会发现executeInternal(this.headers);又有很多实现,老规矩找爹,如果不想这么直接想稳点那就要明白,这个executeInternal的父类方法是什么,从他的关系结构图找

 

 找到他爹后进来截图如下

 接下来我们再向下走,点击execyteInternal又是调用模板方法,但这时他应该调用的是他真正义意上的实现类InterceptingClientHttpRequest

 所以他最终到了下面所示图中的方法中

 点击上图中execute,进入下图所示方法中,下图中有个this.iterator,我们先不管iterator是啥,但从字面上我们可以看出ClientHttpRequestInterceptor这玩意给我们存的应该是一个客户端请求拦截器,而且通过Bebugger可以发现这个拦截器一定会包含我上一篇中分析说到的LoadBalancerInterceptor,这样一步步的看就跟上一篇幅简讲说的连上了。

 选择LoadBalancerInterceptor进去

 进来后一看,和上一篇幅中的时序图完美接上了,后面的方法其实在上一篇幅中全都说过

点击上图的execute进入下图位置

 再点击上图的execute到下图位置,熟悉的配方,这流程上一篇中都跑过,如果这里再来一次就没意思了

 上次说到了已经拿到了server列表 在getServer里面

这里面有一个负载均衡的计算,在这段代码最后他有一句return super.chooseServer(key);这明显是调用了他的父类,那我们在选择时就需要明白这父子关系图,要不然没办法选呀

 

 所以下面在继续走之前看下他的父子关系图

 

 通过类关系图,很清楚的知道调用的是BaseLoadBalancer父类

这里面rule.choose里面有很多规则的执行,上次好像我是说过其中两个规则的执行

 

 进入rule.choose方法,在上一次中有说到这下图红框中提到的List<Server>缓存列表,接下来,这篇幅重点就是讲解下这个列表是从哪里拿来,又是从哪里去更新的

 首先要说下这个list是从哪来的,我们点击getReachableServers可以发现,他在ILoadBalancer中有两个接口,竟然有接口那么就有实现,下面就要搞清楚是谁实现这玩意的

 要是我我就猜它是BaseLoadBalancer实现的

 至于是不是真的,先放着,看代码没有次次搞对,总要凭自己看代码的经验猜上一猜,我们往上翻会发现在BaseLoadBalancer中有两个地方存了这个List<Server>,这两个玩意就是服务器端地址列表在客户端内存中的一个缓存,所以这个List一定存储了服务器端提供者的一个地址列表,竟然有列表存进来那一定会有初始化及更新的过程,下面就来看下他是如何初始化,又是如何更新

 要想搞清楚这个接下来就需要带着猜想去看下逻辑,上一篇幅的时序中有画ILoadBalancer的真正实例是ZoneAwareLoadBalancer,而在我刚刚画的ZoneAwareLoadBalancer的类关系图中可以发现他的父类是DynamicServerListLoadBalancer(如果有忘记的可以向上翻一翻),再往上就是的父类就是BaseLoadBalancer,这样一来初始化过程就和我猜的一样了,有了这个概念后我们进入DynamicServerListLoadBalancer方法,在它的构造方法中可以发现一个restOfInit(clientConfig);方法,这是进入服务列表进入初始化的一个流程,触发点就是DynamicServerListLoadBalancer被初始化和实例化以后它会在构造方法中触发一个restOfInit(clientConfig);

restOfInit(clientConfig)最终会调用updateAllServerList去更新,然而这更新有两个地方,一个是更新本地服务另一个是从Eureka中动态获取服务代表,到这里初始化和更新问题就解决了

 更新下时序图

 更新完时序图后点击restOfInit(clientConfig)往下走,里面有两个操作的方法一个是定时更新Eureka实例列表的enableAndInitLearnNewServersFeature();另一个是获取所有Eureka实例列表的updateListOfServers

 进入updateListOfServers,这里面的更新是去更新我们启动的服务的列表,这里面有两个实现,之前我说过这里面的更新一个是更新本地服务另一个是更新Eureka实例列表,那么按理说这里面也应该有Eureka的实现,这是因为我当前还没有添加Eureka的依赖包,所以就没有这个实现类,所以我们这里看本地配置更新ConfigurationBaseServerList

 进入ConfigurationBaseServerList后会由listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);得到一个listOfServers , 这里面的IClientConfig是从上一篇中说的RibbonClientConfiguration中完成初始化的

点击上图中ListOfServers可以发现是从客户端的本地配置拿到服务的,不信可以Debug一下

 

 更新下时序图

 这一条链路说完后,再返回到enableAndInitLearnNewServersFeature();这个定时更新Eureka实例列表这个方法中来看,通过看下面截图可以很清楚的看到,他之所以能动态拿取是因为它启动了一个定时任务,竟然是定时任务那就要看下它任务的updateAction是啥

 点击updateAction后发现他是一个内部类,这里在面的UpdateListOfServers()又是前面说到过的获取所有实例的方法

 

 回到前面的serverListUpdater.start(updateAction);看下start里面定义的是啥

  @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
//最终执行的任务 wrapperRunnable,
//下面的是延时时间 initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS ); }
else { logger.info("Already active, no-op"); } }

更新下时序图

 前面通过updateListOfServers() 中的servers = serverListImpl.getUpdatedListOfServers();就已经获取到了服务的列表,在列表获取后要做的事就是把列表给放到缓存中去,接下来看updateListOfServers()的updateAllServerList(servers);是怎么做到的,进入后发现他有一个compareAndSet操作,如果看过我前面写的多线程的文章的话可以很清楚的知道他这是做了一个锁的操作,之所以这么操作是为了防止已经有其它线程在执行这个代码

 然后进入setServersList(ls)方法,如果不知道怎么选那就又要看类关系图了

 因为现在在DynamicServerListLoadBalancer中,所以很明显是找父类BaseLoadBalancer

 通过这样分析很明了的发现是在BaseLoadBalancer类中完成赋值的,和我之前的猜想完美接合上了

 更新下时序图

 现在看来从服务的解析到服务列表的找寻及缓存都解决了,看拟问题都解决了,但是还有一个问题,那就是服务的存活问题,因为在生产环境中有服务挂机的情况,所以这里面的设计应该还有一个定时去ping下服务是否运转正常,如果ping的结果发现服务有异常那一定会去改变我们的ILoadBalancer的服务列表,把它下线,这个代码是写在BaseLoadBalancer类中,它在构造方法中通过setupPingTask()方法启动一个任务

 点击setupPingTask(),下面代码也很简短

void setupPingTask() {
//是否可以跳过
if (canSkipPing()) { return; }
//如果不等空偏锁
if (lbTimer != null) { lbTimer.cancel(); }
//构造任务 lbTimer
= new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true);
//schedule是任务,pingIntervalSeconds是任务间隔时间,这里面是10S lbTimer.schedule(
new PingTask(), 0, pingIntervalSeconds * 1000); forceQuickPing(); }

先进入new PingTask()看做了什么

  class PingTask extends TimerTask {
        public void run() {
            try {
//策略模式,通过传入的值决定是用哪个ping去完成
new Pinger(pingStrategy).runPinger(); } catch (Exception e) { logger.error("LoadBalancer [{}]: Error pinging", name, e); } } }
public void runPinger() throws Exception {
            if (!pingInProgress.compareAndSet(false, true)) { 
                return; // Ping in progress - nothing to do
            }
            
            // we are "in" - we get to Ping

            Server[] allServers = null;
            boolean[] results = null;

            Lock allLock = null;
            Lock upLock = null;

            try {
                /*
                 * The readLock should be free unless an addServer operation is
                 * going on...
                 */
                allLock = allServerLock.readLock();
                allLock.lock();
                allServers = allServerList.toArray(new Server[allServerList.size()]);
                allLock.unlock();

                int numCandidates = allServers.length;
//最终ping的是allServers中的一个服务的节点,然后通过调用一个策略去ping,下面就是要看pingerStrategy调的策略是谁 results
= pingerStrategy.pingServers(ping, allServers); final List<Server> newUpList = new ArrayList<Server>(); final List<Server> changedServers = new ArrayList<Server>(); for (int i = 0; i < numCandidates; i++) { boolean isAlive = results[i]; Server svr = allServers[i]; boolean oldIsAlive = svr.isAlive(); svr.setAlive(isAlive); if (oldIsAlive != isAlive) { changedServers.add(svr); logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}", name, svr.getId(), (isAlive ? "ALIVE" : "DEAD")); } if (isAlive) { newUpList.add(svr); } } upLock = upServerLock.writeLock(); upLock.lock(); upServerList = newUpList; upLock.unlock(); notifyServerStatusChangeListener(changedServers); } finally { pingInProgress.set(false); } }

分析完上面的逻辑后发现这个策略是IPingStrategy,但是在这个类中找不到他是在哪初始化的,此时就要猜了,根据时序图找链路,我猜是在RibbonClientConfiguration中完成初始化,在配置类中找,找到了如下的ping他先判断是否设置过,如果设置过就用设置的,如果没有就用默认的,这个就是初始化过程

 

 初始化完成了,那接下来要关心的就是这个IPing是在哪里被赋值给了ILoadBalancer中去,向下找会发现如下图,由这里我就知道了这个ping的初始化、赋值及它如果没有设置它默认返回的是DummyPing()了(这个默认的ping是不做任务处理的,如果不信可以看他的类方法)

 有了上面的概念后那继续跟着results = pingerStrategy.pingServers(ping, allServers);逻辑走,点击pingServers看它ping的逻辑

 进入PingUrl看下它的心跳写法,里面就是每隔一段时间就发起一次请求,这里面发起的是HTTP请求,然后根据返回的状态码进行判断

 根据官网资料https://docs.spring.io/spring-cloud-netflix/docs/2.2.5.RELEASE/reference/html/#spring-cloud-ribbon,它这个ping是可以扩展的,下面就这个扩展来写下

 

 

 Debug访问一下,可以看到自定义的已经加载进去了

 

原文地址:https://www.cnblogs.com/xing1/p/14141571.html