四. Go微服务--漏桶算法实现限流

1.序

除开前面章节讲到的令牌桶算法实现的网络限流外, 还有另外一种常见的限流算法, 漏桶算法

2. 漏桶算法

漏桶算法(Leaky Bucket) 是网络世界中 流量整形(Traffic Shaping)或速率限制(Rate Limiting)时经常使用的一种算法,它的主要目的是控制数据注入到网络的速率,平滑网络上的突发流量。漏桶算法提供了一种机制,通过它,突发流量可以被整形以便为网络提供一个稳定的流量。

Bursty Flow

在上图中,水龙头代表着突发流量(Bursty Flow)。当网络中存在突发流量,且无任何调控时,就会出现像 Bursty Data 处类似的场景。主机以 12 Mbps 的速率发送数据,时间持续 2s,总计 24 Mbits 数据。随后主机暂停发送 5s,然后再以 2 Mbps 的速率发送数据 3s,最终总共发送了 6 Mbits 的数据。

因此主机在 10s 内总共发送了 30 Mbits 的数据。但这里存在一个问题,就是数据的发送并不是平滑的,存在一个较大的波峰。若所有流量都是如此的传输方式,将会 “旱的旱死涝的涝死”,对系统并不是特别的友好

Fixed Flow

为了解决 Bursty Flow 场景的问题。漏桶(Leaky Bucket)出现了,漏桶具有固定的流出速率、固定的容量大小。

在上图中,漏桶在相同的 10s 内以 3 Mbps 的速率持续发送数据来平滑流量。若水(流量)来的过猛,但水流(漏水)不够快时,其最终结果就是导致水直接溢出,呈现出来就是拒绝请求/排队等待的表现。另外当 Buckets 空时,是会出现一次性倒入达到 Bucket 容量限制的水的可能性,此时也可能会出现波峰。

简单来讲就是,一个漏桶,水流进来,但漏桶只有固定的流速来流出水,若容量满即拒绝,否则将持续保持流量流出。

漏桶算法的主要作用就是避免出现有的时候流量很高

Go 中比较常用的漏桶算法的实现就是来自 uber 的 ratelimit,下面我们就会看一下这个库的使用方式和源码

2.1 API

type Clock  // interface

type Limiter
    func New(rate int, opts ...Option) Limiter
    func NewUnlimited() Limiter
    
type Option
    func Per(per time.Duration) Option
    func WithClock(clock Clock) Option
    func WithSlack(slack int) Option
  1. Clock 是一个接口,计时器的最小实现,有两个方法,分别是当前的时间和睡眠

    type Clock interface {
    	Now() time.Time
    	Sleep(time.Duration)
    }
    
  2. Limiter 也是一个接口, 只有一个Take方法, 执行这个方法的时候如果触发了 rps 限制则会阻塞住

    type Limiter interface {
    // Take should block to make sure that the RPS is met.
        Take() time.Time
    }
    
  3. NewLimterNewUnlimited 会分别初始化一个无锁的限速器和没有任何限制的限速器

  4. Option 是在初始化的时候的额外参数. Option有三个方法

    • Per 可以修改时间单位, 默认是秒所以我们默认限制的是rps, 如果改成分钟那么就是rpm了
    • WithClock 可以修改时钟,这个用于在测试的时候可以 mock 掉不使用真实的时间
    • WithSlack 用于修改松弛时间,也就是可以允许的突发流量的大小,默认是 Pre / 10 ,这个后面会讲到

