Spring Cloud-Ribbon负载均衡策略类IRule(五)

IRule

IRule

AbstractLoadBalancerRule

负载均衡策略的抽象类：负责获取负载均衡器并保存在内部，具体策略再以负载均衡器维护的信息作为分配依据。

/**
 * Base class for load-balancing rules.
 *
 * <p>It only stores the {@link ILoadBalancer} handed to it, so that concrete rules
 * can query that balancer when they pick a server.
 */
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {

    /** Load balancer this rule is attached to; {@code null} until one has been set. */
    private ILoadBalancer loadBalancer;

    /**
     * Returns the load balancer stored by the most recent call to
     * {@link #setLoadBalancer(ILoadBalancer)}.
     */
    @Override
    public ILoadBalancer getLoadBalancer() {
        return this.loadBalancer;
    }

    /**
     * Remembers the load balancer this rule should work with.
     *
     * @param lb the load balancer to store
     */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        this.loadBalancer = lb;
    }
}

RandomRule

随机选择一个服务的策略

public class RandomRule extends AbstractLoadBalancerRule {

    /**
     * Picks a server at random from the load balancer's reachable servers.
     *
     * <p>The random index is drawn from the size of the reachable list, because that
     * is the list the index is applied to. Drawing it from the size of the full
     * server list can produce an index past the end of the reachable list whenever
     * some servers are down.
     *
     * @param lb  load balancer to pick from; {@code null} yields {@code null}
     * @param key not used by this rule
     * @return an alive server, or {@code null} if the balancer is {@code null}, the
     *         thread was interrupted, or there are no (reachable) servers
     */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            // Thread.interrupted() also clears the flag; an interrupted caller gets no server.
            if (Thread.interrupted()) {
                return null;
            }
            // Servers currently considered reachable.
            List<Server> upList = lb.getReachableServers();
            // Every server known to the load balancer.
            List<Server> allList = lb.getAllServers();

            if (allList.size() == 0) {
                /*
                 * No servers. End regardless of pass, because subsequent passes
                 * only get more restrictive.
                 */
                return null;
            }

            int upCount = upList.size();
            if (upCount == 0) {
                // Nothing reachable right now: give up instead of indexing an empty list.
                return null;
            }

            // Random index bounded by the list it is used on.
            int index = chooseRandomInt(upCount);
            server = upList.get(index);

            if (server == null) {
                // The list held a null slot; let other threads run, then try again.
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return server;
            }

            // Shouldn't actually happen.. but must be transient or a bug.
            server = null;
            Thread.yield();
        }

        return server;

    }

RoundRobinRule

public class RoundRobinRule extends AbstractLoadBalancerRule {

    private AtomicInteger nextServerCyclicCounter;
    /**
     * Picks the next server in round-robin order over all servers of the load balancer.
     *
     * <p>Up to 10 candidates are examined. A candidate is returned only when it
     * reports both alive and ready to serve.
     *
     * @param lb  load balancer to pick from; {@code null} yields {@code null}
     * @param key not used by this rule
     * @return the chosen server, or {@code null} when the balancer is {@code null},
     *         no server is reachable, or 10 attempts produced no usable server
     */
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        for (int attempt = 0; attempt < 10; attempt++) {
            // Reachable servers are only used for the "anything up at all?" check.
            List<Server> reachable = lb.getReachableServers();
            // The rotation itself runs over the full server list.
            List<Server> everyServer = lb.getAllServers();
            int upCount = reachable.size();
            int serverCount = everyServer.size();

            if (upCount == 0 || serverCount == 0) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            // Advance the shared cyclic counter and take the server at that slot.
            Server candidate = everyServer.get(incrementAndGetModulo(serverCount));

            if (candidate == null) {
                // Null slot: let other threads run, then move on to the next attempt.
                Thread.yield();
                continue;
            }

            if (candidate.isAlive() && candidate.isReadyToServe()) {
                return candidate;
            }
            // Not usable: fall through to the next attempt.
        }

        // Reached only when every attempt failed.
        log.warn("No available alive servers after 10 tries from load balancer: " + lb);
        return null;
    }
    /**
     * Atomically advances the cyclic counter to {@code (current + 1) % modulo} and
     * returns the new value.
     *
     * @param modulo number of slots to cycle through
     * @return the counter value after the update
     */
    private int incrementAndGetModulo(int modulo) {
        int seen;
        int advanced;
        do {
            seen = nextServerCyclicCounter.get();
            advanced = (seen + 1) % modulo;
            // compareAndSet fails if another thread moved the counter since it was read;
            // in that case re-read and retry.
        } while (!nextServerCyclicCounter.compareAndSet(seen, advanced));
        return advanced;
    }

