python 连接mqtt发布和订阅信息

一、下载依赖包

pip install paho-mqtt

二、创建监听订阅和发布

  • 监听程序,订阅和发布,可以根据订阅的信息进行处理
    import paho.mqtt.client as mqtt

    import json
    import time

    host = '127.0.0.1'  # mqtt服务器地址
    port = 1883
    client_id = '101'  # 客户端id,自己设置

    # 同时订阅多个主题方式使用#通配符
    # '#'号是通配符,订阅匹配#平级及子级主题的所有主题
    # '+'号是单层通配符,在主题过滤器的任意层级都可以使用单层通配符,包括第一个和最后一个层级。然而它必须占据过滤器的整个层级。可以在主题过滤器中的多个层级中使用它,也可以和多层通配符一起使用。
    # 消息服务质量,0最多一次,1最少一次,2只一次

    def on_connect(client, userdata, flags, rc):
        '''
        订阅信息
        :param client: 链接
        :param userdata:
        :param flags:
        :param rc:
        :return:
        '''
        print('链接-----------------------------')
        print(f'Connected with result code {rc}---链接结束-------------')
        client.subscribe('data/send')
        pass


    def on_message(client, userdata, msg):
        '''
        消息主题
        :param client:
        :param userdata:
        :param msg:
        :return:
        '''
        print('信息-------------------------------')
        print(f'主题:{msg.topic} 消息:{msg.payload.decode("utf-8")}------信息结束-------')
        pass


    def on_subscribe(client, userdata, mid, granted_qos):
        '''
        消息状态
        :param client:
        :param userdata:
        :param mid:
        :param granted_qos:
        :return:
        '''
        print('状态--------------------------')
        print(client)
        print(userdata)
        print(mid)
        print(f'On Subscribed: qos = {granted_qos}---------状态结束--------')
        pass


    def on_disconnect(client, userdata, rc):
        if rc != 0:
            print(f'Unexpeced disconnection {rc}--disconnect')
            pass
        pass

    data = {
        "type": 2,
        "timestamp": time.time(),
        "messageId": "gajisrwa-fdsarakm-fdas",
        "command": "xx/recommend",
        "data": {
            "openId": "xxxx",
            "appId": 1001,
            "recommendType": "temRecommend"
        }
    }

    param = json.dumps(data)
    client = mqtt.Client(client_id)
    client.username_pw_set("admin", "public")
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_subscribe = on_subscribe
    client.on_disconnect = on_disconnect
    client.connect(host, port, 60)

    client.subscribe("data/send")
    client.publish("data/send", payload=param, qos=0)     # 发送消息
    # print(client.loop_read())
    # print(client.loop_write())
    # print(client.subscribe('data/send', qos=0))
    client.loop_forever()

  • 发布信息
import sys
import paho.mqtt.publish as publish
import paho.mqtt.subscribe as subscribe

# 发布信息
publish.single('data/send', '{"hello":2}', hostname='127.0.0.1')
    

原文地址:https://www.cnblogs.com/hziwei/p/14362618.html