RabbitMQ消息队列(三): 发布/订阅

1. 订阅/发布:

前面worker示例中的每个任务都是只发送给某一个worker,如果我们多个worker都需要接收处理同一个任务,此时就要使用

订阅/发布功能,比如,日志模块产生日志并发送到队列中,队列连接两个worker,一个负责打印到控制台,一个负责打印到日志文件,

则队列需要将内部消息同时发送到两个worker中做不同的处理。

2. 交换:

前面示例当中,我们是直接使用队列来收发消息的,那并不是RabbitMQ的完整模型,完整模型当中还包含有"交换",消息不应该直接发送给

队列,而是发送给"交换"。交换的模型很简单,其一端连接生产者,一端连接消息队列,交换需要一定的规则来对收到消息做处理,比如发给

某个队列,亦或者丢弃该消息,这个规则我们称之为"交换类型": direct, topic, headers ,fanout,本文以及后面的文章会对几种类型做详细

介绍,可以使用如下方式创建交换,如下其名称为logs,类型是=fanout,fanout类型不会关系消息,只是简单对消息广播到连接队列。

channel.exchange_declare(exchange='logs',
                         type='fanout')

含有交换的完整模型如下图所示:

              

3. 临时队列:

在不需要多个生产者或者消费者共享队列的时候,队列名称我们是不关心的,RabbitMQ提供了一种随机生成队列的方式:

result = channel.queue_declare()

result.method.queue中含有队列的名称

当我们需要设置消费者断开,队列自动销毁,可以使用如下方式,标记exlusive=True:

result = channel.queue_declare(exclusive=True)

4. 绑定:

队列和交换均建立完成,此时我们需要绑定队列和交换,这样交换才知道向哪些队列发送消息,方式如下:

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

绑定之后的模型如下:

               

完成模型,包含worker:

               

5. 测试代码:

emit_log.py -- 产生日志消息,发送到交换:

 1 #!/usr/bin/env python
 2 import pika
 3 import sys
 4 
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(
 6         host='localhost'))
 7 channel = connection.channel()
 8 
 9 channel.exchange_declare(exchange='logs',
10                          type='fanout')
11 
12 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
13 channel.basic_publish(exchange='logs',
14                       routing_key='',
15                       body=message)
16 print(" [x] Sent %r" % message)
17 connection.close()

reveive_logs.py--临时队列绑定交换,接收日志消息并处理;

 1 #!/usr/bin/env python
 2 import pika
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='logs',
 9                          type='fanout')
10 
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13 
14 channel.queue_bind(exchange='logs',
15                    queue=queue_name)
16 
17 print(' [*] Waiting for logs. To exit press CTRL+C')
18 
19 def callback(ch, method, properties, body):
20     print(" [x] %r" % body)
21 
22 channel.basic_consume(callback,
23                       queue=queue_name,
24                       no_ack=True)
25 
26 channel.start_consuming()
原文地址:https://www.cnblogs.com/wanpengcoder/p/5291627.html