RabbitMQ发布订阅

RabbitMq 发布与订阅
rabbitmq 的发布与订阅要借助交换机(Exchange)的原理实现:
Exchange 常用的三种工作模式:fanout, direct, topicd
 
模式一:fanout
这种模式下,传递到 exchange 的消息将会转发到所有与其绑定的 queue 上。
  • 不需要指定 routing_key ,即使指定了也是无效。
  • 需要提前将 exchange 和 queue 绑定,一个 exchange 可以绑定多个 queue,一个queue可以绑定多个exchange。
  • 需要先启动 订阅者,此模式下的队列是 consumer 随机生成的,发布者 仅仅发布消息到 exchange ,由 exchange 转发消息至 queue。
发布者:
import pika
import json
# 生成证书
credentials = pika.PlainCredentials('shampoo', '123456')  # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 声明exchange,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')
for i in range(10):
    message=json.dumps({'OrderId':"1000%s"%i})
    # 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化。routing_key 不需要配置
    channel.basic_publish(exchange = 'python-test',routing_key = '',body = message, properties=pika.BasicProperties(delivery_mode = 2))
    print(message)
connection.close()
订阅者:
import pika
credentials = pika.PlainCredentials('shampoo', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
result = channel.queue_declare('',exclusive=True)
# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')
# 绑定exchange和队列  exchange 使我们能够确切地指定消息应该到哪个队列去
channel.queue_bind(exchange = 'python-test',queue = result.method.queue)
# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print(body.decode())

channel.basic_consume(result.method.queue,callback, auto_ack = False)# 设置成 False,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉                 
channel.start_consuming()
模式二:direct
这种工作模式的原理是 消息发送至 exchange,exchange 根据 路由键(routing_key)转发到相对应的 queue 上。
  •  可以使用默认 exchange =' ' ,也可以自定义 exchange
  • 这种模式下不需要将 exchange 和 队列进行绑定,当然绑定也是可以的。可以将 exchange 和 queue ,routing_key 和 queue 进行绑定
  • 传递或接受消息时 需要 指定 routing_key
  • 需要先启动 订阅者,此模式下的队列是 consumer 随机生成的,发布者 仅仅发布消息到 exchange ,由 exchange 转发消息至 queue。
发布者:
import pika
import json
credentials = pika.PlainCredentials('shampoo', '123456')  # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 声明exchange,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct')

for i in range(10):
    message=json.dumps({'OrderId':"1000%s"%i})
# 指定 routing_key。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
    channel.basic_publish(exchange = 'python-test',routing_key = 'OrderId',body = message,
                          properties=pika.BasicProperties(delivery_mode = 2))
    print(message)
connection.close()
订阅者:
import pika
credentials = pika.PlainCredentials('shampoo', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
result = channel.queue_declare('',exclusive=True)
# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct')
# 绑定exchange和队列  exchange 使我们能够确切地指定消息应该到哪个队列去
channel.queue_bind(exchange = 'python-test',queue = result.method.queue,routing_key='OrderId')
# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print(body.decode())

#channel.basic_qos(prefetch_count=1)
# 告诉rabbitmq,用callback来接受消息
channel.basic_consume(result.method.queue,callback, auto_ack = False)# 设置成 False,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉 
channel.start_consuming()
模式三:topicd
这种模式和第二种模式差不多,exchange 也是通过 路由键 routing_key 来转发消息到指定的 queue 。
不同点是 routing_key 使用正则表达式支持模糊匹配,但匹配规则又与常规的正则表达式不同,比如‘’#‘’是匹配全部,“*”是匹配一个词。
举例:routing_key =“#orderid#”,意思是将消息转发至所有 routing_key 包含 “orderid” 字符的队列中。代码和模式二 类似,就不贴出来了。
 
 

原文地址:https://www.cnblogs.com/absoluteli/p/13986056.html