从今天开始看《Redis深度历险》--延时队列

平时我们常看到的消息队列基本上就是RabbitMQ、Kafka,但是在文中也指出了,在面对一组消费者队列的时候,如果对消息队列的可靠性要求不是特别高的时候,redis可以达到要求,而且其实现相较于RabbitMQ和Kafka非常简单。

异步消息队列

  redis中的list非常适合做异步消息队列,使用lpop、rpop出队列,lpush、rpush入队列。这点非常容易理解,python中的list也是一个非常灵活的结构,和redis能够用做消息队列的原因一样。

接下来就是一些使用redis会遇到的问题。

队列空了怎么处理

  这是一个很容易就能够想到的问题和一个容易想到解决方案的问题,当任务队列中空了,无非就是两种解决方案,要么我一直等着等到队列中有任务了,要么我就先干别的,每隔多久就查询一次队列中是否不为空了。这两种方案的实现方案也很容易,第一种可以采用redis的blpop/brpop(阻塞式)来获取消息,第二种则可以使用sleep函数来等待(python种必须使用异步io库提供的sleep,不然将会是阻塞式的等待/报错)。但是关于上述这种解决办法(虽然是两种,实际是只是一种思路)还存在一个很容易遇到的问题就是关于空闲连接,什么是空闲连接呢,就是客户端请求了之后就一直阻塞在那里,时间久了服务器一般就会主动断开连接,这个时候redis就会抛出异常,所以需要注意编写异常处理代码(无论是写什么代码都需要考虑异常处理),对失败的请求判断是否需要重试。

锁冲突处理

  这个问题文中指出是属于上一篇分布式锁的遗留问题,并没有指出为什么在这一篇才提出来。。。

  解决办法有三个:

    抛出异常,稍后重试

    sleep一会之后再重试,作者特别指出这个办法缺点很多,首先会导致后续处理的任务也跟着延迟,甚至会因为线程中的个别任务导致死锁而彻底堵死整个线程。

    将请求移至延时处理队列,在我自己的处理方法中这个是最实用的。

延时队列的实现

  java版本我就不附上了。。直接从文档中复制下来的那个格式惨不忍睹,格式化的时候还会有代码报错(这就离谱),如果有需要pdf文档的,可以给我留言。

  python版本(多少解释都不如代码来的直观,作者关于延时队列的解释我将会在代码的注释中注明)

  

 1 def delay(msg):
 2     msg.id = str(uuid.uuid4())  #保证value的唯一性
 3     value = json.dumps(msg)
 4     retry_t = time.time() + 5  #5秒后重试
 5     # 使用有序队列作为我们的消息延迟队列,其score是我们重试的时间,value可以是任务内容等
 6     redis.zadd('delay-queue', retry_t, value)
 7 def loop():
 8     while True:
 9         #每次经从中取出一个任务,其他参数可以参照该指令的可选参数列表
10         values = redis.zrangebyscore('delay-queue', 0, time.time(), start=0, num=1)
11         if not values:
12             # 队列中是空的,那么就延迟1S,注意,使用异步编程时必须使用异步库中的sleep
13             time.sleep(1)
14             continue
15         # 拿第一条,也只有一条(因为我们前面取的时候num值就是1)
16         value = values[0]
17         success = redis.zrem('delay-queue', value)
18         if success:
19             msg = json.loads(value)
20             # 处理任务,其中必须有异常处理
21             handle_msg(msg)

 文中作者还指出,上述方法会在多个线程争抢一个一个任务时浪费系统性能,可以使用lua脚本将zrangebyscore和zrem合到一起变成一个原子性操作。由于本人并不会lua,网上搜索的lua脚本我就不贴上来了,有需要请自行搜索。

原文地址:https://www.cnblogs.com/slientbrain/p/13021247.html