RetryRule

/**
 * 具有重试机制的Rule
 */
public class RetryRule extends AbstractLoadBalancerRule {
    //内部默认维护一个线性轮训的Rule
    IRule subRule = new RoundRobinRule();
    long maxRetryMillis = 500;

    /**
     * Creates a retry rule around {@code subRule}.
     *
     * @param subRule rule to delegate to; a new {@code RoundRobinRule} is used when {@code null}
     */
    public RetryRule(IRule subRule) {
        if (subRule == null) {
            this.subRule = new RoundRobinRule();
        } else {
            this.subRule = subRule;
        }
    }

    /**
     * Creates a retry rule around {@code subRule} with an explicit retry window.
     *
     * @param subRule        rule to delegate to; a new {@code RoundRobinRule} is used when {@code null}
     * @param maxRetryMillis retry window in milliseconds; values that are not positive fall back to 500
     */
    public RetryRule(IRule subRule, long maxRetryMillis) {
        if (subRule == null) {
            this.subRule = new RoundRobinRule();
        } else {
            this.subRule = subRule;
        }

        if (maxRetryMillis > 0) {
            this.maxRetryMillis = maxRetryMillis;
        } else {
            this.maxRetryMillis = 500;
        }
    }
    /** Returns the rule this retry rule delegates to. */
    public IRule getRule() {
        return this.subRule;
    }

    /**
     * Asks the wrapped rule for a server and, when the answer is missing or not
     * alive, keeps asking until a usable server appears, the deadline
     * ({@code maxRetryMillis} after entry) has passed, or this thread is interrupted.
     *
     * @param lb  not consulted here; the wrapped rule is invoked with {@code key} only
     * @param key selection key forwarded to the wrapped rule
     * @return an alive server, or {@code null} when none was obtained
     */
    public Server choose(ILoadBalancer lb, Object key) {
        // Retrying stops once the clock reaches this point.
        final long deadline = System.currentTimeMillis() + maxRetryMillis;

        Server answer = subRule.choose(key);

        boolean shouldRetry = (answer == null || !answer.isAlive())
                && System.currentTimeMillis() < deadline;

        if (shouldRetry) {
            // NOTE(review): InterruptTask is defined elsewhere; presumably it interrupts
            // this thread once the remaining time has elapsed - confirm.
            InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis());

            // Thread.interrupted() tests the current thread's interrupt flag and clears it,
            // whereas isInterrupted() only tests it. An interrupt therefore ends this loop
            // and leaves the flag cleared.
            while (!Thread.interrupted()) {
                answer = subRule.choose(key);

                boolean usable = answer != null && answer.isAlive();
                if (usable || System.currentTimeMillis() >= deadline) {
                    break;
                }
                /* pause and retry hoping it's transient */
                Thread.yield();
            }

            task.cancel();
        }

        return (answer == null || !answer.isAlive()) ? null : answer;
    }
/**
 * RoundRobinRule的扩展
 * 内部根据实例运行情况来进行权重 并根据权重挑选实例
 */
public class WeightedResponseTimeRule extends RoundRobinRule {
    

    /**
     * (Re)starts the background timer that recomputes server weights, computes the
     * weights once right away, and registers a JVM shutdown hook that cancels the timer.
     *
     * @param lb not read by this method
     */
    void initialize(ILoadBalancer lb) {
        // Cancel a timer left over from an earlier call before replacing it.
        if (serverWeightTimer != null) {
            serverWeightTimer.cancel();
        }

        // Daemon timer; the task runs immediately and then every
        // serverWeightTaskTimerInterval milliseconds.
        serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + name, true);
        serverWeightTimer.schedule(new DynamicServerWeightTask(), 0L, (long) serverWeightTaskTimerInterval);

        // Compute the weights once on the calling thread as well.
        new ServerWeight().maintainWeights();

