EMQ X:WebHook

WebHook

WebHook 是由 emqx_web_hook插件提供的 将 EMQ X 中的钩子事件通知到某个 Web 服务 的功能。

WebHook 的内部实现是基于钩子,但它更靠近顶层一些。它通过在钩子上的挂载回调函数,获取到 EMQ X 中的各种事件,并转发至 emqx_web_hook 中配置的 Web 服务器。

以 客户端成功接入(client.connected) 事件为例,其事件的传递流程如下:

    Client      |    EMQ X     |  emqx_web_hook |   HTTP       +------------+
  =============>| - - - - - - -> - - - - - - - ->===========>  | Web Server |
                |    Broker    |                |  Request     +------------+

WebHook 对于事件的处理是单向的,它仅支持将 EMQ X 中的事件推送给 Web 服务,并不关心 Web 服务的返回。 借助 Webhook 可以完成设备在线、上下线记录,订阅与消息存储、消息送达确认等诸多业务。

简单来讲,该机制目的在于增强软件系统的扩展性、方便与其他三方系统的集成、或者改变其系统原有 的默认行为。如下图:

image-20210728134815313

当系统中不存在 钩子 (Hooks) 机制时,整个事件处理流程 从 事件 (Event) 的输入,到 处理 (Handler), 再到完成后的返回结果 (Result) 对于系统外部而讲,都是不可见、且无法修改的。

而在这个过程中加入一个可挂载函数的点 (HookPoint),允许外部插件挂载多个回调函数,形成一个调用链。达到对内部事件处理过程的扩展和修改。系统中常用到的认证插件则是按照该逻辑进行实现的。

因此,在 EMQ X 中,钩子 (Hooks) 这种机制极大地方便了系统的扩展。我们不需要修改 emqx 核心代 码,仅需要在特定的位置埋下挂载点 (HookPoint) ,便能允许外部插件扩展 EMQ X 的各种行为。

对于实现者来说仅需要关注:

  1. 挂载点 (HookPoint) 的位置:包括其作用、执行的时机、和如何挂载和取消挂载。
  2. 回调函数 的实现:包括回调函数的入参个数、作用、数据结构等,及返回值代表的含义。
  3. 了解回调函数在 链 上执行的机制:包括回调函数执行的顺序,及如何提前终止链的执行。

配置项说明

配置文件:/etc/emqx/plugins/emqx_web_hook.conf

web.hook.url:Webhook 请求转发的目的 Web 服务器地址。

web.hook.encoding_of_payload_field:PUBLISH 报文中 Payload 字段的编码格式。

触发规则

配置的格式如下:

## 格式示例
web.hook.rule.<Event>.<Number> = <Rule>

## 示例值
web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}

event:目前支持以下事件

名称 说明 执行时机
client.connect 处理连接报文 服务端收到客户端的连接报文时
client.connack 下发连接应答 服务端准备下发连接应答报文时
client.connected 成功接入 客户端认证完成并成功接入系统后
client.disconnected 连接断开 客户端连接层在准备关闭时
client.subscribe 订阅主题 收到订阅报文后,执行 client.check_acl 鉴权前
client.unsubscribe 取消订阅 收到取消订阅报文后
session.subscribed 会话订阅主题 完成订阅操作后
session.unsubscribed 会话取消订阅 完成取消订阅操作后
message.publish 消息发布 服务端在发布(路由)消息前
message.delivered 消息投递 消息准备投递到客户端前
message.acked 消息回执 服务端在收到客户端发回的消息 ACK 后
message.dropped 消息丢弃 发布出的消息被丢弃后

number:同一个事件可以配置多个触发规则,配置相同的事件应当依次递增。

rule:触发规则

其值为一个 JSON 字符串,其中可用的 Key 有:

  • action:字符串,取固定值
  • topic:字符串,表示一个主题过滤器,操作的主题只有与该主题匹配才能触发事件的转发

例如,我们只将与 a/b/cfoo/# 主题匹配的消息转发到 Web 服务器上,其配置应该为:

web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}

这样 Webhook 仅会转发与 a/b/cfoo/# 主题匹配的消息,例如 foo/bar 等,而不是转发 a/b/dfo/bar

WebHook事件参数

事件触发时 Webhook 会按照配置将每个事件组成一个 HTTP 请求发送到 url 所配置的 Web 服务器上。其请求格式为:

URL: <url>      # 来自于配置中的 `url` 字段
Method: POST        # 固定为 POST 方法
Body: <JSON>        # Body 为 JSON 格式字符串

对于不同的事件,请求 Body 体内容有所不同,下表列举了各个事件中 Body 的参数列表:

client.connect

Key 类型 说明
action string 事件名称 固定为:"client_connect"
clientid string 客户端 ClientId
username string 客户端 Username,不存在时该值为 "undefined"
ipaddress string 客户端源 IP 地址
keepalive integer 客户端申请的心跳保活时间
proto_ver integer 协议版本号

client.connack

