mqtt协议系统设计参考

作者:极寒
链接:https://zhuanlan.zhihu.com/p/28525517
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

回顾自己的工作经历最遗憾的是没有用代码实现设计好的系统就匆匆离职了!写这篇文章主要目的是分享一下实现通信服务的思路,方便大家设计自己的通信服务,也希望通过分享实践知道设计中的不足。工作的公司是做电动汽车充电的可以说是一个很伟大的物联网项目,一个EVCS系统(Electric vehicle charging system)包括APP、云平台、充电桩、电动汽车等部分。在云平台众多的服务中通信服务是一个负责接入嵌入式网关和与后端业务服务相协调的中间件。今天主要根据自己的经历分享一下通信服务的实现细节,其中包括具体实践的也有针对系统缺陷做的一些思考。本文内容不局限于电动汽车充电系统只是以电动汽车充电系统为例,也可以作为基于mqtt协议系统的设计参考。

术语说明

嵌入式网关:它一般由嵌入式微处理器、外围硬件设备、嵌入 式操作系统以及用户的应用程序等四个部分组成。在本系统中负责继电器的开关以及与服务器的网络通信。

充电设备(充电桩):给电动汽车充电的设备通过充电枪与车连接,里面包含了一个嵌入式网关。

comm:一个需要我们实现的broker扩展程序,communication 的简称。

通信服务:负责和嵌入式网关通信的软件服务,由broker和comm组成。

需求分析

开发物联网通信服务面临的第一个需求:通信服务和嵌入式网关之间的通信协议

通信对物联网来说十分常用且关键,无论是近距离无线传输技术还是移动通信技术,都影响着物联网的发展。而在通信中,通信协议尤其重要,是指双方实体完成通信或服务所必须遵循的规则和约定,物联网中常用的通信协议:MQTT、 DDS、 AMQP、XMPP、 JMS、 REST、 CoAP这几种协议都已被广泛应用,并且每种协议都有至少10种以上的代码实现,都宣称支持实时的发布/订阅的物联网协议,但是在具体物联网系统架构设计时,需考虑实际场景的通信需求,选择合适的协议。

第二个需求:面对成千上万的socket链接高可用是摆在面前的问题

充电行业前景广阔会有多少的充电桩才能满足市场需要是一个难以预测的问题,在设计之初我们就应该保证通信服务是可以水平扩展的,从政府角度考虑充电工程又是一个民生工程不能天天出问题啊!一个高可用的通信服务是电动汽车充电等物联网系统的基础。

第三个需求:数据加密传输

数据安全放到最后讨论,这部分与实现系统的方式有关。

第四个需求:实时控制,实时监控

电动汽车充电系统是一个实时交互的系统,就像用户浏览网页长时间的等待是无法忍受的。除了业务处理时间外,传输时间应该尽量的短。通过实际的测试一条报文从嵌入式网关发送到通信服务器的时间大约在200ms左右。(使用的是3G路由器)

第五个需求:通信服务升级不要长时间大规模的影响用户

通信服务升级或者宕机都会影响系统的使用,如何快速的发现并将服务恢复才是系统设计应该关注的重点。假设上述的需求我们都已经解决了还有什么没有想到的呢?这时候测试人员站出来了发话了,这么多的设备连着通信服务假设通信服务升级有问题影响到的设备可是不少啊!(因为当时发布任务是测试人员的,每次发布都在半夜并且回退的次数也不少,用一个词来形容发布就是如履薄冰)。正是由于这个痛点我们是不是应该实现灰度发布呢?就是服务升级后让少量的设备连到升级的服务器上,等确认没有问题后再全面升级。

第六个需求:不要让雪崩现象发生

雪崩现象是由于一个服务挂掉或者宕机导致调用方异常最终导致整个系统进入不可用状态。在通信服务的详细设计中我会重点说下通信服务是如何预防雪崩现象的发生。

针对上面的需求我做了下面的通信服务设计,broker和comm部署在同一台服务器上并且是一对一的关系。

 

在上图中需要通信程序员开发的工作主要包括监听器和comm程序两部分,业务服务是充电系统的核心里面实现了如何计量、计费等逻辑,业务服务很是复杂但从通信服务的角度看主要用来处理嵌入式网关上传的数据和下发控制指令。当然不同的业务系统处理的业务不同,但是可以保证的是通信服务为他们提供了基础数据。

废话不多说!下面分享一下为什么这样设计和通信服务是如何实现的。经过比较我最终选择了mqtt协议作为物联网系统的通信协议,系统的架构设计也是协议确定后才设计的。选择了mqtt就面临选择哪个broker的问题,实现mqtt协议的broker如下图所示并且我在群里做了一次简单的使用量统计。

 