        Runnable stopTimer = new Runnable() {
            @Override
            public void run() {
                logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + WeightedResponseTimeRule.this.name);
                WeightedResponseTimeRule.this.serverWeightTimer.cancel();
            }
        };
        Runtime.getRuntime().addShutdownHook(new Thread(stopTimer));
    }

    /**
     * Picks a server with probability proportional to its weight.
     *
     * <p>{@code accumulatedWeights} holds cumulative weights, one entry per server in
     * load-balancer order. A random value in {@code [0, total)} is drawn and the first
     * entry that is greater than or equal to it selects the server. When the total
     * weight is below 0.001, or the weight list does not line up with the current
     * server list, the choice is delegated to the parent round-robin rule.
     *
     * @param lb  load balancer to pick from; {@code null} yields {@code null}
     * @param key forwarded to the parent rule on fallback
     * @return an alive server, or {@code null} when the balancer is {@code null}, the
     *         thread was interrupted, there are no servers, or the fallback found none
     */
    @SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"})
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }

        Server picked = null;
        while (picked == null) {
            // Take one reference to the weight list so this pass sees a single list
            // even if the field is reassigned meanwhile.
            List<Double> weights = this.accumulatedWeights;
            if (Thread.interrupted()) {
                return null;
            }

            List<Server> servers = lb.getAllServers();
            int serverCount = servers.size();
            if (serverCount == 0) {
                return null;
            }

            // The last cumulative entry is the total weight.
            double totalWeight = 0.0D;
            if (weights.size() != 0) {
                totalWeight = weights.get(weights.size() - 1);
            }

            boolean weightsUsable = totalWeight >= 0.001D && serverCount == weights.size();
            if (!weightsUsable) {
                // Weights not usable: fall back to the parent round-robin choice.
                picked = super.choose(this.getLoadBalancer(), key);
                if (picked == null) {
                    return picked;
                }
            } else {
                double target = this.random.nextDouble() * totalWeight;
                int chosenIndex = 0;
                int position = 0;
                for (Double cumulative : weights) {
                    // First cumulative weight reaching the target marks the chosen slot.
                    if (cumulative >= target) {
                        chosenIndex = position;
                        break;
                    }
                    position++;
                }
                picked = servers.get(chosenIndex);
            }

            if (picked == null) {
                Thread.yield();
                continue;
            }
            if (picked.isAlive()) {
                return picked;
            }
            // Not alive: discard and go around again.
            picked = null;
        }

        return picked;
    }

    /**
     * Replaces the cumulative weight list consulted when choosing a server.
     *
     * @param weights new cumulative weights; stored as-is, not copied
     */
    void setWeights(List<Double> weights) {
        accumulatedWeights = weights;
    }

    /**
     * Applies the parent configuration, then reads the weight-task timer interval
     * from {@code clientConfig}, using 30000 when the key is not configured.
     *
     * @param clientConfig client configuration to read from
     */
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        super.initWithNiwsConfig(clientConfig);
        Integer configuredInterval = (Integer) clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, 30000);
        this.serverWeightTaskTimerInterval = configuredInterval;
    }

    class ServerWeight {
        ServerWeight() {
        }

        public void maintainWeights() {
            ILoadBalancer lb = WeightedResponseTimeRule.this.getLoadBalancer();
            if (lb != null) {
                if (WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.compareAndSet(false, true)) {
                    try {
                        WeightedResponseTimeRule.logger.info("Weight adjusting job started");
                        AbstractLoadBalancer nlb = (AbstractLoadBalancer)lb;
                        //获得统计信息
                        LoadBalancerStats stats = nlb.getLoadBalancerStats();
                        if (stats != null) {
                            //保存所有实例的的平均响应时间总和
                            double totalResponseTime = 0.0D;

                            ServerStats ss;
                            for(Iterator var6 = nlb.getAllServers().iterator(); var6.hasNext(); totalResponseTime += ss.getResponseTimeAvg()) {
                                Server server = (Server)var6.next();
                                ss = stats.getSingleServerStat(server);
                            }

                            Double weightSoFar = 0.0D;
                            //用于保存权重  下标对应实例在负载均衡器中的位置
                            List<Double> finalWeights = new ArrayList();
                            Iterator var20 = nlb.getAllServers().iterator();

                            while(var20.hasNext()) {
                                Server serverx = (Server)var20.next();
                                //如果服务的状态不再快照汇总 则这里加载
                                ServerStats ssx = stats.getSingleServerStat(serverx);
                                //计算权重 平均响应时间总和-实例的响应平均响应时间+weightSoFar
                                double weight = totalResponseTime - ssx.getResponseTimeAvg();
                                //每次都会累加
                                weightSoFar = weightSoFar + weight;
                                //保存权重
                                finalWeights.add(weightSoFar);
                            }

                            WeightedResponseTimeRule.this.setWeights(finalWeights);
                            return;
                        }
                    } catch (Exception var16) {
                        WeightedResponseTimeRule.logger.error("Error calculating server weights", var16);
                        return;
                    } finally {
                        WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.set(false);
                    }

                }
            }
        }
    }
    //负责权重计算的定时任务
    class DynamicServerWeightTask extends TimerTask {
        DynamicServerWeightTask() {
        }

        public void run() {
            WeightedResponseTimeRule.ServerWeight serverWeight = WeightedResponseTimeRule.this.new ServerWeight();

            try {
                //计算权重
                serverWeight.maintainWeights();
            } catch (Exception var3) {
                WeightedResponseTimeRule.logger.error("Error running DynamicServerWeightTask for {}", WeightedResponseTimeRule.this.name, var3);
            }

        }
    }
}

