系统限流(分布式)

系统限流(分布式)

其他开源框架限流方案:(gateway、zuul都集成以下一种或底层算法基于其中)

  • Guava->RateLimiter

支持:平滑突发限流、平滑预热限流

// 平滑突发限流器,RateLimiter.create(5) 表示这个限流器容量为 5,并且每秒生成 5 个令牌,也就是每隔 200 毫秒生成一个
RateLimiter limiter = RateLimiter.create(5);

//平滑预热限流器,第一个参数还是每秒创建的令牌数量,这里是每秒 2 个,也就是每 500 毫秒生成一个,后面的参数表示从冷启动速率过渡到平均速率的时间间隔,也就是所谓的热身时间间隔(warm up period)
RateLimiter limiter = RateLimiter.create(2, 3, TimeUnit.SECONDS);
  • Bucket4j
// Bucket接口代表了令牌桶的具体实现
// Bandwidth 的意思是带宽,可以理解为限流的规则(simple 和 classic)
Bucket bucket = Bucket4j.builder().addLimit(limit).build();
// simple 方式创建的 Bandwidth,表示桶大小为 10,填充速度为每分钟 10 个令牌
Bandwidth limit = Bandwidth.simple(10, Duration.ofMinutes(1));
// 桶大小为 10,填充速度为每分钟 5 个令牌
Refill filler = Refill.greedy(5, Duration.ofMinutes(1));
Bandwidth limit = Bandwidth.classic(10, filler);
// Refill 用于填充令牌桶,可以通过它定义填充速度,Bucket4j 有两种填充令牌的策略:间隔策略(intervally) 和 贪婪策略(greedy).在上面的例子中我们使用的是贪婪策略,如果使用间隔策略可以像下面这样创建 Refill:
Refill filler = Refill.intervally(5, Duration.ofMinutes(1));
// 间隔指的是一次性全部放入,贪婪指的是根据时间单元一次一个

缺点:Bucket4j 唯一不足的地方是它只支持请求频率限流,不支持并发量限流,另外还有一点,虽然 Bucket4j 支持分布式限流,但它是基于 Hazelcast 这样的分布式缓存系统实现的,不能使用Redis

  1. Resilience4j

Resilience4j 提供了两种限流的实现:SemaphoreBasedRateLimiterAtomicRateLimiterSemaphoreBasedRateLimiter 基于信号量实现,用户的每次请求都会申请一个信号量,并记录申请的时间,申请通过则允许请求,申请失败则限流,另外有一个内部线程会定期扫描过期的信号量并释放,很显然这是令牌桶的算法。AtomicRateLimiter 和上面的经典实现类似,不需要额外的线程,在处理每次请求时,根据距离上次请求的时间和生成令牌的速度自动填充。

Resilience4j 也提供了两种隔离的实现:SemaphoreBulkheadThreadPoolBulkhead,通过信号量或线程池控制请求的并发数,

缺点:不支持分布式限流

方案一:gateway配置方式

  1. 引入所需依赖
    <dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-starter-gateway</artifactId>
   </dependency>
   
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifatId>spring-boot-starter-data-redis-reactive</artifactId>
   </dependency>
  1. 添加限流策略配置类
/**
 * 限流策略配置类
 *
 * @author Andrew
 * @date 2021/6/3
 */
@Configuration
public class CurrentLimitingResolver {
    
    /**
     * 基于IP地址进行限流
     * @return KeyResolver
     */
    @Bean("remoteAddrKeyResolver")
    @Primary
    public KeyResolver remoteAddrKeyResolver() {
        return exchange -> Mono.just(Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress());
    }

    /**
     * 基于hostName地址进行限流
     * @return KeyResolver
     */
    @Bean("remoteHostNameKeyResolver")
    public KeyResolver remoteHostNameKeyResolver() {
        return exchange -> Mono.just(Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostName());
    }

    /**
     * 根据请求参数进行限流
     * @return KeyResolver
     */
    @Bean("remoteUserKeyResolver")
    public KeyResolver remoteUserKeyResolver() {
        return exchange -> Mono.just(Objects.requireNonNull(exchange.getRequest().getQueryParams().getFirst("user")));
    }

    /**
     * 接口限流
     * @return KeyResolver
     */
    @Bean("remoteApiKeyResolver")
    public KeyResolver apiKeyResolver() {
        return exchange -> Mono.just(exchange.getRequest().getPath().value());
    }
}
  1. yml文件配置所需参数
