Python DataAgent

一. DataAgentManager

1. InitSocket——nanomsg进程间通信(deviceHeartBeatSocket、deviceHeartBeatEid、sendHeartBeatSocket、sendHeartBeatEid、deviceCmdSocket、deviceCmdEid)

2. GetSysConfig()

3. GatewaySn——调试模式:直接取配置文件中的Sn;发布模式:匹配取到的网关序列号是否与配置文件中的相同,不相同则程序报错重启

4. AgentBusInit()——①获取基站定位。②初始化IotHub,获取BolbConfig,通过BlobConfig从Blob中获取gatewayConfig和deviceConfig,注册ReceiveIoTHubMessageCallback(接收云端命令),注册DeviceTwinCallback(获取BlobConfig,若Blob中GatewayConfig或DeviceConfig改变,则调用该callback重新获取Config)。③初始化数据缓存,创建对应目录。④注册事件,包括deviceHeartBeat、sendHeartBeat、网关应用与IotHub状态同步、计算CPU内存消耗、命令返回给IotHub。

5. AgentBusStart()——启动所有注册事件。

6. 事件一:HeartBeat。①UpdateConfig更新配置到其他进程。②其他进程向DataAgent同步状态。

7. 事件二:SyncStatus。同步网关应用程序状态和基站定位信息到IotHub。

8. 事件三:CpuMemoryUsage。记录进程消耗CPU和内存使用情况。

9. 事件四:DeviceCmdResponse。将DeviceManager执行的设备命令结果返回给IotHub。

二. DeviceManager

1. InitSocket——nanomsg进程间通信(deviceHeartBeatSocket, deviceHeartBeatEid, deviceCommandSocket, deviceCommandEid, dataSendSocket, dataSendEid, dataSendOnceSocket, dataSendOnceEid)

2. DeviceController——①发送更新配置请求给DataAgent并接收新的配置(包括网关、设备和config.ini中的配置等)。②更新配置,初始化数据缓存目录。③初始化modbus客户端,并根据点表配置计算总的寄存器数量。④注册采集一次数据事件。⑤注册采集数据事件。⑥注册执行deviceCommand事件。⑦注册计算CPU和内存使用情况事件。

3. 采集一次事件——实现开机采集一次所有点表数据。(读取、format加上rateLevel等、format成JSON、发送到SendManager)

4. 采集事件——实现按采集频率采集对应点表。(读取、format加上rateLevel等、format成JSON、发送到SendManager、失败则保存到缓存目录)

三. SendManager

1. InitSocket——nanomsg进程间通信(sendHeartBeatSocket, sendHeartBeatEid, dataReceiveSocket, dataReceiveEid, dataReceiveOnceSocket, dataReceiveOnceEid)

2. Sender——①发送更新配置请求给DataAgent并接收新的配置(包括网关、设备和config.ini中的配置)。②更新配置,初始化缓存目录。③初始化接收数据的dataPool,根据采集频率将tag点分组,根据tag点的数据类型和是否监控等分配不同的读取方式(重点)④初始化TagsReader。⑤初始化EventHub。⑥注册接收数据事件。⑦注册接收一次数据事件。⑧注册发送数据事件。⑨注册计算CPU和内存使用情况事件。

3. 接收一次事件——实现开机接收一次DeviceManager发来的一次数据并发送到EventHub。(接收、format成JSON、得到tag点的读取方式、根据读取方式得到tag点的值、构造发送数据的头部、发送到EventHub、发送失败保存到Cache、)

4. 接收数据事件——实现接收来自DeviceManager发来的数据并保存到dataPool或Cache中。(接收、format成JSON、放到dataPool、放入失败则放到Cache缓存)

5. 发送数据事件——实现将DataPool和Cache中的数据发送到EventHub。(从dataPool中取出数据、根据tag点的读取方式组合tag点的值、发送到EventHub、发送失败保存到Cache、发送Cache中的数据、发送失败继续保存到Cache中等待下次发送)

原文地址:https://www.cnblogs.com/embeddedking/p/9704187.html