mqtt client python example

This is a simple example showing how to use the [Paho MQTT Python client](https://eclipse.org/paho/clients/python/) to send data to Azure IoT Hub. You need to assemble the rights credentials and configure TLS and the MQTT protocol version appropriately.

 send_iot-hub_paho_mqtt.py

#!/usr/bin/python

import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
import ssl

auth = {
  'username':"ciscohackhub.azure-devices.net/lora1",
  'password':"SharedAccessSignature sr=ciscohackhub.azure-devices.net%2Fdevices%2Flora1&sig=xxxx&se=1463048772"
}

tls = {
  'ca_certs':"/etc/ssl/certs/ca-certificates.crt",
  'tls_version':ssl.PROTOCOL_TLSv1
}

publish.single("devices/lora1/messages/events/",
  payload="hello world",
  hostname="ciscohackhub.azure-devices.net",
  client_id="lora1",
  auth=auth,
  tls=tls,
  port=8883,
  protocol=mqtt.MQTTv311)

 

 The following code will subscribe on topic f and republish on topic f2

import paho.mqtt.client as mqtt
message = 'ON'
def on_connect(mosq, obj, rc):
    mqttc.subscribe("f", 0)
    print("rc: " + str(rc))

def on_message(mosq, obj, msg):
    global message
    print(msg.topic + " " + str(msg.qos) + " " + str(msg.payload))
    message = msg.payload
    mqttc.publish("f2",msg.payload);

def on_publish(mosq, obj, mid):
    print("mid: " + str(mid))

def on_subscribe(mosq, obj, mid, granted_qos):
    print("Subscribed: " + str(mid) + " " + str(granted_qos))

def on_log(mosq, obj, level, string):
    print(string)

mqttc = mqtt.Client()
# Assign event callbacks
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe
# Connect
mqttc.connect("localhost", 1883,60)


# Continue the network loop
mqttc.loop_forever()

  用戶端程式偶爾需要發佈訊息,不須與 mqtt broker 保持連線的情形,可用 single() 或 multiple() 方法。這種作法比較省電

import paho.mqtt.publish as publish

# publish a message then disconnect.
host = "localhost"
topic = "tw/rocksaying"
payload = "hello mqtt"

# If broker asks user/password.
auth = {'username': "", 'password': ""}

# If broker asks client ID.
client_id = ""

publish.single(topic, payload, qos=1, hostname=host)

#publish.single(topic, payload, qos=1, host=host,
#    auth=auth, client_id=client_id)

  當用戶端程式,例如感應器服務程式,經常或短週期地持續發佈訊息時,則應用連線式設計。

# coding: utf-8
import sys, os, time
reload(sys)
sys.setdefaultencoding('utf-8')

import paho.mqtt.client as mqtt

# If broker asks client ID.
client_id = ""

client = mqtt.Client(client_id=client_id)

# If broker asks user/password.
user = ""
password = ""
client.username_pw_set(user, password)

client.connect("localhost")

topic = "tw/rocksaying"
payload = "你好 mqtt"

for i in xrange(10):
    client.publish(topic, "%s - %d" % (payload, i))
    time.sleep(0.01)
    # 當 qos = 0, 若訊息間隔太短,就可能會漏發訊息。這是正常現象。

  實作時,可先用 mosquitto_sub 訂閱主題,以監看訊息是否送出。

訂閱主題

本節實作一個類似 mosquitto_sub 的程式,訂閱主題 “tw/rocksaying/#” 。它也是一個服務程式的基礎骨架。

# coding: utf-8
import sys, os, time, signal
reload(sys)
sys.setdefaultencoding('utf-8')
import paho.mqtt.client as mqtt

client = None
mqtt_looping = False

TOPIC_ROOT = "tw/rocksaying"

def on_connect(mq, userdata, rc, _):
    # subscribe when connected.
    mq.subscribe(TOPIC_ROOT + '/#')

def on_message(mq, userdata, msg):
    print "topic: %s" % msg.topic
    print "payload: %s" % msg.payload
    print "qos: %d" % msg.qos

def mqtt_client_thread():
    global client, mqtt_looping
    client_id = "" # If broker asks client ID.
    client = mqtt.Client(client_id=client_id)

    # If broker asks user/password.
    user = ""
    password = ""
    client.username_pw_set(user, password)

    client.on_connect = on_connect
    client.on_message = on_message

    try:
        client.connect("localhost")
    except:
        print "MQTT Broker is not online. Connect later."

    mqtt_looping = True
    print "Looping..."

    #mqtt_loop.loop_forever()
    cnt = 0
    while mqtt_looping:
        client.loop()

        cnt += 1
        if cnt > 20:
            try:
                client.reconnect() # to avoid 'Broken pipe' error.
            except:
                time.sleep(1)
            cnt = 0

    print "quit mqtt thread"
    client.disconnect()

def stop_all(*args):
    global mqtt_looping
    mqtt_looping = False

if __name__ == '__main__':
    signal.signal(signal.SIGTERM, stop_all)
    signal.signal(signal.SIGQUIT, stop_all)
    signal.signal(signal.SIGINT,  stop_all)  # Ctrl-C

    mqtt_client_thread()

    print "exit program"
    sys.exit(0)

  

用戶端服務

大部份 MQTT 用戶端服務程式需要同時監看與發佈訊息。例如一個感應器服務程式,它一邊得監看主題以接收來自其他程式的動作請求;另一邊得讀取感應器狀態後發佈到主題上。

Paho 提供的範例程式使用 loop_start() 方法進入內建的待命執行緒,再讓設計者於主執行緒中讀取感應器狀態與發佈訊息。如下所示:

mqttc.loop_start()  # enter a looping thread.

# main thread
while True:
    temperature = sensor.blocking_read()
    mqttc.publish("paho/temperature", temperature)

  

原文地址:https://www.cnblogs.com/saryli/p/6925958.html