# 除了gateway相关配置还需加上redis配置信息,因开启限流后会自动生成两个kv
spring:
  application:
    name: cloud-gateway
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true # 开启从注册中心动态创建路由的功能,利用微服务名进行路由
      routes:
        - id: payment_routh # 路由的ID,没有固定规则但要求唯一,建议配合服务名
          uri: lb:// cloud-payment-service #微服务名
           filters:
             - name: RequestRateLimiter # RequestRateLimiter的限流过滤器,基于令牌桶算法
               args:
                 redis-rate-limiter.replenishRate: 10 # 令牌桶每秒填充平均速率
                 redis-rate-limiter.burstCapacity: 20 # 令牌桶总容量
                 key-resolver: "#{@remoteAddrKeyResolver}" # 限流策略,spEl语法
          predicates:
            - Path=/payment/get   # 断言,路径相匹配的进行路由

方案二:Zuul配置方式(基于guava)

  1. 导入依赖
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-zuul</artifactId>
    </dependency>
    <dependency>
      <groupId>com.marcosbarbero.cloud</groupId>
      <artifactId>spring-cloud-zuul-ratelimit</artifactId>
      <version>2.0.4.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
      <version>2.0.4.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.5.0</version>
    </dependency>
  1. 主启动类加入注解@EnableZuulProxy

  2. 配置文件

// 若开启微服务名称即可不用配置routes自动会加入path
zuul:
  ratelimit:
    enabled: true // 开启路由
    repository: REDIS // 存储位置redis,也可更换配置用CONSUL、REDIS、JPA、IN_MEMORY
    default-policy: // 全局限流策略
      limit: 1
      quota: 1
      refresh-interval: 3
    policies: // 个别微服务限流策略
      product-service:
        limit: 5
        quota: 10
        refresh-interval: 60
        type: url
  routes: // 路由
    product-service:
      path: /product-service/** // 根据path寻找下面的url微服务节点
      url: http://localhost:7070/
    order-service:
      path: /order-service/**
      url: http://localhost:9090/

令牌桶算法执行逻辑:

  1. 取出当前时间
  2. 当前时间大于上次放入令牌时间成立继续往下执行
  3. 距离上次时间已过去多久?
  4. 时间*生产速度=剩余令牌
  5. 剩余令牌与桶容量比较取出最小值作为剩余令牌
  6. 重新对时间进行赋值

方案三:基于redis与lua脚本令牌桶算法实现

  1. 导入redis依赖
<!-- Spring Boot Redis 依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--spring2.0 集成 redis 所需common-pool2-->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
  1. 自定义限流yml文件(application-ratelimit.yml)
# 自定义限流参数
ratelimit:
  # 限流全局开关
  enabled: true
  # 普通请求相关
  ordinary:
    # 限流次数
    limit: 5
    # 限流单位时间
    refresh-interval: 60
  # 升级请求相关
  upgrade:
    limit: 3
    refresh-interval: 60
    # 具体msgType TODO 需要确认是否是以下接口
    url:
      - 23710
      - 23711
      - 24334
      - 24335
      - 24336
      - 24337
      - 24338
      - 24339
      - 24340
      - 24341
      - 24348
      - 24349
      - 24350
  1. 编写lua脚本(算法)
-- 切换到命令复制模式
redis.replicate_commands();
-- 参数中传递的令牌key,基于选定的限流策略来定(唯一)
local key = KEYS[1]
-- 令牌桶填充 限流单位时间
local update_len = tonumber(ARGV[1])
-- 记录 第一次访问的时间戳
local key_time = key..'_FRT'
-- 获取当前时间(这里的curr_time_arr 中第一个是 秒数,第二个是 秒数后毫秒数),由于我是按秒计算的,这里只要curr_time_arr[1](注意:redis数组下标是从1开始的)
-- 如果需要获得毫秒数 则为 tonumber(arr[1]*1000 + arr[2])
local curr_time_arr = redis.call('TIME')
-- 当前时间秒数
local nowTime = tonumber(curr_time_arr[1])
-- 从redis中获取当前key 第一次访问的时间戳,无直接赋值0,有即value
local curr_key_time = tonumber(redis.call('get', key_time) or 0)
-- 获取当前key对应令牌桶中的令牌数,无直接赋值-1,有即value
local token_count = tonumber(redis.call('get', key) or -1)
-- 当前令牌桶的容量,用户自定义初始化大小
local token_size = tonumber(ARGV[2])
-- 令牌桶数量小于0 说明令牌桶没有初始化
if token_count < 0 then
   redis.call('set', key_time, nowTime)
   redis.call('set', key, token_size -1)
    redis.call('expire', key_time, update_len) -- 从第一次访问开始限流,设置对应键值的过期
    redis.call('expire', key, update_len)
   return token_size -1
else
   if token_count > 0 then -- 当前令牌桶中令牌数够用
      redis.call('decr', key)
      return token_count -1   -- 返回剩余令牌数
   else    -- 当前令牌桶中令牌数已清空
      return -1
   end
end
  1. 自定义全局异常抛出

先继承RunTimeException类创建一个异常类

package com.intretech.smart.mqtt.msgserver.config;


import com.intretech.smart.mqtt.msgserver.constant.RateLimiterEnum;

/**
 * 自定义运行异常
 *
 * @author Andrew
 * @date 2021/6/22
 */
