分布式限流

限流

  在高并发场景下对高并发访问/请求进行限速或者对一个时间单位内的请求进行限速来保护我们的系统, 一旦达到我们限制的速度则可以:

  1.拒绝服务(提示友好的信息或者跳转到错误提示页);

  2.排队或等待(比如秒杀);

  3.降级(返回默认数据).

  限流, 归根结底就是在一定频率上进行量的限制.

  一般用来控制服务请求的速率, 比如天猫双十一的限流, 京东618的限流, 12306的抢票等.

  使用场景:

      1.对稀缺资源的秒杀, 抢购;

      2.对数据亏的高并发读写操作, 比如下单, 瞬间往数据库插入大量的数据等;

令牌桶算法

  

实现方式 Guava的RateLimiter类

     Guava框架提供了令牌桶算法实现(Google), 可直接拿来使用, 使用Guava框架的RateLimiter类即可创建一个令牌桶限流器.

pom.xml

    <!--google guava-->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>27.1-jre</version>
        </dependency>

令牌桶算法实现

public class TokenBucketLimiter {

    public static void main(String[] args){
        //表示桶容量为5, 且每秒新增5个令牌, 1秒=1000毫秒, 速度: 200毫秒放/个
        RateLimiter limiter = RateLimiter.create(5);

        //返回值表示从令牌桶中获取一个令牌所花的时间, 单位是秒, 第一次获取不需要时间
        System.out.println(limiter.acquire(50));

        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));
        System.out.println(limiter.acquire(1));

    }
}

1.简单实现下单秒杀

OrderController

@RestController //返回json或者字符串
public class OrderController {
    /**
     * 创建独立线程实现每秒放的令牌数
     */
    RateLimiter rateLimiter = RateLimiter.create(2);
    
    @RequestMapping("/boot/order")
    public String order(){
        String result = "";
        //1.限流处理, 客户端请求从桶中获取令牌, 如果在500毫秒内没有获取到令牌的话, 则直接走服务降级出处理
        boolean tryAcquire = rateLimiter.tryAcquire(500, TimeUnit.MILLISECONDS);

        if (!tryAcquire) {
            result = "哎呀, 服务器太挤了, 没有挤进去...";
            logger.info(result);
            return result;
        }

        //2.业务逻辑处理
        int addRows = this.addOrder();
        if (addRows > 0) {
            result = "恭喜您, 秒杀下单成功! ";
            return result;
        }

        result = "Sorry, 秒杀下单失败了, 请再试一次吧...";
        return result;
    }
}

2.用切面加自定义注解辅助RateLimiter类实现下单秒杀

OrderController

@RestController //返回json或者字符串
public class OrderController{

    /**
     * 使用注解方式实现服务限流
     * rate: 速率, timeOut: 超时时间
     * @return
     */
    @DlRateLimiter(rate = 16.0, timeOut = 500)
    @RequestMapping("/boot/order2")
    public String order2(){
        String result = "";
//        //1.限流处理, 客户端请求从桶中获取令牌, 如果在500毫秒内没有获取到令牌的话, 则直接走服务降级出处理
//        boolean tryAcquire = rateLimiter.tryAcquire(500, TimeUnit.MILLISECONDS);
//
//        if (!tryAcquire) {
//            result = "哎呀, 服务器太挤了, 没有挤进去...";
//            logger.info(result);
//            return result;
//        }

        //2.业务逻辑处理
        int addRows = this.addOrder();
        if (addRows > 0) {
            result = "恭喜您, 秒杀下单成功! ";
            return result;
        }

        result = "Sorry, 秒杀下单失败了, 请再试一次吧...";
        return result;
    }
}

RateLimiter切面类RateLimiterAspect

@Aspect
@Component
public class RateLimiterAspect {

    @Autowired
    private HttpServletResponse response;
    //创建一个令牌限速器
    private RateLimiter rateLimiter = RateLimiter.create(700);

    @Pointcut("execution(public * cn.abchinalife.aalimit.*.*(..))")
    public void pointcut(){

    }

