Ribbonn负载均衡

1.Ribbon简介

  Ribbon是Netflix Ribbon实现的一套客户端负载均衡的工具。主要提供客户端的软件负载均衡和服务调用。

  Ribbon客户端提供一系列完善的配置项如连接超时,重试等。简单的说,就是在配置文件中列出LoadBalancer(简称LB)后面所有的机器,Ribbon会基于某种规则(轮询、随机连接)等去连接这些机器。也可以实现自定义的负载均衡算法。

  Ribbon属于进程内的LB,集成于消费方进程,消费方通过它来获取到服务提供方的地址。在Java中对应的就是负载均衡+RestTemplate调用。

  git地址:https://github.com/Netflix/ribbon

2.Ribbon负载均衡和RestTemplate调用

Ribbon的LB的IRule核心类如下:

 主要实现有:

RandomRule:随机

RoundRobinRule:轮询

RetryRule:先按照轮询策略获取,如果获取失败会在指定时间内进行重试

WeightedResponseTimeRule:对RoundRobinRule轮询的扩展,响应速度越快的权重越大,越容易被选中

BestAvailableRule:会过滤掉多次访问故障而处于断路器跳闸状态的服务,然后选择一个并发量最小的服务

AvailabilityFilteringRule:过滤掉故障实例然后选择并发较小的实例

ZoneAvoidanceRule:默认规则,复合判断server所在区域的性能和server的可用性选择服务器

1.默认的负载均衡:

package cn.qz.cloud.config;

import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

/**
 * 注入一个bean
 */
@Configuration
public class ApplicationContextConfig {
    @Bean
    @LoadBalanced
    public RestTemplate getRestTemplate() {
        return new RestTemplate();
    }
}

  在getRestTemplate增加@LoadBalanced注解即可实现负载均衡。默认的是轮询算法。验证的话可以通过订单(消费者模块)调用支付模块(服务提供者)根据服务的端口等信息来测试。

2.负载规则替换

 1. 修改全局默认的负载均衡算法

package cn.qz.cloud.config;

import com.netflix.loadbalancer.IRule;
import com.netflix.loadbalancer.RandomRule;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

/**
 * 注入一个bean
 */
@Configuration
public class ApplicationContextConfig {
    @Bean
    @LoadBalanced
    public RestTemplate getRestTemplate() {
        return new RestTemplate();
    }

    @Bean
    public IRule myRule() {
        // 修改全局默认的负载均衡算法(轮询)义为随机
        return new RandomRule();
    }

}

   注入一个新的IRule对象即可修改全局的负载均衡策略。

2.针对单个服务修改负载均衡策略

这里需要注意:配置类所在的包不能在SpringBoot主启动类所在的包以及子包。SpringBoot启动的时候默认的时候扫描的是启动类所在的包以及其子包,所以如果放在当前包或者子包会替换全局的负载均衡策略。

  对单个服务设置的负载均衡策略>全局的负载均衡策略

package cn.ribbon.rules;

import com.netflix.loadbalancer.IRule;
import com.netflix.loadbalancer.RoundRobinRule;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyLoadBalanceRule {

    @Bean
    public IRule myRule() {
        // 定义为轮询
        return new RoundRobinRule();
    }

}

SpringBoot主启动类增加注解:(可以用单个注解,也可以用多个注解声明多个服务采用不同的负载规则)

package cn.qz.cloud;

import cn.ribbon.rules.MyLoadBalanceRule;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.cloud.netflix.ribbon.RibbonClients;

/**
 * 启动类
 */
@SpringBootApplication
@EnableEurekaClient
// 可以使用注解数组定义多个,也可以使用单个注解定义单个规则
@RibbonClients(@RibbonClient(name = "CLOUD-PAYMENT-SERVICE", configuration = MyLoadBalanceRule.class))
//@RibbonClient(name = "CLOUD-PAYMENT-SERVICE",configuration = MyLoadBalanceRule.class)
public class OrderConsumerMain80 {
    public static void main(String[] args) {
        SpringApplication.run(OrderConsumerMain80.class, args);
    }
}

 3. 定义自己的负载均衡规则

  根据RandomRule随机负载均衡算法模仿着写一个即可。

(1) 查看RandomRule类的源码如下:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package com.netflix.loadbalancer;