public class RateLimiterException extends RuntimeException {
    /**
     * 状态码
     */
    private final String code;

    public RateLimiterException(RateLimiterEnum rateLimiterEnum) {
        super(rateLimiterEnum.getMessage());
        this.code = rateLimiterEnum.getCode();
    }

    public String getCode() {
        return code;
    }
}

@ControllerAdvice ,很多初学者可能都没有听说过这个注解,实际上,这是一个非常有用的注解,顾名思义,这是一个增强的 Controller。使用这个 Controller ,可以实现三个方面的功能:

  1. 全局异常处理
  2. 全局数据绑定
  3. 全局数据预处理
package com.intretech.smart.mqtt.msgserver.config;

import com.intretech.smart.mqtt.msgserver.domian.ExceptionResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * 全局自定义异常消息
 *
 * <p>根据用户自定义抛出运行时异常返回客户端,在无自定义情况下默认返回500.
 *
 * @author Andrew
 * @date 2021/6/22
 */
@ControllerAdvice
public class GlobalRateLimiterExceptionHandler {
    private static final Logger logger = LoggerFactory.getLogger(GlobalRateLimiterExceptionHandler.class);

    @ExceptionHandler(RateLimiterException.class)
    @ResponseBody
    public ExceptionResult handleCustomRateLimiterException(RateLimiterException e) {
        if (StringUtils.isEmpty(e)) {
            logger.error("GlobalExceptionHandler: handleCustomRateLimiterException(RateLimiterException e) parameter RateLimiterException is null");
        }

        // 生成自定义返回异常类
        ExceptionResult result = new ExceptionResult();
        // 在无自定义情况下500
        if (StringUtils.isEmpty(e.getCode())) {
            result.setCode(HttpStatus.INTERNAL_SERVER_ERROR.toString());
        }
        if (StringUtils.isEmpty(e.getMessage())) {
            result.setMsg(HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase());
        }

        result.setCode(e.getCode());
        result.setMsg(e.getMessage());
        return result;
    }
}
  1. 异常返回封装类
package com.intretech.smart.mqtt.msgserver.domian;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;

/**
 * 自定义异常返回类
 *
 * @author Andrew
 * @date 2021/6/22
 */
@Getter
@Setter
public class ExceptionResult implements Serializable {
    /**
     * 异常码
     */
    private String code;
    /**
     * 异常消息描述
     */
    private String msg;
}
  1. 利用责任链模式组装两个限流策略

抽象父类

package com.intretech.smart.mqtt.msgserver.responsibility;

import lombok.Getter;
import lombok.Setter;

/**
 * 限流抽象父类
 *
 * <p>利用责任链模式组装需要执行限流的策略.
 *
 * @author Andrew
 * @date 2021/6/21
 */
@Getter
@Setter
public abstract class AbstractRateLimiterHandler {
    /**
     * 限流抽象父类
     */
    private AbstractRateLimiterHandler nextAbstractRateLimiterHandler;

    /**
     * 处理请求限流方法
     *
     * @param sessionId 会话key
     * @param msgType   消息类型
     */
    public abstract void judgeMsgType(String sessionId, String msgType);
}

具体实现类

package com.intretech.smart.mqtt.msgserver.responsibility;

