【python】-- RabbitMQ PublishSubscribe(消息发布订阅)

RabbitMQ RabbitMQ PublishSubscribe(消息发布订阅)

1对1的消息发送和接收,即消息只能发送到指定的queue里,但这样使用有些局限性,有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了

Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息:

  1. fanout:所有bind到此exchange的queue都可以接收消息
  2. direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息
  3. topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
  4. headers:通过headers 来决定把消息发给哪些queue(这个很少用)

一、fanout订阅发布模式(广播模式)

这种模式是所有绑定exchange的queue都可以接收到消息(纯广播的,所有消费者都能收到消息)

1、生产者(fanout_produce):

因为生产者是以广播的形式,所以这边不需要声明queue

import pika
#创建socket实例,声明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()

#声明exchange的名字和类型
channel.exchange_declare(exchange="practice",
                         exchange_type="fanout")

#广播一个消息
channel.basic_publish(
    exchange="practice",
    routing_key='',
    body="hello word"
)

print("send hello word")

connect.close()

2、消费者(fan_consumers):

消费者这边要声明一个唯一的queue_name的对象,并且从对象中获取queue名(每个唯一的queue就代表这一个收音机,锁定着FM频道(exchange))

import pika
#创建socket链接,声明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()
#声明exchange名字和类型
channel.exchange_declare(exchange="practice", exchange_type="fanout")
#rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除,result是queue的对象实例
result = channel.queue_declare(exclusive=True)  # 参数 exclusive=True 独家唯一的
queue_name = result.method.queue
#绑定exchange, 相当于打开收音机,锁定了一个FM频道
channel.queue_bind(exchange="practice",
                   queue=queue_name)


#回调函数
def callback(ch,method,properties,body):
    print("{0}".format(body))
#消费信息
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
#开始消费
channel.start_consuming()

 注:

生产者发送广播是实时的,消费者需要提前等待生产者发生消息,这个又叫订阅发布,收音机模式,就像只有收音机打开了才能听到锁定的FM频道,但是如果在节目开始一段时间,再打开收音机的话,之前的节目就收听不到了

二、direct订阅发布模式(广播模式)

RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

1、生产者(direct_prodecer)

import pika
import sys
#创建socket实例,声明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()

#声明exchange的名字和类型
channel.exchange_declare(exchange="direct_practice",
                         exchange_type="direct")
# 日志级别,通过三元运算符,获取外部传递的参数
severity = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "hello world"
#广播一个消息
channel.basic_publish(
    exchange="direct_practice",
    routing_key=severity,
    body=message
)

print("send %s  %s" % (severity, message))

connect.close()

2、消费者(direct_consumers)

import pika
import sys
#创建socket链接,声明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()
#声明exchange名字和类型
channel.exchange_declare(exchange="direct_practice", exchange_type="direct")
#rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除,result是queue的对象实例
result = channel.queue_declare(exclusive=True)  # 参数 exclusive=True 独家唯一的
queue_name = result.method.queue

#获取外部手动输入的安全级别
severities = sys.argv[1:]
#如果没有外部没有输入参数就直接退出
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]
" % sys.argv[0])
    sys.exit(1)

#循环遍历绑定消息队列
for severity in severities:
    #绑定exchange, 相当于打开收音机,锁定了一个FM频道
    channel.queue_bind(exchange="direct_practice",
                       queue=queue_name,
                       routing_key=severity)

print("Waiting for msg")


#回调函数
def callback(ch,method,properties,body):
    print("%s  %s" % (method.routing_key, body))
#消费信息
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
    #开始消费
channel.start_consuming()

 3、输出

服务端:

C:UsersdellPycharmProjectsuntitledpractice
abbitmq_r>python3 direct_producer.py info
send info  hello world

C:UsersdellPycharmProjectsuntitledpractice
abbitmq_r>python3 direct_producer.py warning
send warning  hello world

客户端:

C:UsersdellPycharmProjectsuntitledpractice
abbitmq_r>python3 direct_consumer.py info
Waiting for msg
info  b'hello world'

  

topic定义发布模式(广播模式)

direct把info、error、warning绑定级别把消息区分了,如果想做的更细致的区分,如在Linux上有一个系统日志,所有程序都在这个日志里面打日志。那如果我想知道什么是mysql的发出来的日志,什么是apache发出来的日志。然后mysql日志里面同时是info、warning、error。所以需要做更细的区分,更细致的消息过滤

1、生产者(topic_producer)

import pika
import sys
#创建socket实例,声明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()

#声明exchange的名字和类型
channel.exchange_declare(exchange="topic_practice",
                         exchange_type="topic")
# 关键字,通过三元运算符,获取外部传递的参数
keys = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "hello world"
#广播一个消息
channel.basic_publish(
    exchange="topic_practice",
    routing_key=keys,
    body=message
)

print("send %s  %s" % (keys, message))

connect.close()

  

2、消费者(topic_consumers)

import pika
import sys
#创建socket实例,声明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()

#声明exchange的名字和类型
channel.exchange_declare(exchange="topic_practice",
                         exchange_type="topic")
# 关键字,通过三元运算符,获取外部传递的参数
keys = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "hello world"
#广播一个消息
channel.basic_publish(
    exchange="topic_practice",
    routing_key=keys,
    body=message
)

print("send %s  %s" % (keys, message))

connect.close()

3、输出

服务端:

C:UsersdellPycharmProjectsuntitledpractice
abbitmq_r>python3 topic_producer.py mysql.info
send mysql.info  hello world

C:UsersdellPycharmProjectsuntitledpractice
abbitmq_r>python3 topic_producer.py adapter.error
send adapter.error  hello world

客户端:

C:UsersdellPycharmProjectsuntitledpractice
abbitmq_r>python3 topic_consumers.py mysql.* *.error
Waiting for msg
mysql.info  b'hello world'
adapter.error  b'hello world'

4、匹配规则

原文地址:https://www.cnblogs.com/Keep-Ambition/p/8052817.html