Sentinel Go 核心统计结构滑动窗口的深度解析

Sentinel Go 核心模块分析之滑动窗口统计结构设计
什么是滑动时间窗口
滑动窗口基本运行模式
滑动窗口的周期和格子长度怎么设置?
滑动窗口长度一致,格子长度不一致
格子长度一致,滑动窗口长度不一致
总结
Sentinel Go时间滑动窗口实现
长度可设置的原子数组
基于时间的滑动窗口实现

本文主要分析 Sentinel Go 流量指标统计底层滑动窗口的实现。
在高可用流量防护场景中,比如流控、熔断等,不管基于什么策略,底层最核心的一部分是系统指标(如请求数、rt、异常数、异常比例等等)的统计结构。本文主要分析 Sentinel Go 中统计结构滑动窗口的实现。

什么是滑动时间窗口
应用指标统计很重要一点是要与时间对齐,比如流控可能希望的是拿到前一秒的总请求数,所以在我们统计的指标都是需要与时间对齐。

滑动窗口基本运行模式
滑动时间窗口有两个很重要设置:
(1)滑动窗口的统计周期:表示滑动窗口的统计周期,一个滑动窗口有一个或多个窗口。
(2)滑动窗口中每个窗口长度:每个窗口(也叫格子,后文格子都是指一个窗口)的统计周期。

这里先假设我的滑动时间窗口长度是1000ms,每个窗口统计时间长是200ms,那么就会有5个窗口。假设我一个窗口记录的起始时间是第1000ms,那么一个基本的滑动窗口的示意图如下图:(注意,这里忽略了每个格子里面具体的统计结构)

(1)滑动窗口里面每个格子都是一个统计结构,可以理解成一个抽象的结构(比如Java的Object或则Go的interface{}),用户可以自己决定统计的具体数据结构。
(2)每个格子都会有自己的统计开始时间,在 [开始时间,开始时间+格子长度)这个时间范围内,所有统计都会落到这个格子里面。

怎么计算当前时间在哪一个格子里面呢? 这里假设滑动窗口长度是 interval 表示,每个格子长度是 bucketLength 表示,当前时间是 now,前面的数值都是毫秒单位。那么计算方式就是:

当前时间所在格子计算方式 index = (now/bucketLength)%interval

也就是说我们知道当前时间就能知道当前时间对应在滑动窗口的第几个格子。举一些例子来说明:
(1)假设当前时间是1455ms,那么经过计算,index 就是 2,也就是第三个格子。
(2)假设当前时间是1455000000000 ms,那么经过计算,index 就是 0,也就是第一个格子。

随着时间的滑动,滑动窗口类似于一个固定长度的环形队列。

前面图示的滑动窗口的时间范围是[1000, 2000),假设当前时间滑动到了2001ms时候会是怎么样呢?看下图:


根据前面计算方式,now=2001,算出来index是0,也就是当前时间打到了第一个格子。然后计算当前时间对应的格子的起始时间,很明显就是2000,这个时候发现 2000 > 1000(格子原始统计起始时间),说明当前格子进入了新的统计周期,所以需要把当前格子重置,重置包括两部分:(1)格子起始时间到2000;(2)统计结构清空。

滑动窗口的周期和格子长度怎么设置?
滑动窗口的设置主要是两个参数:
(1)滑动窗口的长度;
(2)滑动窗口每个格子的长度。
那么这两个设置应该怎么设置呢?

这里主要考虑点是:抗脉冲流量的能力和精确度之间的平衡。
(1)如果格子长度设置的小那么统计就会更加精确,但是格子太多,会增加竞争的可能性,因为窗口滑动必须是并发安全的,这里会有竞争。
(2)如果滑动窗口长度越长,对脉冲的平滑能力就会越强。

滑动窗口长度一致,格子长度不一致
这里首先来个对比:

很直观,假设这两种case下的统计数据如下:

在[1000,1500) 区间统计都是600,[1500, 2000) 之间统计都是500。我们获取滑动窗口的统计时候,两者的统计总和都是1100。

