Python之Rabbitmq处理消息

欢迎关注【无量测试之道】公众号,回复【领取资源】,
Python编程学习资源干货、
Python+Appium框架APP的UI自动化、
Python+Selenium框架Web的UI自动化、
Python+Unittest框架API自动化、

资源和代码 免费送啦~
文章下方有公众号二维码,可直接微信扫一扫关注即可。
 

1、概念说明

Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。是Rabbitmq的内部对象,用于存储消息
Binding:绑定,它的作用就是把Exchange和Queue按照路由规则绑定起来。
Routing Key:路由关键字,Exchange根据这个关键字进行消息投递。
Vhost:虚拟主机,一个Broker里可以开设多个Vhost,用作不同用户的权限分离。
Producer:消息生产者,就是投递消息的程序。
Consumer:消息消费者,就是接受消息的程序。
Channel:消息通道,在客户端的每个连接里,可建立多个Channel,每个Channel代表一个会话任务。

2、看看Rabbitmq里面的消息长什么样子

如下截图所示:

Mesages=2 表示展示出两条数据。

3、Rabbitmq处理消息简单模式

大致五个步骤:
step1:获取Rabbitmq服务的连接
step2:创建一个信道 
step3:声明一个队列(与发消息程序的声明保持一致)
step4:定义一个回调函数,用于接收和处理队列中的消息
step5:队列与回归函数绑定
step6:开始消费消息

import pika
 
#接收消息,并写入文件,这也算是持久化了
def write_file(message):
    with open("msg.txt","a+") as f:
        f.write(message)
 
 
def consumer():#消息消费者
    # 获取与rabbitmq 服务的连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672,credentials=pika.PlainCredentials('guest', 'guest')))
    # 创建一个 AMQP 信道(Channel)
    channel = connection.channel()
    # 声明消息队列tester,durable=False 表示不持久化
    channel.queue_declare(queue='tester', durable=False)
    # 定义一个回调函数来处理消息队列中的消息,这里是将消息写入文件,你也可以入库。
    def callback(ch, method, properties, body):
        ch.basic_ack(delivery_tag=method.delivery_tag) # 告诉生成者,消息处理完成
        write_file(body.decode())
    #告诉rabbitmq在tester列表里面收消息,收到就调用callback函数
    channel.basic_consume('tester', callback)
    # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
    channel.start_consuming()
 
if __name__=="__main__":
    consumer()

Tips: callback回调函数将消息直接写入文件

如下图所示:

4、查看Rabbitmq界面消息是否处理完成

如下截图所示:

备注:我的个人公众号已正式开通,致力于测试技术的分享,包含:大数据测试、功能测试,测试开发,API接口自动化、测试运维、UI自动化测试等,微信搜索公众号:“无量测试之道”,或扫描下方二维码:

 添加关注,一起共同成长吧。

原文地址:https://www.cnblogs.com/Wu13241454771/p/13261431.html