redis实现的简单令牌桶

这里给出的令牌桶是以redis单节点或者集群为中间件. 不过, 这里的实现比较简单, 主要提供两个函数, 一个用于消费令牌, 一个用于添加令牌. 这里, 消费令牌和添加令牌都是通过lua来保证原子性.

消费令牌的代码如下 :

// FetchToken 用来获取某个key的一个令牌
func (acc *Accessor) FetchToken(key string) (bool, error) {
	/*
	 * KEYS[1] 表示特定的key, 这个key是当前的令牌数
	 */
	keyFetchScript :=
		`--[[测试显示, 通过call, 可以将error返回给客户端, 即使没有使用return]]--
		local curNum = redis.call("DECR", KEYS[1])
		if (curNum >= 0)
		then
			return true
		end
		redis.call("INCR", KEYS[1])
		return false
		`

	keyFetchCmd := redis.NewScript(keyFetchScript)
	res, err := keyFetchCmd.Run(acc.client, []string{key}).Result()
	if err != nil && err != redis.Nil {
		return false, err
	}

	if res == redis.Nil {
		return false, nil
	}

	if val, ok := res.(int64); ok {
		return (val == 1), nil
	}

	return false, errors.New("res should be bool")
}

 这里每一个key都有一个辅助的key_idx, 每次增加key的令牌数, 都会使key_idx的值加1, 同时这个函数调用会返回对应的key_idx的值. 如果传入的idx的值与key_idx值不相同, 则不会执行增加令牌数的操作. 这样设计的目的是, 如果你在不同机器中启动多个增加令牌数的程序, 而且这些程序启动时间不同, 那么其中一个程序将会起到增加令牌数的效果, 而另外的程序不会新增令牌数. 当增加令牌数的这个程序意外关闭, 将会有新的增加令牌的程序起作用. 这个实现的思想类似于乐观锁.具体代码如下:

// AddToken 用来添加某个key的令牌
func (acc *Accessor) AddToken(key string, keyIdx int, keyAdd int, keyLimit int) (int, error) {
	/* KEYS[1] 表示特定key,这个key是当前的令牌
	 * KEYS[2] 表示特定key的idx
	 * ARGV[1] 表示修改的key的增加的值
	 * ARGV[2] 表示修改的key的最大值
	 * ARGV[3] 表示修改的key的idx的序号
	 */
	// 实现思路, 先判断这个key当前的序号与修改调用的序号是否一致,如果一致, 则进行修改,否则返回当前的序号
	keyAddScript :=
		`--[[测试显示, 通过call, 可以将error返回给客户端, 即使没有使用return]]--
		local curIdx = redis.call("INCR", KEYS[2])
		if (curIdx ~= (ARGV[3]+1))
		then
			curIdx = redis.call("DECR", KEYS[2])
			return curIdx
		end
		local curNum = redis.call("INCRBY", KEYS[1], ARGV[1])
		local maxNum = tonumber(ARGV[2])
		if (curNum > maxNum) 
		then 
			redis.call("SET", KEYS[1], ARGV[2])
		end
		return curIdx
		`
	keyAddCmd := redis.NewScript(keyAddScript)
	res, err := keyAddCmd.Run(acc.client, []string{key, getKeyIdx(key)},
		keyAdd, keyLimit, keyIdx).Result()
	if err != nil && err != redis.Nil {
		return 0, err
	}

	if idx, ok := res.(int64); ok {
		return int(idx), nil
	}

	return 0, errors.New("res should be integer")
}
 

假设现在有多个节点,例如有20个节点,我希望每次都有3个节点作为添加令牌桶的节点,那么这个怎么实现呢?

/*
 * 作用: 判断这个节点是否用于新增键令牌(以下称为: 加令牌节点),
 * 从而实现每个redis(或者redis集群)总是有N个节点(例如2个或者3个)用于添加令牌操作
 * 我们可能采用多种方式获取所有的key, 然后向对应的key增加令牌, 例如
 * (1) 通过数据库获取所有键值
 * (2) 通过遍历redis获取所有键值
 * (3) 直接读取配置文件
 * (4) 通过远程调用设置
 * 判断方式通过如下实现:
 * (1) 实现这个判断需要在redis中使用一个键值存储信息, 这里使用"{}"的键值, 这个键存储所有的加令牌节点
 *      1) 这个键使用类型为list
 *      2) 这个list中存储的值为是否可用标示+":"+节点标识符(可用时, 为"1:节点标识符")
 * (2) 程序启动时, 监听这个键的修改操作
 *      1) "加令牌节点"监听对这个键的包括LREM,LSET和PUSH类型的消息
 *      2) 非"加令牌节点"监听对这个键的包括LREM类型的消息
 * (3) 然后, 读取redis中键名为"{}"的键的值, 然后判断当前"加令牌节点"的个数, 如果这个个数小于配置值,
 *  则修改redis中这个键的值,将自己设为"加令牌节点", 否则, 不做处理
 * (4) 对于每个设置为"加令牌节点"的应用, 会在这个list中排在后面的M个节点(例如2个或者3个)建立tcp连接,
 *  然后通过ping和pong消息, 来判断这个节点是否可以连接.
 * (5) 如果A节点发现B节点不可连接(假设每秒发送一条消息,经过20次没有发送成功), 向redis发送一条修改请求,
 *   请求中先查看B节点是否可用,如果B节点当前显示可用, 那么修改"{}"的值,设置1+":"+节点标识符为
 *   0+":"+节点标识符(B节点), 如果显示B节点已经不可用, 则继续进行tcp通信, (如果B节点已经不存在,
 *   则断开与这个节点的tcp连接,按照当前逻辑,这个应该不会出现)
 * (6) 节点继续等待若干时间(例如15s), 在期间查看这个节点是否已经被重置正常,如果恢复正常,则继续进行tcp通信,
* 查看是否存在问题,如果没有恢复正常,从这个list中清除这个节点 * (7) 那些非"加令牌节点"接收到清楚的消息之后, 会申请自己成为"加令牌节点",会检测当前"加令牌节点"的个数, * 如果条件满足, 则将这个节点信息插入, 让这个节点成为"加令牌节点", 然后, 每个当前"加令牌节点" * 读取所有的"加令牌节点", 重新更新与哪些节点建立tcp连接.
* (8) 当前标识符为Ip:port
*/

 完整的代码请参考如下地址:

 https://github.com/ss-torres/ratelimiter.git

在这个git地址中, 如果想调用db_test.go中的测试, 可以参考如下命令:

go test ratelimiter/db -args "localhost:6379" "" "hello"

当前这个实现,没有充分利用redis的提供的功能,按照我的观点来看,使用redis的subscribe和publish的实现可能更加简单,之后可能会提供使用subscribe和publish的实现,然后再进行必要的详细测试和benchmark。另外,使用zookeeper作为中间的协调中间件的实现可能更加简单。

如果有什么好的建议, 或者有什么问题, 欢迎提出。

原文地址:https://www.cnblogs.com/albizzia/p/10821176.html