从零开始搭建物联网平台(6):消息的持久化

遇到的问题:

查看了EMQ文档发现并不提供消息的持久化功能,MQTT协议是按照设备一直在线设计的,数据都是保存在内存里的,但是考虑到用户上传传感器数据不可能接收了就扔掉,那样就没法查看历史数据了,所以用户上传的消息必须要能够保存下来,以便查看历史数据,这样一来持久化功能就需要我们自己来实现。

另外还会出现一个问题,当两个设备注册的主题名一样的时候,不能分出是哪一个设备发出的消息,在接收订阅消息的时候发现没办法获取到发送消息的clientID,而且其他设备也可以订阅到任意设备的消息,对于敏感信息来说存在安全性。

解决方法:

初步打算是,用户需要在后台注册自己的设备和数据流信息,后台会对所有注册的信息进行订阅接收到消息后,后台会把消息写入到对应的表中,另外设备发布主题只能使用(clientID/主题名)命名方式,以便后台能够区分是哪一个设备发送过来的消息。对于MQTT了解还是不够深,只能使用这样的笨办法来解决了,以后若是找到其他的方法在进行改进。

解决问题:

首先需要通过python建立mqtt连接监听所有注册的主题信息,这里使用了paho-mqtt库来实现,为了方便以后的调用将其封装成一个类,最开始的时候想把一些常用的操作也封装进去,单独测试完全可以,但是一旦放到Django请求中处理的时候,mqtt能够正常返回成功信息,但是实际上并没有正确执行,这一点始终没有找到原因,最终只能简化,只包含最基础的功能。

class MqClient(object):
    def __init__(self, client_id, username, password):
        self.client = client.Client(client_id=client_id,
                                    clean_session=True)  # 初始化,clean_session为false的时候EMQ会保存订阅状态,可以不再次订阅
        self.client.username_pw_set(username, password)  # 设置连接用户名
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self._client_status = False  # 连接状态
        self._cloop = None
        self._connect()  # 实例化会自动连接

    def _connect(self, host="your IP ", port=1883, keepalive=60):
        """连接服务器"""
        self.client.connect_async(host, port, keepalive)
        # 开启线程执行
        self._cloop = threading.Thread(target=self.client.loop_start())
        self._cloop.start()

    def on_connect(self, client, userdata, flags, rc):
        """连接成功的回调函数"""
        # 修改客户端状态
        if rc == 0:
            self._client_status = True

    def init_sub(self):
        # 读取数据库中所有的已经注册过的topic并且订阅
        for i in models.Device.objects.all():
            for j in i.dev_stream.all():
                self.client.subscribe(str(i.device_id) + '/' + j.name, j.qos)

    @staticmethod
    def on_message(client, userdata, msg):
        client_id = msg.topic.split('/')[0]
        stream = msg.topic.split('/')[1]
        data = msg.payload.decode()
        # 接收订阅信息写入到数据库中
        models.DataStream.objects.filter(device__device_id=client_id).filter(name=stream).first().data.add(
            models.Data.objects.create(data=data))

有了封装好的类,现在我们需要做的是:在Django项目启动完成之后自动执行监听任务的,最开始的时候打算放到setting或者__init__里面,但是因为类里面封装了model操作,那时候项目还没有加载完model会报错,所以最终新建了一个app,然后放到其下的urls,这样当项目启动完成的时候就会自动加载了。

from utils.mqtt_client import MqClient

MQClient = MqClient(your client ID, username, password)
MQClient.init_sub()

接下来测试一下实时新增订阅的功能,先从urls文件导入示例化之后的对象,调用client的subscribe方法

from mqtt.urls import MQClient
class Test(APIView):
    def post(self, request):
        topic = request.POST.get('topic')
        qos = int(request.POST.get('qos'))
        if topic:
            MQClient.client.subscribe(topic,qos)
        return HttpResponse("ok")

最后就剩下把数据存入数据库中了,这个操作已经写在那个类中了。简单说明一下 ,当paho接收了mqtt请求的时候会产生一个回调,执行下面这个函数,接收到的类容包含在msg中,msg主要有topic和payload两个属性,topic是订阅的主题名,payload则是具体的消息内容,按照之前的规定,主题名为client/stream,对topic内容拆分获取到client_id和stream,最后就是数据库的插入操作了,涉及到多表操作,简单点说就是,先插入一个data数据,然后根据client_id和stream来确定stream,最后再通过add方法将两者关联起来,这样就完成了消息的保存了。

def on_message(client, userdata, msg):
        client_id = msg.topic.split('/')[0]
        stream = msg.topic.split('/')[1]
        data = msg.payload.decode()
        # 接收订阅信息写入到数据库中
        models.DataStream.objects.filter(device__device_id=client_id).filter(name=stream).first().data.add(
            models.Data.objects.create(data=data))

测试一下,数据库里面已经准备一些client和stream数据,还是使用EMQ的websocket来测试,发送主题为123456/hum,消息内容为654321,在来看一下数据库中数据是否插入成功。

 

data表
stream表
stream和data关联表

 

更新:

最终对这一部分做了修改,没有将MQTT相关的东西放到Django里面,独立出来了,这样也方便日后的扩展和管理,数据库操作改用了sqlachemy实现,其他内容基本不变

 

原文地址:https://www.cnblogs.com/FanMLei/p/10500982.html