Implementing rate limiting yourself, with one limiter per key

Leaky bucket rate limiting

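Drawback: the outflow rate is constant, so it cannot absorb traffic fluctuations. For example, with a limit of 1000 QPS and incoming traffic of 800, 1200, 800 requests in three consecutive seconds, the admitted traffic is 800, 1000, 800.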
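
The numbers above can be reproduced with a small simulation. This is a minimal sketch, not a production limiter: the class name `LeakyBucket`, the method `tryAcquireMany`, and the choice of passing the clock in as a parameter are all assumptions made for illustration. It models the bucket as a meter, where each admitted request adds one unit of water and the water drains at a constant rate.

```java
/**
 * Illustrative leaky bucket (hypothetical class, not from the original post).
 * Water drains at a constant rate; a request is admitted only if it still fits.
 */
public class LeakyBucket {
    private final double leakPerSecond; // constant outflow rate
    private final double capacity;      // how much water the bucket can hold
    private double water = 0;           // current water level
    private long lastNanos;             // time of the previous update

    public LeakyBucket(double leakPerSecond, double capacity, long nowNanos) {
        this.leakPerSecond = leakPerSecond;
        this.capacity = capacity;
        this.lastNanos = nowNanos;
    }

    /** Admits one request at time nowNanos, or returns false if the bucket is full. */
    public synchronized boolean tryAcquire(long nowNanos) {
        // Drain the water that leaked out since the previous call.
        double elapsedSeconds = (nowNanos - lastNanos) / 1e9;
        water = Math.max(0, water - elapsedSeconds * leakPerSecond);
        lastNanos = nowNanos;
        if (water + 1 <= capacity) {
            water += 1;
            return true;
        }
        return false;
    }

    /** Offers `requests` requests at the same instant and returns how many were admitted. */
    public synchronized int tryAcquireMany(int requests, long nowNanos) {
        int admitted = 0;
        for (int i = 0; i < requests; i++) {
            if (tryAcquire(nowNanos)) {
                admitted++;
            }
        }
        return admitted;
    }
}
```

With `new LeakyBucket(1000, 1000, 0L)`, offering 800, 1200 and 800 requests at seconds 0, 1 and 2 admits 800, 1000 and 800: the 200 permits left unused in the first second are lost, so the burst in the second one is cut off at 1000.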

Token bucket rate limiting

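1000 tokens are produced per second, and tokens left over from the previous second carry over to the next one, which handles traffic fluctuations. The maximum number of stored tokens has to be capped. For example, with 1000 QPS and incoming traffic of 800, 1200, 800 requests, the admitted traffic is 800, 1200, 800.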
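
A minimal sketch of this idea follows. The class name `TokenBucket`, its constructor parameters, and the explicit clock argument are assumptions made for illustration; the starting balance of one second's worth of tokens is also an assumption, chosen so that the first second can be served at all.

```java
/**
 * Illustrative token bucket (hypothetical class, not from the original post).
 * Tokens accumulate at a fixed rate up to a cap; each request spends one token.
 */
public class TokenBucket {
    private final double refillPerSecond; // tokens produced per second
    private final double maxTokens;       // cap on stored tokens
    private double tokens;                // tokens currently available
    private long lastNanos;               // time of the previous refill

    public TokenBucket(double refillPerSecond, double maxTokens,
                       double initialTokens, long nowNanos) {
        this.refillPerSecond = refillPerSecond;
        this.maxTokens = maxTokens;
        this.tokens = Math.min(initialTokens, maxTokens);
        this.lastNanos = nowNanos;
    }

    /** Spends one token at time nowNanos, or returns false if none is left. */
    public synchronized boolean tryAcquire(long nowNanos) {
        // Add the tokens produced since the previous call, never exceeding the cap.
        double elapsedSeconds = (nowNanos - lastNanos) / 1e9;
        tokens = Math.min(maxTokens, tokens + elapsedSeconds * refillPerSecond);
        lastNanos = nowNanos;
        if (tokens >= 1) {
            tokens -= 1;
            return true;
        }
        return false;
    }

    /** Offers `requests` requests at the same instant and returns how many were admitted. */
    public synchronized int tryAcquireMany(int requests, long nowNanos) {
        int admitted = 0;
        for (int i = 0; i < requests; i++) {
            if (tryAcquire(nowNanos)) {
                admitted++;
            }
        }
        return admitted;
    }
}
```

With `new TokenBucket(1000, 2000, 1000, 0L)`, offering 800, 1200 and 800 requests at seconds 0, 1 and 2 admits 800, 1200 and 800: the 200 tokens left over from the first second pay for the burst in the second one. The cap matters after idle periods, because without it a long quiet spell would allow an arbitrarily large burst.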


import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.RateLimiter;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

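/**
 * 2021/7/2 1:10 PM
 */
public class Limiter {
    // One RateLimiter per key; at most 1000 keys, each evicted after 1 minute without access.
    private static final Cache<String, RateLimiter> cache = CacheBuilder.newBuilder()
            .maximumSize(1000)
            .expireAfterAccess(1, TimeUnit.MINUTES)
            .build();

    // Permits per second for each key; override with -Dlimit=N.
    private static final int DEFAULT_LIMIT = Integer.parseInt(System.getProperty("limit", "5"));

    /** Returns true if the request for this key should be rejected. */
    public static boolean isLimit(String key) {
        RateLimiter rateLimiter;
        try {
            rateLimiter = cache.get(key, () -> RateLimiter.create(DEFAULT_LIMIT));
        } catch (ExecutionException e) {
            // The limiter could not be created: log it and let the request through
            // (the original dereferenced null here and would have thrown a NullPointerException).
            return false;
        }
        // Wait up to 1 second for a permit; the request is limited if none arrives in time.
        return !rateLimiter.tryAcquire(1000, TimeUnit.MILLISECONDS);
    }
}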

Using the rate limiter in practice

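import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.List;

/**
 * @author: 谢洪伟
 * 2021/7/2 3:16 PM
 */
public class HttpSimpleClient {

    static public void httpGet(String url, List<String> headers, List<String> paramValues,
                               String encoding, long readTimeoutMs, boolean isSSL) throws IOException {
        // Guard against a null parameter list; the original null check could never fire.
        String encodedContent = (null == paramValues) ? null : paramValues.toString();
        url += (null == encodedContent) ? "" : ("?" + encodedContent);
        // One limiter per distinct request: the key is the MD5 of the full URL,
        // which already contains the parameters.
        if (Limiter.isLimit(MD5.getInstance().getMD5String(url))) {
            // Rate limited: reject the request
            throw new RuntimeException("rate limit exceeded for " + url);
        }
        HttpURLConnection conn = null;
        // HTTP connection (omitted)
    }
}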

Calling a cluster

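import java.io.IOException;
import java.net.ConnectException;
import java.util.Iterator;
import java.util.List;

/**
 * Sends the request to servers in priority order until one of them succeeds.
 *
 * Sticky: once a request to a server succeeds, the next request goes to that server first.
 */
public class ServerHttpAgent {
    final ServerListManager serverListMgr;
    volatile String currentServerIp;
    // port and getUrl(...) belong to this class but are not shown in this excerpt

    public HttpSimpleClient.HttpResult httpGet(String path, List<String> headers, List<String> paramValues,
                                               String encoding, long readTimeoutMs) throws IOException {
        // 1. Try the sticky current IP first
        if (currentServerIp != null) {
            try {
                return HttpSimpleClient.httpGet(
                        getUrl(currentServerIp, port, path),
                        headers, paramValues, encoding, readTimeoutMs);
            } catch (IOException e) {
                // fall through to the server list
            }
        }

        // 2. On failure, take IPs from the server list one by one
        for (Iterator<String> serverIter = serverListMgr.iterator(); serverIter.hasNext();) {
            String ip = serverIter.next();
            try {
                HttpSimpleClient.HttpResult result = HttpSimpleClient.httpGet(
                        getUrl(ip, port, path),
                        headers, paramValues, encoding, readTimeoutMs);
                currentServerIp = ip; // remember the server that worked
                return result;
            } catch (IOException e) {
                // this server failed, try the next one
            }
        }
        throw new ConnectException("no available server");
    }
}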
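
The retry order described in the class comment above (sticky server first, then the rest of the list) can be tried in isolation with the following sketch. Everything in it is hypothetical and chosen for illustration: the class `StickyFailover`, the `Sender` callback that stands in for the real HTTP call, and the use of `IOException` as the failure signal.

```java
import java.io.IOException;
import java.util.List;

/** Illustrative sticky failover (hypothetical class, not from the original post). */
public class StickyFailover {
    /** Hypothetical callback standing in for the real HTTP request. */
    public interface Sender {
        String send(String server) throws IOException;
    }

    private final List<String> servers; // servers in priority order
    private volatile String current;    // last server that answered successfully

    public StickyFailover(List<String> servers) {
        this.servers = servers;
    }

    public String call(Sender sender) throws IOException {
        // 1. Try the sticky server first, if there is one.
        String sticky = current;
        if (sticky != null) {
            try {
                return sender.send(sticky);
            } catch (IOException e) {
                // fall through to the full list
            }
        }
        // 2. Walk the list in priority order until one server succeeds.
        IOException last = null;
        for (String server : servers) {
            if (server.equals(sticky)) {
                continue; // already tried above
            }
            try {
                String result = sender.send(server);
                current = server; // stick to the server that worked
                return result;
            } catch (IOException e) {
                last = e;
            }
        }
        throw new IOException("all servers failed", last);
    }

    public String current() {
        return current;
    }
}
```

If the first server in the list is down, the first call pays for one failed attempt and then sticks to the second server; later calls go straight to it until it fails too.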

Original article: https://www.cnblogs.com/albertXe/p/14962873.html