RabbitMQ 核心概念、死信 延时队列

RabbitMQ

RabbitMQ消息传递模型的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。

1.simple example

send.py

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

recive.py

import pika, sys, os

def main():
   connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
   channel = connection.channel()

   channel.queue_declare(queue='hello')

   def callback(ch, method, properties, body):
       print(" [x] Received %r" % body)

   channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

   print(' [*] Waiting for messages. To exit press CTRL+C')
   channel.start_consuming()

if __name__ == '__main__':
   try:
       main()
   except KeyboardInterrupt:
       print('Interrupted')
       try:
           sys.exit(0)
       except SystemExit:
           os._exit(0)

2.核心概念

producer exchange、routing_key queue consumer

producer 将消息发往exchange

Exchange

默认使用的是direct类型,exchange为空,根据routing_key找到queue或者创建queue

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
  • direct: 将消息路由到绑定键与消息的路由键完全匹配的队列。例如,如果使用绑定键pdfprocess将队列绑定到交换机,则 使用路由键pdfprocess将**发布到交换机 的消息**路由到该队列。
  • fanout: fanout交换机将消息路由到与其绑定的所有队列。
  • topic: 主题交换在路由键和绑定中指定的路由模式之间进行通配符匹配。
  • headers :标头交换使用消息标头属性进行路由。

生产者只能将消息发送到Exchange,交流是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交易所必须确切知道如何处理收到的消息。是否应将其附加到特定队列?是否应该将其附加到许多队列中?还是应该丢弃它。规则由交换类型定义 。

有几种交换类型可用:direct,topic,headers 和fanout。我们将集中讨论最后一个-fanout。让我们创建该类型的交换,并将其称为log:

channel.exchange_declare(exchange = 'logs',
                         exchange_type = 'fanout')

routing_key

发布或者订阅时指定routing_key

exchange 在direct类型时:routing_key 和exchange 消息所属队列;

exchange 在topic 模式下时:

路由键:routings = [ 'happy.work', 'happy.life' , 'happy.work.teacher', 'sad.work', 'sad.life', 'sad.work.teacher' ]

"#":匹配所有的路由键

"happy.#":匹配 'happy.work', 'happy.life' , 'happy.work.teacher'

"work.#":无匹配

“happy.*”:匹配 'happy.work', 'happy.life'

"*.work":匹配 'happy.work', 'sad.work'

"*.work.#":匹配 'happy.work', 'happy.work.teacher', 'sad.work', 'sad.work.teacher'

exchangezai fanout类型时:

到达该exchange的所有消息将会发送到绑定到该exchange的所有队列

死信队列

  1. 消息被否定确认,使用 channel.basic_nackchannel.basic_reject ,并且此时requeue 属性被设置为false
  2. 消息在队列的存活时间超过设置的TTL时间。
  3. 消息队列的消息数量已经超过最大队列长度。

那么该消息将成为“死信”。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

死信消息的生命周期:

  1. 业务消息被投入业务队列
  2. 消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作
  3. 被nck或reject的消息由RabbitMQ投递到死信交换机中
  4. 死信交换机将消息投入相应的死信队列
  5. 死信队列的消费者消费死信消息

延迟队列

延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。

其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。

简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

参考

官网

基础

可靠消息|死信队列|延时队列

原文地址:https://www.cnblogs.com/9527mwz/p/13646113.html