Key 类型 说明
action string 事件名称 固定为:"client_connack"
clientid string 客户端 ClientId
username string 客户端 Username,不存在时该值为 "undefined"
ipaddress string 客户端源 IP 地址
keepalive integer 客户端申请的心跳保活时间
proto_ver integer 协议版本号
conn_ack string "success" 表示成功,其它表示失败的原因

client.connected

Key 类型 说明
action string 事件名称 固定为:"client_connected"
clientid string 客户端 ClientId
username string 客户端 Username,不存在时该值为 "undefined"
ipaddress string 客户端源 IP 地址
keepalive integer 客户端申请的心跳保活时间
proto_ver integer 协议版本号
connected_at integer 时间戳(秒)

client.disconnected

Key 类型 说明
action string 事件名称 固定为:"client_disconnected"
clientid string 客户端 ClientId
username string 客户端 Username,不存在时该值为 "undefined"
reason string 错误原因

client.subscribe

Key 类型 说明
action string 事件名称 固定为:"client_subscribe"
clientid string 客户端 ClientId
username string 客户端 Username,不存在时该值为 "undefined"
topic string 将订阅的主题
opts json 订阅参数

opts 包含

Key 类型 说明
qos enum QoS 等级,可取 0 1 2

client.unsubscribe

Key 类型 说明
action string 事件名称 固定为:"client_unsubscribe"
clientid string 客户端 ClientId
username string 客户端 Username,不存在时该值为 "undefined"
topic string 取消订阅的主题

session.subscribed:同 client.subscribe,action 为 session_subscribed

session.unsubscribed:同 client.unsubscribe,action 为 session_unsubscribe

session.terminated: 同 client.disconnected,action 为 session_terminated

message.publish

Key 类型 说明
action string 事件名称 固定为:"message_publish"
from_client_id string 发布端 ClientId
from_username string 发布端 Username,不存在时该值为 "undefined"
topic string 取消订阅的主题
qos enum QoS 等级,可取 0 1 2
retain bool 是否为 Retain 消息
payload string 消息 Payload
ts integer 消息的时间戳(毫秒)

message.delivered

Key 类型 说明
action string 事件名称 固定为:"message_delivered"
clientid string 接收端 ClientId
username string 接收端 Username,不存在时该值为 "undefined"
from_client_id string 发布端 ClientId
from_username string 发布端 Username,不存在时该值为 "undefined"
topic string 取消订阅的主题
qos enum QoS 等级,可取 0 1 2
retain bool 是否为 Retain 消息
payload string 消息 Payload
ts integer 消息时间戳(毫秒)

message.acked

Key 类型 说明
action string 事件名称 固定为:"message_acked"
clientid string 接收端 ClientId
from_client_id string 发布端 ClientId
from_username string 发布端 Username,不存在时该值为 "undefined"
topic string 取消订阅的主题
qos enum QoS 等级,可取 0 1 2
retain bool 是否为 Retain 消息
payload string 消息 Payload
ts integer 消息时间戳(毫秒)

WebHook案例编写

修改配置文件:

web.hook.url = http://127.0.0.1:8991/mqtt/webhook

web.hook.rule.client.connect.1       = {"action": "on_client_connect"}
web.hook.rule.client.connack.1       = {"action": "on_client_connack"}
web.hook.rule.client.connected.1     = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1  = {"action": "on_client_disconnected"}
web.hook.rule.client.subscribe.1     = {"action": "on_client_subscribe"}
web.hook.rule.client.unsubscribe.1   = {"action": "on_client_unsubscribe"}
web.hook.rule.session.subscribed.1   = {"action": "on_session_subscribed"}
web.hook.rule.session.unsubscribed.1 = {"action": "on_session_unsubscribed"}
web.hook.rule.session.terminated.1   = {"action": "on_session_terminated"}
web.hook.rule.message.publish.1      = {"action": "on_message_publish"}
web.hook.rule.message.delivered.1    = {"action": "on_message_delivered"}
web.hook.rule.message.acked.1        = {"action": "on_message_acked"}

启动webhook插件:

image-20210728143440735

重启emqx:

emqx restart

暴露方法:

@RestController
@RequestMapping("/mqtt")
public class WebHookController {

    private static final Logger logger = LoggerFactory.getLogger(WebHookController.class);

    private final Map<String,Boolean> clientStatusMap = new HashMap<>();

    @PostMapping("/webhook")
    public void webhook(@RequestBody Map<String, Object> params){
        logger.info("emqx 触发 webhook,请求体数据={}",params);
        String action = (String) params.get("action");
        String clientId = (String) params.get("clientid");
        if(action.equals("client_connected")){
            //客户端成功接入
            clientStatusMap.put(clientId,true);
        }
        if(action.equals("client_disconnected")){
            //客户端断开连接
            clientStatusMap.put(clientId,false);
        }
    }

    @GetMapping("/getall")
    public Map<String,Boolean> getAllStatus(){
        return clientStatusMap;
    }
}

打包上传到服务器中,并运行。

测试WebHook

使用客户端工具,连接,订阅主题和发送消息,观察控制台输出。

image-20210728150642970

image-20210728150627988

原文地址:https://www.cnblogs.com/wwjj4811/p/15070582.html