import com.google.common.collect.ImmutableList;
import com.intretech.smart.mqtt.msgserver.config.RateLimiterException;
import com.intretech.smart.mqtt.msgserver.constant.RateLimiterEnum;
import com.intretech.smart.mqtt.msgserver.domian.RateLimiter;
import com.intretech.smart.mqtt.msgserver.service.RateLimiterService;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

/**
 * 升级请求接口自定义限流策略类
 *
 * <p>继承自AbstractRateLimiterHandler,实现责任链模式.
 * 对升级功能的接口全部列出,匹配则执行限流方法,无匹配则转发至下一个责任链.
 *
 * @author Andrew
 * @date 2021/6/21
 */
@Configuration
public class UpGradeLimiterHandler extends AbstractRateLimiterHandler {
    private static final Logger logger = LoggerFactory.getLogger(UpGradeLimiterHandler.class);

    @Autowired
    private StringRedisTemplate limitRedisTemplate;

    @Autowired
    private DefaultRedisScript<Number> redisScript;

    @Autowired
    private RateLimiter rateLimiter;

    @Autowired
    private RateLimiterService rateLimiterService;

    /**
     * 生成限流唯一key
     *
     * @return 限流唯一key
     */
    private String makeRateLimiterKey(String clientId, String url) {
        return RateLimiterCommon.UPGRADE_KEY_PREFIX + url + clientId;
    }

    @Override
    public void judgeMsgType(String sessionId, String msgType) {
        if (rateLimiter.getUpGradeLimiter().getUrl().contains(msgType)) {
            performRateLimiter(sessionId, msgType);
        } else {
            if (getNextAbstractRateLimiterHandler() != null) {
                getNextAbstractRateLimiterHandler().judgeMsgType(sessionId, msgType);
            } else {
                logger.warn("OrdinaryLimiterHandler: No matching executable current limiting policy sessionId:{}, msgType:{}", sessionId, msgType);
            }
        }
    }

    /**
     * 限流执行具体方法
     *
     * @param sessionId 会话token
     * @param msgType   消息类型
     */
    private void performRateLimiter(String sessionId, String msgType) {
        // 获得唯一客户端标识
        String clientId = rateLimiterService.judgeClient(sessionId);
        // 取出消息体中唯一客户端标识
        if (StringUtils.isEmpty(clientId)) {
            logger.info("OrdinaryLimiterHandler: rateLimiterService.judgeClient(String accessToken) return null, sessionId:{}", sessionId);
            return;
        }

        // 生成唯一限流key
        String limitKey = makeRateLimiterKey(clientId, msgType);
        // 不可变、线程安全的列表集合,只会获取传入对象的一个副本
        ImmutableList<String> keys = ImmutableList.of(limitKey);
        Number count = limitRedisTemplate.execute(redisScript, keys, rateLimiter.getUpGradeLimiter().getRefreshInterval(), rateLimiter.getUpGradeLimiter().getLimit());
        // 判断redis是否返回-1,-1代表无令牌,其他正数则是剩余令牌数
        if (count == null || count.intValue() == -1) {
            logger.info("OrdinaryLimiterHandler:clientId:{} access more than limit:{} times in freshInterval:{}", clientId, rateLimiter.getUpGradeLimiter().getLimit(), rateLimiter.getUpGradeLimiter().getRefreshInterval());
            throw new RateLimiterException(RateLimiterEnum.TOO_MANY_REQUESTS);
        }
    }
}

总结:

1、分布式限流方案优选应当以redis作为缓存中间件配合lua脚本实现令牌桶算法,redis存储两个关键值:1.客户端第一次访问时间 2.当前桶中剩余令牌数、

2、基于restFull风格的api在每次变更后会认定为是一个全心的url不受限流作用,如果以带参形式访问即使换了参数也被认为是同一个url受限流作用。

   // 如果换掉ID即认为是新的url,比如localhost:8080/1 与localhost:8080/2虽访问同一个接口但是被认定为不同访问者
    @GetMapping("/{id}")
    public String selectOrderById(@PathVariable("id") Integer id) {
        return orderService.selectOrderById(id);
    }
  // 参数不同还是被认为是同一访问者,比如localhost:8080/single?id=1 与localhost:8080/single?id=2都被限流
    @GetMapping("/single")
    public String updateOrderById(Integer id) {
        return orderService.updateOrderById(id);
    }
原文地址:https://www.cnblogs.com/zbm-gg/p/14933441.html