WeightedResponseTimeRule

ClientConfigEnabledRoundRobinRule

一般不直接使用；内部同样是线性轮询，主要作为基类供其他策略继承扩展。

/**
 * Rule that delegates every choice to an internal {@link RoundRobinRule}.
 *
 * <p>Subclasses can call {@code super.choose(key)} to fall back to plain round robin.
 */
public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {

    /** Round-robin rule that performs the selection. */
    RoundRobinRule roundRobinRule = new RoundRobinRule();

    /**
     * Chooses a server through the internal round-robin rule.
     *
     * @param key selection key forwarded to the round-robin rule
     * @return whatever the round-robin rule returns
     * @throws IllegalArgumentException if the internal rule is {@code null}
     */
    @Override
    public Server choose(Object key) {
        if (roundRobinRule == null) {
            throw new IllegalArgumentException(
                    "This class has not been initialized with the RoundRobinRule class");
        }
        return roundRobinRule.choose(key);
    }

    /**
     * Stores the load balancer on this rule and on the internal round-robin rule.
     *
     * @param lb the load balancer to use
     */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        this.roundRobinRule.setLoadBalancer(lb);
    }

    /**
     * Replaces the internal round-robin rule with a fresh instance.
     *
     * @param clientConfig not read by this method
     */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        this.roundRobinRule = new RoundRobinRule();
    }

}

BestAvailableRule

选出最空闲的服务实例

/**
 * Rule that picks the least busy server: among servers whose circuit breaker is not
 * tripped, the one with the fewest active requests wins. Falls back to the parent
 * round-robin rule when no statistics are available or no server qualifies.
 */
public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {

    /** Statistics source; stays {@code null} until a stats-capable load balancer is set. */
    private LoadBalancerStats loadBalancerStats;

    /**
     * Chooses the server with the fewest active requests.
     *
     * @param key forwarded to the parent rule on fallback
     * @return the least busy server whose circuit breaker is not tripped, or the
     *         parent rule's choice when statistics are missing or nothing qualifies
     */
    @Override
    public Server choose(Object key) {
        if (loadBalancerStats == null) {
            return super.choose(key);
        }
        List<Server> serverList = getLoadBalancer().getAllServers();
        int minimalConcurrentConnections = Integer.MAX_VALUE;
        // One timestamp for the whole scan so every server is judged at the same instant.
        long currentTime = System.currentTimeMillis();
        Server chosen = null;
        for (Server server : serverList) {
            ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
            // Servers with a tripped circuit breaker are skipped.
            if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                // Strict '<' keeps the first server seen among equally busy ones.
                if (concurrentConnections < minimalConcurrentConnections) {
                    minimalConcurrentConnections = concurrentConnections;
                    chosen = server;
                }
            }
        }
        // Nothing qualified: fall back to the parent round-robin rule.
        if (chosen == null) {
            return super.choose(key);
        } else {
            return chosen;
        }
    }

    /**
     * Stores the load balancer and, when it is an {@code AbstractLoadBalancer},
     * captures its statistics. Without this the statistics field is never assigned
     * and {@link #choose(Object)} always takes the round-robin fallback.
     *
     * @param lb the load balancer to use
     */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        if (lb instanceof AbstractLoadBalancer) {
            loadBalancerStats = ((AbstractLoadBalancer) lb).getLoadBalancerStats();
        }
    }

}

PredicateBasedRule

先过滤服务清单，再从过滤结果中轮询选择

