微服务通信之ribbon实现原理

前言

上一篇我们知道了feign调用实现负载均衡是通过集成ribbon实现的。也较为详细的了解到了集成的过程。现在我们看一下ribbo是如何实现负载均衡的。写到这里我尚未去阅读源代码,我在这里盲猜一下: 他肯定是有一个从注册中心拉取配置的模块,一个选择调用服务的模块。然后我们就带着这样的指导思想去看源码。

一、ribbo是何时从eurake加载的服务列表?

从上一篇文章我们知道,feign调用实际上调用的是AbstractLoadBalancerAwareClient.executeWithLoadBalancer(...)方法,我们看一下该方法做了那些事情:


public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            // 省略
        }
        
    }


可以看到上述代码主要是创建了一个负载均衡执行命令类,然后执行请求。我们看看提交请求后执行的具体逻辑:


public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
        

        // Use the load balancer
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(...);

       // 省略部分代码
    }

这里可以看到它使用了RXjava,rxjava主要是利用观察者思想实现的链式调用,其源码的实现稍显复杂,自己使用需要谨慎,借用Uncle Ben的一句话 ”with great power comes great responsibility “。我们关心一下上面的selectServers()方法,从方法名我们可以大致猜出,它是在选择调用的服务。我们看一下此方法的实现:


    /**
     * Return an Observable that either emits only the single requested server 
     * or queries the load balancer for the next server on each subscription
     */
    private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }

显然核心逻辑在loadBalancerContext.getServerFromLoadBalancer(...)里面,我们继续向下看:


public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
       

        // 省略部分代码
        ILoadBalancer lb = getLoadBalancer();
        if (host == null) {
            // Partial URI or no URI Case
            // well we have to just get the right instances from lb - or we fall back
            if (lb != null){
                Server svc = lb.chooseServer(loadBalancerKey);
            } 
        }

        // 省略部分代码
        return new Server(host, port);
    }

非常戏剧性的是,我们看到他选择一个服务是通过ILoadBalancer进行的,那我们的看一下ILoadBalancer是怎么创建的。首先ILoadBalancer是属于Ribbon提供的类,其创建我们先看Ribbon自身的ILoadbalance创建过程,发现其是通过RibbonClientConfiguration的下述方法创建:


        @Bean
	@ConditionalOnMissingBean
	public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
			ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
			IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
		if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
			return this.propertiesFactory.get(ILoadBalancer.class, config, name);
		}
		return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
				serverListFilter, serverListUpdater);
	}

可以看到上述方法先判断name对应的ILoadBalancer是否存在,不存在就创建了一个。我们看一下这个name代表的是什么:


	@RibbonClientName
	private String name = "client";

可以看到它取得的是客户端名称,其实就是@FeignClient的name属性的值,这里可以直接告诉你怎么实现动态给这里的name赋值的:在生成feign的代理对象过程的实现,会将feign通过@FeingClient的contentId属性进行Context隔离,在加载配置的时候会将@RibbonClientName属性的@Value属性的环境变量的值设置成当前@FeignClient的name值,具体可以直看NamedContextFactory.createContext()方法。言归正转,ZoneAwareLoadBalancer的构建显然是持有了Config,Rule,Ping,ServerList,ServerListFilter,ServerListUpdater。
现在我们看一下这四个类的分工与职责。

职责
Config config 主要是配置了服务名称,服务读取的一些配置比如刷新服务间隔等
Rule 选用服务的规则,他决定了选用哪个服务
Ping 主要用于探测服务列表中的服务是否可用
ServerList 它主要是提供最新服务列表:包括上线、下线的服务(默认实现DiscoveryEnabledNIWSServerList)
ServerListFilter 用于过滤服务,do what you want to do.
ServerListUpdater 用于更新服务列表(当前的默认实现是启动定时器,然后调用ServerList获取服务列表,将当前负载均衡可用服务列表全覆盖)

知道了上面的职责划分,我们看到ZoneAwareLoadBalancer最终初始化服务列表的方法:


    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

基于上述我们可以清晰的了解到,获取服务列表发生在ServerList的实现中(serverListImpl.getUpdatedListOfServers())。

二、ribbo配合Eurake的ServerList实现

上面serverListImpl 实现类就是DiscoveryEnabledNIWSServerList,当调用getUpdatedListOfServers()最终调用到了如下所述方法,我们看一下他干了些什么:

// 省略了部分代码
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {
                        DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
                        serverList.add(des);
                    }
                }
            }
        }
        return serverList;
    }

上述代码主要是通过EurakeClient 获取对应服务实例,然后将服务实例向DiscoveryEnabledServer进行映射。至此从服务列表的获取过程就完成了。

三、 Rule 服务的选择

知道了服务列表的来源,接下来就是最关键的一步了,选择一个服务调用。我们先看一下默认情况使用的是什么规则(RibbonClientConfiguration):


        @Bean
	@ConditionalOnMissingBean
	public IRule ribbonRule(IClientConfig config) {
		if (this.propertiesFactory.isSet(IRule.class, name)) {
			return this.propertiesFactory.get(IRule.class, config, name);
		}
		ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
		rule.initWithNiwsConfig(config);
		return rule;
	}

显然默认使用的是ZoneAvoidanceRule。 这里我们看一下IRule都有哪些实现,然后阐述一下各个算法实现(这里没有再看源码的必要了,算法就那些):

描述
ZoneAvoidanceRule 基于可用性与区域的复合预测规则
RoundRobinRule 轮询
WeightedResponseTimeRule 根据响应时间选择
RandomRule 随机规则

小结

总结起来,ribbon的核心组成就是ZoneAwareLoadBalancer的组成。其分工明确,服务列表的维护、服务的可用性、服务的选择都有专门的类去负责。下一篇将会着重讲一下Feign、Ribbon如何做到基于服务的配置隔离的,会适当配合实战,比如怎么自定义一个全局生效的路由规则,自定义一个基于服务名的生效的路由规则等等。

原文地址:https://www.cnblogs.com/enjoyall/p/13808902.html