import com.netflix.client.config.IClientConfig;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class RandomRule extends AbstractLoadBalancerRule {
    public RandomRule() {
    }

    @SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"})
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        } else {
            Server server = null;

            while(server == null) {
                if (Thread.interrupted()) {
                    return null;
                }

                List<Server> upList = lb.getReachableServers();
                List<Server> allList = lb.getAllServers();
                int serverCount = allList.size();
                if (serverCount == 0) {
                    return null;
                }

                int index = this.chooseRandomInt(serverCount);
                server = (Server)upList.get(index);
                if (server == null) {
                    Thread.yield();
                } else {
                    if (server.isAlive()) {
                        return server;
                    }

                    server = null;
                    Thread.yield();
                }
            }

            return server;
        }
    }

    protected int chooseRandomInt(int serverCount) {
        return ThreadLocalRandom.current().nextInt(serverCount);
    }

    public Server choose(Object key) {
        return this.choose(this.getLoadBalancer(), key);
    }

    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}

(2) 编写自己的规则:实现每个服务调用3次的轮询算法

轮询规则:

package cn.ribbon.rules;

import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.AbstractLoadBalancerRule;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomRandomRule extends AbstractLoadBalancerRule {


    /**
     * 每个服务调用的次数
     */
    private int serverCallTimes = 3;

    /**
     * 总共调用的次数
     */
    private AtomicInteger total = new AtomicInteger(-1);

    /**
     * 当前的下标
     */
    private AtomicInteger currentIndex = new AtomicInteger(0);

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;
        while (server == null) {
            if (Thread.interrupted()) {
                return null;
            }

            // 可用列表
            List<Server> upList = lb.getReachableServers();
            // 所有服务列表
            List<Server> allList = lb.getAllServers();
            int serverCount = allList.size();
            if (serverCount == 0) {
                return null;
            }

            // 修改总次数以及下次需要取的下标:total++;根据 total % serverCallTimes == 0 ,下标移动
            int totalNum = total.intValue();
            if (totalNum == Integer.MAX_VALUE) {
                total.set(-1);
                totalNum = 0;
            }
            total.addAndGet(1);
            if (totalNum % serverCallTimes == 0) {
                // 判断当前的下标
                if (currentIndex.intValue() == allList.size() - 1) {
                    currentIndex.set(0);
                } else {
                    currentIndex.addAndGet(1);
                }
            }
            server = upList.get(currentIndex.intValue());
            System.out.println("key: " + key + "	currentIndex: " + currentIndex.intValue() + "	total" + total.intValue()) ;

            if (server == null) {
                Thread.yield();
                continue;
            }
            if (server.isAlive()) {
                return (server);
            }
            server = null;
            Thread.yield();
        }
        return server;
    }

    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}

使用自己定义的规则:

package cn.ribbon.rules;

import com.netflix.loadbalancer.IRule;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyLoadBalanceRule {

    @Bean
    public IRule myRule() {//         定义为自定义的负载均衡算法
        return new CustomRandomRule();
    }

}

 主启动类设置:

package cn.qz.cloud;

import cn.ribbon.rules.MyLoadBalanceRule;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.cloud.netflix.ribbon.RibbonClients;

/**
 * 启动类
 */
@SpringBootApplication
@EnableEurekaClient
// 可以使用注解数组定义多个,也可以使用单个注解定义单个规则
@RibbonClients(@RibbonClient(name = "CLOUD-PAYMENT-SERVICE", configuration = MyLoadBalanceRule.class))
//@RibbonClient(name = "CLOUD-PAYMENT-SERVICE",configuration = MyLoadBalanceRule.class)
public class OrderConsumerMain80 {
    public static void main(String[] args) {
        SpringApplication.run(OrderConsumerMain80.class, args);
    }
} 

3.Ribbon负载均衡算法

 1.分析RoundRobinRule源码

源码如下:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package com.netflix.loadbalancer;

import com.netflix.client.config.IClientConfig;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RoundRobinRule extends AbstractLoadBalancerRule {
    private AtomicInteger nextServerCyclicCounter;
    private static final boolean AVAILABLE_ONLY_SERVERS = true;
    private static final boolean ALL_SERVERS = false;
    private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);

    public RoundRobinRule() {
        this.nextServerCyclicCounter = new AtomicInteger(0);
    }

    public RoundRobinRule(ILoadBalancer lb) {
        this();
        this.setLoadBalancer(lb);
    }

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        } else {
            Server server = null;
            int count = 0;

            while(true) {
                if (server == null && count++ < 10) {
                    List<Server> reachableServers = lb.getReachableServers();
                    List<Server> allServers = lb.getAllServers();
                    int upCount = reachableServers.size();
                    int serverCount = allServers.size();
                    if (upCount != 0 && serverCount != 0) {
                        int nextServerIndex = this.incrementAndGetModulo(serverCount);
                        server = (Server)allServers.get(nextServerIndex);
                        if (server == null) {
                            Thread.yield();
                        } else {
                            if (server.isAlive() && server.isReadyToServe()) {
                                return server;
                            }

                            server = null;
                        }
                        continue;
                    }

                    log.warn("No up servers available from load balancer: " + lb);
                    return null;
                }

                if (count >= 10) {
                    log.warn("No available alive servers after 10 tries from load balancer: " + lb);
                }

                return server;
            }
        }
    }

    private int incrementAndGetModulo(int modulo) {
        int current;
        int next;
        do {
            current = this.nextServerCyclicCounter.get();
            next = (current + 1) % modulo;
        } while(!this.nextServerCyclicCounter.compareAndSet(current, next));

        return next;
    }

    public Server choose(Object key) {
        return this.choose(this.getLoadBalancer(), key);
    }

    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}

   核心的是choose方法。方法里面进行一堆验证之后调用incrementAndGetModulo方法获得了本次提供服务的server的索引。

  incrementAndGetModulo放啊里面是使用类似于自旋锁的概念。用 (当前调用次数+1)%server数量 得到下标。