/**
 * Rule that first filters the server list with a predicate and then picks round-robin
 * from the servers that passed.
 */
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {

    /** Supplies the {@link AbstractServerPredicate} used to filter servers. */
    public abstract AbstractServerPredicate getPredicate();

    /**
     * Filters all servers of the load balancer through {@link #getPredicate()} and
     * returns the round-robin pick among the survivors.
     *
     * @param key load-balancer key handed to the predicate
     * @return the chosen server, or {@code null} when the predicate left none
     */
    @Override
    public Server choose(Object key) {
        ILoadBalancer balancer = getLoadBalancer();
        // The predicate does both the filtering and the round-robin pick; the Optional
        // is absent when no server passed the filter.
        Optional<Server> picked = getPredicate().chooseRoundRobinAfterFiltering(balancer.getAllServers(), key);
        return picked.isPresent() ? picked.get() : null;
    }
}
/**
 * Base predicate over {@link PredicateKey} that can filter a server list and pick
 * round-robin from the filtered result.
 */
public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {

    /**
     * Filters {@code servers} without a load-balancer key.
     *
     * @param servers candidate servers
     * @return the servers accepted by the server-only predicate
     */
    public List<Server> getEligibleServers(List<Server> servers) {
        return getEligibleServers(servers, null);
    }

    /**
     * Filters {@code servers}, optionally taking a load-balancer key into account.
     *
     * @param servers         candidate servers
     * @param loadBalancerKey key passed to {@code apply}; when {@code null} the
     *                        server-only predicate is used instead
     * @return the servers that passed the filter
     */
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        if (loadBalancerKey == null) {
            return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
        } else {
            List<Server> results = Lists.newArrayList();
            for (Server server : servers) {
                // Keep only servers this predicate accepts for the given key.
                if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                    results.add(server);
                }
            }
            return results;
        }
    }

    /**
     * Filters {@code servers} without a key and picks one round-robin.
     *
     * @param servers candidate servers
     * @return the pick, or an absent Optional when nothing passed the filter
     */
    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers) {
        return pickRoundRobin(getEligibleServers(servers));
    }

    /**
     * Filters {@code servers} with {@code loadBalancerKey} and picks one round-robin.
     *
     * @param servers         candidate servers
     * @param loadBalancerKey key handed to the filter; may be {@code null}
     * @return the pick, or an absent Optional when nothing passed the filter
     */
    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        return pickRoundRobin(getEligibleServers(servers, loadBalancerKey));
    }

    /** Returns the next server in rotation from {@code eligible}, or absent when it is empty. */
    private Optional<Server> pickRoundRobin(List<Server> eligible) {
        if (eligible.isEmpty()) {
            return Optional.absent();
        }
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }


}

AvailabilityFilteringRule

/**
 * Rule that takes round-robin candidates and accepts the first one the availability
 * predicate approves, instead of filtering the whole server list up front.
 */
public class AvailabilityFilteringRule extends PredicateBasedRule {
    /** Predicate deciding whether a candidate server may be used. */
    private AbstractServerPredicate predicate;

    /** Builds the rule with an availability predicate that has no client configuration. */
    public AvailabilityFilteringRule() {
        super();
        predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();
    }

    /**
     * Rebuilds the predicate so that it uses {@code clientConfig}.
     *
     * @param clientConfig client configuration handed to the availability predicate
     */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig))
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();
    }

    /**
     * Tries up to 11 round-robin candidates and returns the first one accepted by the
     * predicate; if none is accepted, falls back to the parent's filter-then-pick choice.
     *
     * @param key selection key forwarded to the underlying rules
     * @return the chosen server, or the parent rule's result (possibly {@code null})
     */
    @Override
    public Server choose(Object key) {
        int count = 0;
        Server server = roundRobinRule.choose(key);
        while (count++ <= 10) {
            // Checking candidates one at a time avoids filtering the whole list first.
            // The round-robin rule can return null (e.g. no reachable servers); such a
            // candidate is skipped rather than wrapped in a PredicateKey.
            // NOTE(review): the predicate is expected to reject servers with a tripped
            // circuit breaker or too many active connections (limit configurable via
            // <clientName>.<nameSpace>.ActiveConnectionsLimit) - confirm against
            // AvailabilityPredicate.
            if (server != null && predicate.apply(new PredicateKey(server))) {
                return server;
            }
            server = roundRobinRule.choose(key);
        }
        return super.choose(key);
    }

    /** Returns the predicate currently used to accept or reject servers. */
    @Override
    public AbstractServerPredicate getPredicate() {
        return predicate;
    }
}
原文地址:https://www.cnblogs.com/LQBlog/p/10084581.html