    @Around("pointcut()")
    public Object process(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {

        //方法签名
        MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();

        //使用java反射技术获取方法上是否有@DlRateLimiter 注解类
        DlRateLimiter dlRateLimiter = signature.getMethod().getDeclaredAnnotation(DlRateLimiter.class);
        if (dlRateLimiter == null) {
            //正常执行方法, 执行业务方法
            return proceedingJoinPoint.proceed();
        }

        //获取注解上的参数, 获取配置的速率
        double rate = dlRateLimiter.rate();
        //获取注解上的参数, 获取令牌等待时间
        long timeOut = dlRateLimiter.timeOut();

        //设置限流速率
        rateLimiter.setRate(rate);

        //判断令牌桶获取token是否超时
        boolean tryAcquire = rateLimiter.tryAcquire(timeOut, TimeUnit.MILLISECONDS);
        if (!tryAcquire) {
            //服务降级
            fallback();
            return null;
        }
        //获取到令牌, 直接执行
        return proceedingJoinPoint.proceed();
    }

    /**
     * 服务降级
     */
    private void fallback(){
        response.setHeader("Content-type", "text/html;charset=UTF-8");
        PrintWriter writer = null;
        try {
            writer = response.getWriter();
            writer.println("哎呀, 服务器太挤了, 没有挤进去...");
            writer.flush();
            writer.close();
        } catch (Exception e) {

        }

    }

}

RateLimiter自定义注解类DlRateLimiter

@Target(value = ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DlRateLimiter {

    //往令牌桶放入令牌的速率
    double rate();

    //获取令牌的超时时间
    long timeOut() default 0;
}

 3.用redis + lua脚本 + 切面 + 自定义注解辅助RateLimiter实现

limit.lua

local key = KEYS[1]
local limit = tonumber(ARGV[1])
local current = tonumber(redis.call('get', key) or "0")
if current + 1 > limit then
    return 0
else --请求数+1, 并设置2秒过期
    redis.call("INCRBY", key, "1")
    redis.call("EXPIRE", key, "2")
    return 1
end


RedisLimiterAspect切面类
@Aspect
@Component
public class RedisLimiterAspect {

    @Autowired
    private HttpServletResponse response;
    /**
     * 操作redis的模板
     */
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private DefaultRedisScript<List> redisScript;

    @PostConstruct
    public void init(){
        redisScript = new DefaultRedisScript<>();
        redisScript.setResultType(List.class);
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("classpath:config/limit.lua")));
    }

//    //创建一个令牌限速器
//    private RateLimiter rateLimiter = RateLimiter.create(700);

    @Pointcut("execution(public * cn.abchinalife.aalimit.*.*(..))")
    public void pointcut(){
    }

    @Around("pointcut()")
    public Object process(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {

        //方法签名
        MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();

        //使用java反射技术获取方法上是否有@DlRateLimiter 注解类
        DlRedisLimiter dlRedisLimiter = signature.getMethod().getDeclaredAnnotation(DlRedisLimiter.class);
        if (dlRedisLimiter == null) {
            //正常执行方法, 执行业务方法
            return proceedingJoinPoint.proceed();
        }

        //获取注解上的参数, 获取配置的速率
        double value = dlRedisLimiter.value();

        //List设置lua的KEYS[1]
        //获取当前时间戳到单位秒
        String key = "ip:" + System.currentTimeMillis() / 1000;
        List<String> keyList = Lists.newArrayList(key);

        //调用脚本并执行
        List result = stringRedisTemplate.execute(redisScript, keyList, String.valueOf(value));
        System.out.println("lua脚本执行结果: " + result);

        //lua脚本返回0表示超出流量大小, 返回1表示没有超出流量大小
        if (StringUtils.equals(result.get(0).toString(), "0")) {
            //服务降级
            fallback();
            return null;
        }
        //没有限流, 直接执行
        return proceedingJoinPoint.proceed();
    }

    /**
     * 服务降级
     */
    private void fallback(){
        response.setHeader("Content-type", "text/html;charset=UTF-8");
        PrintWriter writer = null;
        try {
            writer = response.getWriter();
            writer.println("哎呀, 服务器太挤了, 没有挤进去...");
            writer.flush();
            writer.close();
        } catch (Exception e) {

        }

    }

}
DlRedisLimiter自定义注解
@Target(value = ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DlRedisLimiter {

    //往令牌桶放入令牌的速率
    @AliasFor("limit")
    double value() default Double.MAX_VALUE;

    //获取令牌的超时时间
    double limit() default Double.MAX_VALUE;
}

OrderController

/**
     * 使用注解方式实现服务限流
     * value 限流数/每秒
     * @return
     */
    @DlRedisLimiter(value=10.0D)
//    @DlRateLimiter(rate = 16.0, timeOut = 500)
    @RequestMapping("/boot/sms2")
    public String sms2(){
        String result = "";
//        //1.限流处理, 客户端请求从桶中获取令牌, 如果在500毫秒内没有获取到令牌的话, 则直接走服务降级出处理
//        boolean tryAcquire = rateLimiter.tryAcquire(500, TimeUnit.MILLISECONDS);
//
//        if (!tryAcquire) {
//            result = "哎呀, 服务器太挤了, 没有挤进去...";
//            logger.info(result);
//            return result;
//        }

        //2.业务逻辑处理
        boolean flag = this.sendSMS("恭喜您获得100元优惠卷");
        if (flag) {
            result = "短信发送成功! ";
            return result;
        }

        result = "Sorry, 短信发送失败了, 请再试一次吧...";
        return result;
    }
原文地址:https://www.cnblogs.com/goujh/p/10940026.html