Python:Rocketmq消息队列使用

rocketmq可以与kafka等一起使用,用于实时消息处理。

安装rocketmq

pip install rocketmq [-i https://pypi.tuna.tsinghua.edu.cn/simple]

生产消息producer

from rocketmq.client import Producer, Message
import json

producer = Producer('PID-test')
producer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx')  #rocketmq队列接口地址(服务器ip:port)
producer.start()

msg_body = {"id":"001","name":"test_mq","message":"abcdefg"}
ss = json.dumps(msg_body).encode('utf-8')

msg = Message('topic_name')   #topic名称
msg.set_keys('xxxxxx')
msg.set_tags('xxxxxx')
msg.set_body(ss)      #message body

retmq = producer.send_sync(msg)
print(retmq.status, retmq.msg_id, retmq.offset)
producer.shutdown()

其中:

  • 设置ip:port的位置:producer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx') 

当只有单一服务器时,格式是上面这个;

当有多个服务器地址(集群模式)时,可以使用:producer.set_namesrv_addr("xxx.xxx.xxx.xxx:xxxxx,xxx.xxx.xxx.xxx:xxxxx,xxx.xxx.xxx.xxx:xxxxx")

 不过以下这种方式本人测试不通过:producer.set_namesrv_addr(["xxx.xxx.xxx.xxx:xxxxx","xxx.xxx.xxx.xxx:xxxxx","xxx.xxx.xxx.xxx:xxxxx"])

  • 如果使用pandas数据,pandas数据可以直接转换

some_df.to_json(orient='records').encode('utf-8'),然后放入body中发送。

消费消息consumer

可以使用 PushConsumer 和  PullConsumer,同样来自 rocketmq.client。

# 使用PullConsumer时
from rocketmq.client import PullConsumer
consumer = PullConsumer('CID_test')
consumer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx')
consumer.start()

for msg in consumer.pull('topic_name'):
    print(msg.id, msg.body)
consumer.shutdown()

# PushConsumer与此类似
from rocketmq.client import PushConsumer

:目前rocketmq库只支持linux和mac。

#

参考:

https://www.oschina.net/p/rocketmq-python

https://github.com/apache/rocketmq-client-python

原文地址:https://www.cnblogs.com/qi-yuan-008/p/14022378.html