但是,当前时间如果滑动到了2001ms的时候,按照前面滑动窗口的逻辑,我们需要将滑动窗口的第一个格子覆盖成新的统计格子。如下图:

从上图可以看出来,到覆盖第一个格子时候,两个滑动窗口的统计结果就完全不一样了:
(1)第一个滑动窗口第一个格子(500ms长度)清零了,整个统计总计数变成了 501;
(2)第二个滑动窗口第一个格子(100ms长度)清零了,整个统计总计数变成了 981;
但是随着时间的继续往后滑动,在[2000, 2500) 之间,时间越往后,两者之间精度差会越来越小。

很明显结论:在滑动窗口统计周期一样情况下,格子划分的越多,那么统计的精度就越高。

格子长度一致,滑动窗口长度不一致
滑动窗口整体的统计长度怎么设置呢?有哪些需要考虑的点呢?这里一个非常重要的因素是对流量脉冲的抵抗能力。

什么是流量脉冲?什么是非脉冲流量?

(1)下图就是一个典型的类似脉冲流量,窗口统计长度是2000ms,格子长度是200ms。在[1400ms,2200ms)之间来了一波峰值。


(2)下图类似于非脉冲流量,窗口统计长度是2000ms,格子长度是200ms。每个格子内请求数都在160左右。

我们统计滑动窗口内的请求数总数时候,也就是把所有格子的统计值求和。
(1)对于上图(1)中脉冲流量,在整个滑动窗口的总和是1600个请求数左右。平均下来每200ms大概是160个请求,但是会在1600ms这个格子来一个峰值,达到了640个请求。 这里明显是一个很大的脉冲。
(2)对于上图(2)中非脉冲流量,在整个滑动窗口的总和是1600个请求数左右。而且以200ms粒度来看,整体流量非常平滑。

这里带来问题是:从整个滑动窗口统计周期(2000ms)来看,两者最后的效果是一样的。因为统计周期长,导致脉冲流量被平均掉了(也可以理解成被平滑了)。

那么假设我的滑动窗口统计周期只有400ms呢?对于[1400ms, 1600ms)区间,如下图:

窗口长度是400ms,两个格子,每个格子200ms。 整个窗口统计总请求数:650。 按照前面2000ms窗口来平均,400ms窗口应该是400个请求数。明显650 > 400。 所以窗口长度越小,抗脉冲能力越差。

结论就是:滑动窗口的长度设置的越长,整体统计的结果抗脉冲能力会越强;滑动窗口的长度设置的越短,整体统计结果对脉冲的抵抗能力越弱。

具体怎么平衡还是要依据系统对脉冲流量的处理能力。

总结
(1)滑动窗口长度设置主要影响对脉冲流量的平滑效果,窗口越长,抗脉冲能力越强;
(2)滑动窗口的格子数量(固定滑动窗口长度下的格子长度),主要影响统计的精度,格子数越多,精度越高,但是也不是越多越好,太多了会影响并发性能。

Sentinel Go时间滑动窗口实现
Sentinel Go 滑动窗口 的实现基本和前一个章节描述的一样。

整个实现分为两部分:
(1)使用可设置的定长数组来表示滑动窗口(无锁),数组每个元素就是一个格子,格子是一个抽象对象,可以存储任意统计实体。
(2)一套基于时间滑动的算法,保证滑动窗口的滑动符合预期。

长度可设置的原子数组
滑动窗口最基础部分是一个无锁的原子数组。 Sentinel Go 是基于 slice 来实现原子数组的。原子数组可以无锁实现并发下的读写。

先看定义:

// AtomicBucketWrapArray forbit appending or shrinking after initializing
type AtomicBucketWrapArray struct {
// The base address for real data array
base unsafe.Pointer
// The length of slice(array), it can not be modified.
length int
data []*BucketWrap
}