mosquitto只提供了一种桥接的方式而且不建议打开持久化,因为IO会降低broker的性能。如果关闭日志这就导致出了问题没地可查,虽然使用量最多但总感觉不够强大。emqttd确实支持了分布式部署的需求而且是国人实现的产品中文开发文档也是非常丰富,却无法满足灰度发布需要的灵活度,由于目前开源的broker都无法全部满足实际需求,于是就有了上面的系统架构。

通信服务实现中用到的通信方式。

one-way:发送者将数据发送出去后不需要等待接收方返回ack;
request-response:发送者和接受者以同步的方式调用;
two-way:发送者将数据发送出去后接收方会在规定的时间内返回一个ack但是整个过程都是异步的。

详细设计:

嵌入式网关在连接通信服务时(包括重连)首先以同步的方式向监听器请求获取URL地址然后再去和具体的broker通信,连接到broker后嵌入式网关不再和监听器通信。<1.获取IP地址>的过程我们可以采用http rest方式,这里可以借鉴httpDNS 的思路 <2.数据传输>的过程采用的是mqtt协议。监听器通过设备ID、协议version和设备重要程度动态分配通信服务器IP给设备端,设备通过分配的地址和broker建立长链接。下面列举一下设备向监听器发送的请求、响应格式:

 

备用服务器地址用于在嵌入式网关端缓存一份通信服务器的地址列表如果监听器服务挂掉(connect监听器超时),设备需要在备用服务器列表中选择一个服务器进行通信。

监听器需要哪些功能呢!

功能1:

对于主流版本的设备可以采用轮询或者加权轮询的方式,主流的设备数量比较多不易采用设备ID路由表的方式。

功能2:

对于灰度发布的设备可以在监听器中配置一张内存路由表,如果设备ID在路由表中就返回对应的通信服务器地址。

功能3:

对于多版本和多协议的需求我们也可以配置一张protocol和version的路由表,将特定版本的设备路由到指定的通信服务器上。为什么要加一张这样的路由表,主要原因是设备定制化需求也是比较多的,这些非标准化的需求如果兼容到标准的产品中可能会导致耦合度越来越高最后很难维护。采用这种路由的方式可以将标准产品和非标准项目区分开。

功能4:

监听器要有向broker直接发布消息的功能,这个设计主要是防止comm程序挂掉后连接着broker的嵌入式网关还在和broker通信导致数据到达不了comm丢失。监听器在检测到comm挂掉后要向broker发送一条嵌入式网关网络重连的消息(不是重启)方便监听器重新给这些设备分配可用的通信服务。这就要求所有的嵌入式网关有一个共同的订阅topic方便broker以广播的方式通知嵌入式网关网络重连,当然也可以是重启、时间同步、通知嵌入式程序升级、数据召唤等。

comm需要哪些功能呢!

功能1:

comm是broker的一个扩展他们共同组成了通信服务,comm需要有订阅和发布的功能用于接收和发送数据。一个嵌入式网关最少需要2个主题:“data/设备ID”用于发送数据给broker,"order/设备ID"用于接收broker发送的指令。

功能2:

上图中我们可以看到监听器和comm之间是有心跳的,如果comm挂掉监听器需要设置该地址的服务为不可用状态直到comm恢复才可以分配broker的地址给嵌入式网关使用。comm在启动的时候需要向监听器注册自己的地址信息,注册成功后监听器以主动请求comm的方式作为心跳,这样可以减少comm的实现复杂度。 心跳的内容可以是连接到本服务的socket数量或者是服务的压力指数,监听器获取这些信息可以实现更好的路由。

功能3:

定时器功能,嵌入式网关的数据需要通信服务定时召唤当然也有网关突发上传的事件消息。comm定时器的设计可以参考如下:

功能4:使用缓存和消息队列。

comm使用缓存的场景是缓存嵌入式网关最后一次上传的数据方便手机端查询实时数据。当然缓存里也可以存放网关当前的网络状态(在线状态,离网状态),MAC,GPS地理位置等。

要显示嵌入式网关的网络状态,需要嵌入式网关连接到broker时发布一条上线消息表示可以接受数据处于上线状态,当网关主动close 链接时也要发送一条离线消息,如果异常断开mqtt协议提供了遗愿让broker代替嵌入式网关发送离线状态消息。这里需要说明一下,broker发送遗愿的时间是1.5个心跳的周期所以设备每次重连的时间间隔最好大于2个心跳周期,这样可以保证设备上线后broker不会再发送遗愿消息,这样网关的网络状态才能是"上线->离线->上线 ->离线",如果重连时间少于1.5个心跳周期就可能出现 "上线->上线->离线" 导致实际网络状态与平台状态不一致。

