[rabbitmq] python版本(三) 发布/订阅

之前提到:

  • 发布者producer:发布信息的应用程序
  • 队列queue:用于消息存储的缓冲
  • 消费者consumer:接收消息的应用程序

rabbitmq消息模型核心理念:发布者不会直接发送任何消息给队列。事实上,发布者甚至不知道消息是否已经被投递到队列。

发布者只需要把消息发送给一个交换机exchange。交换机一边从发布者方接受消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到制定的队列还是多个队列,或者是直接忽略消息。这些规则就是通过交换机类型exchange type来定义的

几种可选的交换机类型:

  • 直连交换机direct
  • 主题交换机topic
  • 头交换机headers
  • 扇形交换机fanout:它把消息发送给它所知道的所有队列

交换机列表
rabbitmqctl能够列出服务器上所有的交换器:

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs      fanout
amq.direct      direct
amq.topic       topic
amq.fanout      fanout
amq.headers     headers
...done.

这个列表中有一些叫做amq.*的交换器。这些都是默认创建的,不过这时候你还不需要使用他们。

匿名的交换器
前面的教程中我们对交换机一无所知,但仍然能够发送消息到队列中。因为我们使用了命名为空字符串("")默认的交换机。

回想我们之前是如何发布一则消息:

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)

exchange参数就是交换机的名称。空字符串代表默认或者匿名交换机:消息将会根据指定的routing_key分发到指定的队列。

临时队列

1.连接上rabbitmq时,需要一个全新的、空的队列。可以使用的方案:

  • 手动创建一个随机的队列
  • 服务器为我们选择一个随机的队列名(推荐),调用queue_declare方法的时候,不提供queue参数就可以了
result = channel.queue_declare()

可以通过该result.method.queue获得已经生成的随机队列名。它可能是这样子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。
2.与消费者consumer断开连接的时候,这个队列应当被立即删除。exclusive标识符即可达到此目的

result = channel.queue_declare(exclusive = True)

绑定Bindings


已经创建了一个扇型交换机(fanout)和一个队列。现在我们需要告诉交换机如何发送消息给我们的队列。交换器和队列之间的联系我们称之为绑定(binding)。

channel.queue_bind(exchange='logs', queue=result.method.queue)

现在logs交换机会把消息添加到我们得队列中

板顶binding列表
可以使用rabbitmqctl list_bindings列出现存的绑定

代码整合

发布日志消息的程序看起来和之前的没有太大区别。最重要的改变就是我们把消息发送给logs交换机而不是匿名交换机。在发送的时候我们需要提供routing_key参数,但是它的值会被扇型交换机(fanout exchange)忽略。
emit_log.py

#!/usr/bin/env python
import pika
import sys

#连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

#声明交换机名称logs和类型fanout
channel.exchange_declare(exchange='logs', exchange_type='fanout')

#命令行参数作为输入或者默认作为信息
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
#routing_key分发到制定队列
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

连接成功后,声明了一个交换器,这点很重要,因为不允许发布消息到不存在的交换器
如果没有绑定队列到交换器,消息将会丢失。但这个无所谓, 如果没有消费者监听,消息就会被忽略
receive_logs.py

#!/usr/bin/env python
import pika

#连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

#交换机为logs 交换机类型为fanout-->"群发" 发送给它知道的所有队列
channel.exchange_declare(exchange='logs', exchange_type='fanout')
#服务器为我们选择一个随机的队列名;exclusive设定为True表示当与consumer断开连接时,这个队列应当被立即删除
result = channel.queue_declare(queue='', exclusive=True)
#获取已经生成的随机队列名
queue_name = result.method.queue

#交换机与队列之间绑定
channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

如果保存日志到文件,控制台重定向输出到一个log文件就好

python receive_logs.py > logs_from_rabbit.log

如果是在cmd中查看,在另外一个terminal中运行

python receive_logs.py

发送日志

python emit_log.py

使用rabbitmqctl list_bindings可确认已经创建的队列绑定。可以看到运行中的两个receive_logs.py程序

$ sudo rabbitmqctl list_bindings -->windows去掉sudo
Listing bindings ...
 ...
logs    amq.gen-TJWkez28YpImbWdRKMa8sg==                []
logs    amq.gen-x0kymA4yPzAT6BoC/YP+zw==                []
...done.

显示结果很直观:logs交换器把数据发送给两个系统命名的队列。这就是我们所期望的。

日积月累,水滴石穿
原文地址:https://www.cnblogs.com/lonelyisland/p/12752289.html