2.重写轮询算法结合discoveryClient实现负载均衡

 定义一个接口:

package cn.qz.cloud.loadbalance;

import org.springframework.cloud.client.ServiceInstance;

import java.util.List;

public interface LoadBalancer {

    ServiceInstance instances(List<ServiceInstance> serviceInstances);
}

实现类如下:

package cn.qz.cloud.loadbalance;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@Component
public class MyLoadBalancerImpl implements LoadBalancer {

    private AtomicInteger atomicInteger = new AtomicInteger(0);

    public final int getAndIncrement() {
        int current;
        int next;
        do {
            current = this.atomicInteger.get();
            next = current >= 2147483647 ? 0 : current + 1;
        } while (!this.atomicInteger.compareAndSet(current, next));
        System.out.println("****第几次访问,次数next: " + next);
        return next;
    }

    // 负载均衡轮询算法,rest接口第几次请求数 % 服务器集群总数 = 实际调用服务器位置下标
    @Override
    public ServiceInstance instances(List<ServiceInstance> serviceInstances) {
        int index = getAndIncrement() % serviceInstances.size();
        return serviceInstances.get(index);
    }
}

 修改restTemplate取消负载均衡:

package cn.qz.cloud.config;

import com.netflix.loadbalancer.IRule;
import com.netflix.loadbalancer.RandomRule;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

/**
 * 注入一个bean
 */
@Configuration
public class ApplicationContextConfig {
    @Bean
    public RestTemplate getRestTemplate() {
        return new RestTemplate();
    }
}

修改Controller增加lb测试接口:

package cn.qz.cloud.controller;

import cn.qz.cloud.bean.Payment;
import cn.qz.cloud.loadbalance.LoadBalancer;
import cn.qz.cloud.utils.JSONResultUtil;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;

import javax.annotation.Resource;
import java.net.URI;
import java.util.List;
import java.util.Map;

/**
 * @Author: qlq
 * @Description
 * @Date: 22:09 2020/9/25
 */
@RestController
@RequestMapping("/consumer")
public class OrderController {

    private static final String PAYMENT_URL = "http://CLOUD-PAYMENT-SERVICE";

    @Resource
    private RestTemplate restTemplate;

    @Resource
    private LoadBalancer loadBalancer;

    @Resource
    private DiscoveryClient discoveryClient;

    @GetMapping("/pay/lb")
    public JSONResultUtil<String> lb() {
        // 通过容器中的 discoveryClient和服务名来获取服务集群
        List<ServiceInstance> instances = discoveryClient.getInstances("CLOUD-PAYMENT-SERVICE");
        if (instances == null || instances.size() <= 0) {
            return null;
        }

        // 传入服务集群来计算出获取具体的服务实例
        ServiceInstance serviceInstance = loadBalancer.instances(instances);
        URI uri = serviceInstance.getUri();
        return restTemplate.getForObject(uri + "/pay/getServerPort", JSONResultUtil.class);
    }
}

主启动类取消负载均衡设置:

package cn.qz.cloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

/**
 * 启动类
 */
@SpringBootApplication
@EnableEurekaClient
// 可以使用注解数组定义多个,也可以使用单个注解定义单个规则
//@RibbonClients(@RibbonClient(name = "CLOUD-PAYMENT-SERVICE", configuration = MyLoadBalanceRule.class))
//@RibbonClient(name = "CLOUD-PAYMENT-SERVICE",configuration = MyLoadBalanceRule.class)
public class OrderConsumerMain80 {
    public static void main(String[] args) {
        SpringApplication.run(OrderConsumerMain80.class, args);
    }
}

  启动测试即可。注意需要注释掉restTemplate的loadBalance,而且不能和原来的接口一起使用,因为原来的接口基于loadBalance的负载均衡来替换URL中的服务名称。

原文地址:https://www.cnblogs.com/qlqwjy/p/13823705.html