// BucketWrap represent a slot to record metrics
type BucketWrap struct {
// The start timestamp of this statistic bucket wrapper.
BucketStart uint64
// The actual data structure to record the metrics (e.g. MetricBucket).
Value atomic.Value
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
原子数组AtomicBucketWrapArray里面通过一个切片 []*BucketWrap 来表示一个滑动窗口,窗口长度通过 length 字段表示,base 字段表示的是切片底层实际存储数据的第一个元素的首地址。这里有一点需要再次强调的:原子数组AtomicBucketWrapArray创建时候必须制定长度,一旦创建了之后不允许再增加元素或则缩容。

BucketWrap 表示滑动窗口中每个格子实际的存储实体,里面包含两个字段:BucketStart 表示当前格子统计的开始时间;Value是一个原子变量,可以简单理解成一个并发安全的 void* 指针。

那么我们是怎么保证AtomicBucketWrapArray是一个并发安全的呢?在创建 AtomicBucketWrapArray 对象的源码可以找到答案:

// SliceHeader is a safe version of SliceHeader used within this project.
type SliceHeader struct {
Data unsafe.Pointer
Len int
Cap int
}

func NewAtomicBucketWrapArrayWithTime(len int, bucketLengthInMs uint32, now uint64, generator BucketGenerator) *AtomicBucketWrapArray {
// Step1: new
ret := &AtomicBucketWrapArray{
length: len,
data: make([]*BucketWrap, len),
}
// Step2: ......
// initilize sliding windows start time (BucketWrap)

// Step3: calculate base address for real data array
sliHeader := (*SliceHeader)(unsafe.Pointer(&ret.data))
ret.base = unsafe.Pointer((**BucketWrap)(sliHeader.Data))
return ret
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
下面一步步解释 NewAtomicBucketWrapArrayWithTime 函数是怎么做的:
(1)根据输入的原子数组长度来创建AtomicBucketWrapArray实例,底层slice的时候必须要指定长度,即data: make([]*BucketWrap, len)。 这里会新建一个长度为 len 的slice,底层提前分配了len个元素的数组,并且初始化为nil指针。
(2)上面贴出代码省略部分为预初始化滑动窗口中每个格子的元素,也就是提前初始化 *BucketWrap,这块在后面再聊。
(3)根据 slice 的内存布局,拿到slice底层实际存储数组结构的首元素的地址,然后更新到AtomicBucketWrapArray.base字段。这里基于slice内存布局以及unsafe的指针转换(二级指针**BuckerWarp),可以拿到底层实际存储数组的首元素的内存地址。(Note:因为数组存储元素的是*BucketWarp,所以数组首地址实际上一个二级指针**BuckerWarp)

这里基于 slice 的内存转换拿到底层首元素内存地址的指针转换原理,可以参考另一篇文章 Go unsafe.Pointer 使用基本原则

AtomicBucketWrapArray 的创建过程为了避免一些边界逻辑,这里采用了创建对象时就预分配的所有格子的其实时间的逻辑,下面的代码片段实际上也就是NewAtomicBucketWrapArrayWithTime 函数中省略的Step2:

timeId := now / uint64(bucketLengthInMs)
idx := int(timeId) % len
startTime := calculateStartTime(now, bucketLengthInMs)

for i := idx; i <= len-1; i++ {
ww := &BucketWrap{
BucketStart: startTime,
Value: atomic.Value{},
}
ww.Value.Store(generator.NewEmptyBucket())
ret.data[i] = ww
startTime += uint64(bucketLengthInMs)
}
for i := 0; i < idx; i++ {
ww := &BucketWrap{
BucketStart: startTime,
Value: atomic.Value{},
}
ww.Value.Store(generator.NewEmptyBucket())
ret.data[i] = ww
startTime += uint64(bucketLengthInMs)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
(1)这里首先会根据创建AtomicBucketWrapArray 时候的当前时间now计算出其所对在的格子在数组中的下标idx以及该格子的统计其实时间startTime;
(2)从[idx, len-1] 会预先分配每个格子结构*BucketWrap,并且基于计算出的startTime依次往后填入对应格子的开始时间;
(3)从[0,idx-1] 这个区间也会预先分配每个格子结构*BucketWrap,不过需要注意的是,[0,idx-1] 里面的格子预先分配的时间也是未来的时间。

这里采用预分配主要是为了在滑动窗口在初始化之后,降低时间滑动时候逻辑的复杂性(避免判空然后新建*BucketWrap的流程)。下面以一个图示来说明:假设原子数组长度是1000ms,每个格子长度是200ms,当前时间是1401ms。首先1401ms计算出对应的格子索引是 2,格子起始时间是1400ms;那么新建AtomicBucketWrapArray 时候创建的对象实际存储如下图:

前面解释了新建原子数组AtomicBucketWrapArray的原理,那么我们拿到一个时间时候,怎么能够在并发环境下访问呢?这里避免不了的指针的原子操作,还是从源码出发:

func (aa *AtomicBucketWrapArray) elementOffset(idx int) (unsafe.Pointer, bool) {
if idx >= aa.length || idx < 0 {
logging.Error(errors.New("array index out of bounds"),
"array index out of bounds in AtomicBucketWrapArray.elementOffset()",
"idx", idx, "arrayLength", aa.length)
return nil, false
}
basePtr := aa.base
return unsafe.Pointer(uintptr(basePtr) + uintptr(idx*PtrSize)), true
}

func (aa *AtomicBucketWrapArray) get(idx int) *BucketWrap {
// aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx]
// then convert to (*unsafe.Pointer)
if offset, ok := aa.elementOffset(idx); ok {
return (*BucketWrap)(atomic.LoadPointer((*unsafe.Pointer)(offset)))
}
return nil
}

func (aa *AtomicBucketWrapArray) compareAndSet(idx int, except, update *BucketWrap) bool {
// aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx]
// then convert to (*unsafe.Pointer)
// update secondary pointer
if offset, ok := aa.elementOffset(idx); ok {
return atomic.CompareAndSwapPointer((*unsafe.Pointer)(offset), unsafe.Pointer(except), unsafe.Pointer(update))
}
return false
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
基于滑动窗口的背景,我们期望输入是当前时间 now,基于这个输入我们可以在并发环境下拿到对应的格子。通过时间对齐的计算,我们根据now可以计算出当前时间在滑动窗口中对应的格子的下标索引idx。那么问题也就是转换成基于下标idx并发安全的拿到对应格子。

(1)根据idx拿到数组中对应元素的首地址:elementOffset 函数:
前面我们知道AtomicBucketWrapArray.base已经保存了底层存储数组的首元素的存储地址。那么只需要通过指针的运算就可以拿到第idx个元素的存储地址,计算逻辑:unsafe.Pointer(uintptr(basePtr) + uintptr(idx*PtrSize))。

(2)get(idx) 函数拿到第idx个元素实际存储的对象:
这里首先通过elementOffset(idx)拿到第idx个元素的地址,然后通过指针的原子操作atomic.LoadPointer 函数可以原子的拿到第idx个元素实际存储的*BucketWrap对象。核心逻辑简单理解成:(*BucketWrap)(atomic.LoadPointer((*unsafe.Pointer)(elementOffset(idx))))

(3)compareAndSet(idx int, except, update *BucketWrap) bool 原子更新的逻辑
compareAndSet的逻辑和get函数基本一致,这里就不再细说。

理解上面三个函数的逻辑有一个背景就是要理解好二级指针。原子数组每个元素实际存储的是一个指针*BucketWrap,下面用一个图示:

base表示第一个元素的存储地址,那么*base 表示的是第一个元素。假设是64位机器,指针类型占用8个字节,那么*(base+8)就表示第二个元素。

基于时间的滑动窗口实现
前面的AtomicBucketWrapArray 已经提供了原子数组的能力,在AtomicBucketWrapArray 之上就是滑动窗口的能力。先粘贴出源码:

type LeapArray struct {
bucketLengthInMs uint32
sampleCount uint32
intervalInMs uint32
array *AtomicBucketWrapArray
// update lock
updateLock mutex
}

// Generic interface to generate bucket
type BucketGenerator interface {
// called when timestamp entry a new slot interval
NewEmptyBucket() interface{}

// reset the BucketWrap, clear all data of BucketWrap
ResetBucketTo(bw *BucketWrap, startTime uint64) *BucketWrap
}

func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
if now <= 0 {
return nil, errors.New("Current time is less than 0.")
}

idx := la.calculateTimeIdx(now)
bucketStart := calculateStartTime(now, la.bucketLengthInMs)

for { //spin to get the current BucketWrap
old := la.array.get(idx)
if old == nil {
// because la.array.data had initiated when new la.array
// theoretically, here is not reachable
newWrap := &BucketWrap{
BucketStart: bucketStart,
Value: atomic.Value{},
}
newWrap.Value.Store(bg.NewEmptyBucket())
if la.array.compareAndSet(idx, nil, newWrap) {
return newWrap, nil
} else {
runtime.Gosched()
}
} else if bucketStart == atomic.LoadUint64(&old.BucketStart) {
return old, nil
} else if bucketStart > atomic.LoadUint64(&old.BucketStart) {
// current time has been next cycle of LeapArray and LeapArray dont't count in last cycle.
// reset BucketWrap
if la.updateLock.TryLock() {
old = bg.ResetBucketTo(old, bucketStart)
la.updateLock.Unlock()
return old, nil
} else {
runtime.Gosched()
}
} else if bucketStart < atomic.LoadUint64(&old.BucketStart) {
if la.sampleCount == 1 {
// if sampleCount==1 in leap array, in concurrency scenario, this case is possible
return old, nil
}
// TODO: reserve for some special case (e.g. when occupying "future" buckets).
return nil, errors.New(fmt.Sprintf("Provided time timeMillis=%d is already behind old.BucketStart=%d.", bucketStart, old.BucketStart))
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
底层存储是基于AtomicBucketWrapArray,这里就不细说了。

BucketGenerator是一个抽象接口,该接口能力基于两个函数:NewEmptyBucket是用于创建格子里面的实际统计的数据结构;ResetBucketTo 函数是用于时间滑动时,将某个格子的起始时间以及统计结构重置。BucketGenerator接口是保证滑动窗口里面的格子可以复用于统计任何存储结构的关键。

下面重点在于无锁的滑动窗口实际运行流程,也就是currentBucketOfTime函数是怎么运行的:

首先会根据当前时间now计算出其在滑动窗口中是哪个格子,也就是格子在数组中的下标,以及所对应的统计起始时间bucketStart;
进入一个循环获取对应格子统计结构*BucketWrap:
基于 AtomicBucketWrapArray 能力,原子获取格子当前存储的*BucketWrap,也就是代码中的old指针对象;
如果old是空,也就是表示当前格子还没有初始化,新建*BucketWrap,然后通过CAS设置;并发情况下,如果有协程设置失败,就会重新进入for循环;
如果old非空而且old格子的统计开始时间old.BucketStart和 now计算出来的统计起始时间一致,也就是说明当前时间now对应的格子有效,直接返回统计的格子。
如果old非空而且old格子的统计开始时间old.BucketStart小于 now计算出来的统计起始时间;这种case也就是说明当前时间now已经走到了下一轮或则下多轮了,这个时候需要更新当前格子的统计结构了(更新格子的统计开始时间以及统计结构清零);需要注意的是,在并发下,可能有多个协程同时走到了这里,所以这里通过一个锁的TryLock操作保证只有一个协程会执行格子更新操作。这里获取锁失败协程会再次走到for循环。
如果old非空而且old格子的统计开始时间old.BucketStart大于 now计算出来的统计起始时间;也就是说当前时间已经落后了,这是一种别的协程已经预占了未来格子的case,目前逻辑没有用到。
下图展示滑动窗口随着时间滑动过程中变化:


参考文档:
Sentinel Go源码
golang slice实践以及底层实现
golang unsafe.Pointer使用原则以及 uintptr 隐藏的坑
————————————————
版权声明:本文为CSDN博主「惜暮」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/u010853261/article/details/108787189

原文地址:https://www.cnblogs.com/ExMan/p/14684080.html