2.3 基于漏桶算法实现IP限流中间件

  1. demo.go

    package main
    
    import (
    	"fmt"
    	"net/http"
    	"sync"
    
    	"github.com/gin-gonic/gin"
    	"go.uber.org/ratelimit"
    )
    
    // Gin中间件
    func NewLimiter(rps int) gin.HandlerFunc {
    	limiters := sync.Map{}
    
    	return func(c *gin.Context) {
    		// 获取限速器
    		// key 除了 ip 之外也可以是其他的,例如 header,user name 等
    		key := c.ClientIP()
    		l, _ := limiters.LoadOrStore(key, ratelimit.New(rps))
    		now := l.(ratelimit.Limiter).Take()
    		fmt.Printf("now: %s
    ", now)
    		c.Next()
    	}
    }
    
    func main() {
    	e := gin.Default()
    
    	// 新建一个限速器,允许突发 3 个并发
    	e.Use(NewLimiter(3))
    	e.GET("ping", func(c *gin.Context) {
    		c.String(http.StatusOK, "pong")
    	})
    
    	err := e.Run(":8080")
    	if err != nil {
    		fmt.Printf("Start server err, %s", err.Error())
    	}
    }
    
  2. 测试, 使用go-stress-testing进行压测,并发100
    go-stress-testing -c 100 -u http://172.20.192.1:8080/ping

  3. 测试结果

    root@failymao:~# go-stress-testing -c 20 -u http://172.20.192.1:8080/ping
    
     开始启动  并发数:20 请求数:1 请求参数:
    request:
     form:http
     url:http://172.20.192.1:8080/ping
     method:GET
     headers:map[]
     data:
     verify:statusCode
     timeout:30s
     debug:false
    
    ─────┬───────┬───────┬───────┬────────┬────────┬────────┬────────┬────────┬────────┬────────
     耗时│ 并发数│ 成功数│ 失败数│   qps  │最长耗时│最短耗时│平均耗时│下载字节│字节每秒│ 错误码
    ─────┼───────┼───────┼───────┼────────┼────────┼────────┼────────┼────────┼────────┼────────
       1s│     13│     13│      0│  233.84│  676.26│    5.00│   85.53│      52│      51│200:13
       2s│     16│     16│      0│   62.13│ 1676.71│    5.00│  321.92│      64│      31│200:16
       3s│     19│     19│      0│   31.17│ 2676.45│    5.00│  641.63│      76│      25│200:19
       3s│     20│     20│      0│   26.28│ 3027.85│    5.00│  760.94│      80│      26│200:20
    
    
    *************************  结果 stat  ****************************
    处理协程数量: 20
    请求总数(并发数*请求数 -c * -n): 20 总请求时间: 3.032 秒 successNum: 20 failureNum: 0
    *************************  结果 end   ****************************
    

    查看结果发现为什么第一秒的时候完成了 13 个请求,不是限制的 3rps 么?不要慌,我们看看它的实现就知道了

2.4 实现

这个库有基于互斥锁的实现和基于 CAS 的无锁实现,默认使用的是无锁实现版本,所以我们主要看无锁实现的源码

type state struct {
	last     time.Time
	sleepFor time.Duration
}

type atomicLimiter struct {
	state unsafe.Pointer
	//lint:ignore U1000 Padding is unused but it is crucial to maintain performance
	// of this rate limiter in case of collocation with other frequently accessed memory.
	padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing.

	perRequest time.Duration
	maxSlack   time.Duration
	clock      Clock
}

atomicLimiter 结构体

  • state 是一个状态的指针,用于存储上一次的执行的时间,以及需要 sleep 的时间
  • padding 是一个无意义的填充数据,为了提高性能,避免 cpu 缓存的 false sharing
    • Go 并发内存模型 为了能够最大限度的利用 CPU 的能力,会做很多丧心病狂的优化,其中一种就是 cpu cache
    • cpu cache 一般是以 cache line 为单位的,在 64 位的机器上一般是 64 字节
    • 所以如果我们高频并发访问的数据小于 64 字节的时候就可能会和其他数据一起缓存,其他数据如果出现改变就会导致 cpu 认为缓存失效,这就是 false sharing
    • 所以在这里为了尽可能提高性能,填充了 56 字节的无意义数据,因为 state 是一个指针占用了 8 个字节,所以 64 - 8 = 56
  • 剩下三个字段和 Option 中的三个方法意义对应
    • perRequest 就是单位,默认是秒
    • maxSlack 松弛时间,也就是可以允许的突发流量的大小,默认是 Pre / 10 ,这个后面会讲到
    • clock 时钟,这个用于在测试的时候可以 mock 掉不使用真实的时间

Take 方法

func (t *atomicLimiter) Take() time.Time {
	var (
        // 状态
		newState state
        // 用于表示原子操作是否成功
		taken    bool
        // 需要 sleep 的时间
		interval time.Duration
	)

    // 如果 CAS 操作不成功就一直尝试
	for !taken {
        // 获取当前的时间
		now := t.clock.Now()

        // load 出上一次调用的时间
		previousStatePointer := atomic.LoadPointer(&t.state)
		oldState := (*state)(previousStatePointer)

		newState = state{
			last:     now,
			sleepFor: oldState.sleepFor,
		}

		// 如果 last 是零值的话,表示之前就没用过,直接保存返回即可
		if oldState.last.IsZero() {
			taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
			continue
		}

		// sleepFor 是需要睡眠的时间,由于引入了松弛时间,所以 sleepFor 可能是一个
        // maxSlack ~ 0 之间的一个值,所以这里需要将现在的需要 sleep 的时间和上一次
        // sleepFor 的值相加
		newState.sleepFor += t.perRequest - now.Sub(oldState.last)

        // 如果距离上一次调用已经很久了,sleepFor 可能会是一个很小的值
        // 最小值只能是 maxSlack 的大小
		if newState.sleepFor < t.maxSlack {
			newState.sleepFor = t.maxSlack
		}

        // 如果 sleepFor 大于 0  的话,计算出需要 sleep 的时间
        // 然后将 state.sleepFor 置零
		if newState.sleepFor > 0 {
			newState.last = newState.last.Add(newState.sleepFor)
			interval, newState.sleepFor = newState.sleepFor, 0
		}

        // 保存状态
		taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
	}

    // sleep interval
	t.clock.Sleep(interval)
	return newState.last
}

3. 总结

漏桶和令牌桶的最大的区别就是,令牌桶是支持突发流量的,但是漏桶是不支持的。但是 uber 的这个库通过引入弹性时间的方式也让漏桶算法有了类似令牌桶能够应对部分突发流量的能力,并且实现上还非常的简单,值得学习。

3.1 漏桶 vs 令牌桶

漏桶算法和令牌桶 算法本质上都是为了做流量整形(Traffic Shaping)或速率限制(Rate Limiting),避免系统因为大流量而被打崩,但两者核心差异在于限流的方向是相反的。

令牌桶限制的是流量的平均流入速率,并且允许一定程度的突然性流量,最大速率为桶的容量和生成 token 的速率。而漏桶限制的是流量的流出速率,是相对固定的。

因此也会相对的带来一个问题,在某些场景中,漏桶算法并不能有效的使用网络资源,因为漏桶的漏出速率是相对固定的,所以在网络情况比较好,没有拥塞的状态下,漏桶依然是限制住的,并没有办法放开量。而令牌桶算法则不同,其能够限制平均速率的同时支持一定程度的突发流量。

4. 参考

  1. https://lailin.xyz/post/go-training-week6-4-leaky-bucket.html
  2. https://pkg.go.dev/go.uber.org/ratelimit#section-readme
  3. https://eddycjy.com/posts/microservice/leaky-token-buckets/#漏桶-vs-令牌桶
  4. 利用CPU cache特性优化Go程序
♥永远年轻,永远热泪盈眶♥
原文地址:https://www.cnblogs.com/failymao/p/15228406.html