消息队列是为了与后端服务解耦。除了缓存嵌入式网关上传的数据,还可以用于后端业务服务下发指令。此时你有没有产生疑问,后端业务服务下发指令到kafka而对应的消费者是多个comm,怎样知道设备连到在哪个broker上需要哪个comm来接收指令呢?前面已经说了设备上线的时候需要发送一条上线消息给broker此时comm程序可以把嵌入式网关ID和一个固定的Topic注册到redis缓存中(每个comm程序都有一个固定唯一用于接收指令的kafka Topic),后端业务服务在发送指令时需要先向redis缓存查询设备ID对应的kafka Topic然后发送到kafka,这样订阅该topic的comm程序就可以接收到消息,并通过broker发送给嵌入式网关。

功能5:comm是如何做到防止雪崩现象的!

comm程序的实现依赖broker、redis缓存和kafka消息队列。一个健壮的comm程序应该保证redis服务不可用的时候只会影响到实时数据的更新不会影响到通过kakfa上传的数据,同理kafka服务不可用也不会影响缓存的更新。防止雪崩最简单的思路就是线程池隔离,每个依赖的服务使用一个发送或者接受线程池。被调用方不可用时相应的comm的线程池被阻塞但不会影响到其他线程池正常的工作。具体实现我们可能用Hystrix 。Hystrix 这个神器在这里就不在细说如果希望通信服务在不可用状态恢复正常少不了他。

监听器的压力分析

根据个人经验socket链接在8000+ 时每分钟断开链接重连的次数大约在50次左右,如果其中一个broker 宕机并发请求数量可能增加到几千。因为嵌入式网关有应对监听器宕机的策略和监听器在初始化时就将策略加载到内存,程序运行中很少进行数据库IO所以监听器使用单机基本也可以满足需求,如果不放心可以使用主从方案。

mqtt协议如何才能保证设备与云平台状态一致

mqtt协议中提到的qos(quality of service) 只是数据到达broker的服务质量不能保证消费者(comm程序)不丢失数据。为了保证数据安全到达后端业务服务需要在设计业务流程时要添加业务层的ACK机制。这个确认由后端业务服务确认如果在规定的时间内没有收到ACK消息嵌入式网关需要重发。

上图中的消息都是以异步的方式发送而且一条消息到达对方中间需要经过kafka,comm,broker如果其中一方出问题都可能导致数据丢失。一条指令下发给嵌入式网关需要2次确认才能保证双方状态的一致性,

ACK1表示网关已经收到指令并且已经执行完成(成功或者失败),后端业务服务收到ACK1后根据结果做相应的业务处理。

ACK2是后端业务服务返回给嵌入式网关的确认,表示我已经收到你的返回结果咱俩的状态一致了,ACK1和ACK2里面应该包含业务所需要的数据。只有确认消息还不能满足功能实现的要求,后端业务服务在下发指令时应该包含一个指令生命周期T,后端业务服务和嵌入式网关的本次处理必须要在这个生命周期内完成,如果超过T时间都以超时处理。

1.如果指令丢失对嵌入式网关不会有影响,后端业务系统做超时处理。

2.如果ACK1丢失后端业务服务不会发送ACK2嵌入式设备如果在一个生命周期T后如果收不到ACK2嵌入式网关要进行业务回滚(结束充电)。

3.同理,如果ACK2丢失嵌入式设备也要做业务回滚。此时会导致平台与设备的状态不一致(后端业务服务已经收到ACK1认为执行完成)所以在回滚后要上传一次业务结束明细结果。因为这个返回结果是保证平台和设备状态一致的重要报文要有业务ACK机制。如果嵌入式网关没有业务回滚不需要立即发送业务明细等业务结束后发送。

上面的报文流程只是一个保证状态一致的例子,大家可以根据自己的业务添加相应的报文消息。

数据加密和身份认证等安全措施mqtt协议已经为我们考虑,详细内容可以参考mqtt协议使用手册。

设计回顾

1.监听器和comm都是对broker的一个扩展。监听器可以让socket的路由规则更加灵活而且解决了一些broker不支持分布式部署的问题;comm程序用于管理和使用broker的消息同时把设备ID和相应的topic注册到缓存方便后端业务服务下发指令。

2.每个comm接收指令的kafka topic是唯一的,这样才能保证只有一个comm来接收后端业务发送过来的指令,如果后端业务在缓存中查询不到设备ID对应的topic表示设备在离线状态不能发送指令给嵌入式网关。

3.kafka 消息队列非常灵活,后端业务服务如果使用相同的客户端ID接收消息则只有一个消费者可以请求到其他消费者不会得到重复的消息;如果后端业务采用不同的客户端ID,kafka则是广播的方式每个消费者都会收到相同的消息。

最后感谢在通信服务上给予我一路指导的师傅。

 

原文地址:https://www.cnblogs.com/coolYuan/p/12161135.html