【springcloud】2.eureka源码分析之令牌桶-限流算法

国际惯例原理图

代码实现

package Thread;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @ProjectName: cutter-point
 * @Package: Thread
 * @ClassName: RateLimiter
 * @Author: xiaof
 * @Description: 令牌桶,限流
 * @Date: 2019/6/21 11:41
 * @Version: 1.0
 */
public class RateLimiter {

    //限流消费的令牌
    private final AtomicInteger consumedTokens = new AtomicInteger();

    private final AtomicLong lastRefrushTokenTime = new AtomicLong(0);

    //限流类型,是秒,还是分
    private final long rateType;

    public RateLimiter(TimeUnit averageRateUnit) {
        switch (averageRateUnit) {
            case SECONDS:
                rateType = 1000;
                break;
            case MINUTES:
                rateType = 60 * 1000;
                break;
            default:
                throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported");
        }
    }

    //请求令牌,判断是否可以获取到新的令牌
    public boolean acquire(int bucketSize, long avgRate) {
        return acquire(bucketSize, avgRate, System.currentTimeMillis());
    }

    public boolean acquire(int bucketSize, long avgRate, long curentTimeMillis) {

        if(bucketSize <= 0 || avgRate <= 0) {
            return true;//如果这2个参数,任意一个为0 ,我们就认为没有上限
        }

        //刷新令牌桶
        refillToken(bucketSize, avgRate, curentTimeMillis);
        //开始消费令牌
        return consumToken(bucketSize);
    }

    private void refillToken(int bucketSize, long avgRate, long currentTimeMillis) {
        //获取上次最后以后更新令牌时间
        long freshTime = lastRefrushTokenTime.get();
        //获取当前间隔时间
        long timeDelta = currentTimeMillis - freshTime;

        //计算这次需要填充的token数
        long newToken = timeDelta * avgRate /rateType;
        if(newToken > 0) {
            //新的更新时间
            long newFillTime = freshTime == 0 ? currentTimeMillis : freshTime + timeDelta;
            //用cas操作,以保证只有一个线程注入新令牌
            if(lastRefrushTokenTime.compareAndSet(freshTime, newFillTime)) {
                //死循环,直到设置成功新的令牌
                while(true) {
                    //1.获取当前消费的令牌
                    int currentConsumToken = consumedTokens.get();
                    //2.获取消费的令牌容量,跟桶极限大小比较,取小的那个
                    int realConsumTokens = Math.min(currentConsumToken, bucketSize);
                    //3.计算填充之后剩余的被消费的容量,计算新增容量,用来填充被消费的令牌数
                    //剩余的消费容量,但是不能比0还小,这个要取值
                    int newConsumSize = (int) Math.max(0, realConsumTokens - newToken);
                    //然后设置进去
                    if(consumedTokens.compareAndSet(currentConsumToken, newConsumSize)) {
                        return;
                    }
                }
            }
        }
    }

    //消费令牌
    private boolean consumToken(int bucketSize) {
        while (true) {
            int currentLevel = consumedTokens.get();
            //如果超出负载
            if (currentLevel >= bucketSize) {
                return false;
            }
            //每次消费一个
            if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) {
                return true;
            }
        }
    }

    public void reset() {
        consumedTokens.set(0);
        lastRefrushTokenTime.set(0);
    }

}

到这里可能有的人不清楚怎么用,来我们测试一波

我们假设有100个用户同时请求,然后令牌恢复速率调成10,然后速率单位改为秒,也就是1秒恢复10个令牌

这样同时100个请求过来,马上令牌就会被用完,那么就会被限流,比如我们拦截器这个时候可以返回404,或者503

public static void main(String args[]) {

        RateLimiter rateLimiter = new RateLimiter(TimeUnit.SECONDS);

        final int bucketSize = 10;
        //回复令牌生产速率
        final long avgRate = 10;

        //判断是否流量达到上限
        ExecutorService pool = Executors.newCachedThreadPool();
        for(int i = 0; i < 100; ++i) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    while(true) {
                        try {
                            Thread.sleep(((int) (Math.random() * 10)) * 1000) ;

                            if(!rateLimiter.acquire(bucketSize, avgRate)) {
                                System.err.println(Thread.currentThread().getName() + "已经限流成功----response.setStatus(404)");
                            } else {
                                System.out.println(Thread.currentThread().getName() + "正常执行");
                            }

                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }

        while(true) {

        }

    }

效果展示

原文地址:https://www.cnblogs.com/cutter-point/p/11064362.html