0
点赞
收藏
分享

微信扫一扫

EMQX WebHook及案例


1 WebHook

WebHook 是由 ​​emqx_web_hook (opens new window)​​插件提供的 将 EMQX 中的钩子事件通知到某个 Web 服务 的功能。

WebHook 的内部实现是基于 ​​钩子​​,但它更靠近顶层一些。

通过在钩子上的挂载回调函数,获取到 EMQX 中的各种事件,并转发至 emqx_web_hook 中配置的 Web 服务器。

以 客户端成功接入(client.connected) 事件为例,其事件的传递流程如下:

Client      |    EMQX     |  emqx_web_hook |   HTTP       +------------+
=============>| - - - - - - -> - - - - - - - ->===========> | Web Server |
| Broker | | Request +------------+


提示:WebHook 对于事件的处理是单向的,它仅支持将 EMQX 中的事件推送给 Web 服务,并不关心 Web 服务的返回

借助 Webhook 可以完成设备在线、上下线记录,订阅与消息存储、消息送达确认等诸多业务。


2 配置项

Webhook 的配置文件位于 ​​etc/plugins/emqx_web_hook.conf​​​,配置项的详细说明可以查看 ​​配置项​​。

3 触发规则

在 ​​etc/plugins/emqx_web_hook.conf​​ 可配置触发规则,其配置的格式如下:

## 格式示例
web.hook.rule.<Event>.<Number> = <Rule>

## 示例值
web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}

4 Event 触发事件

目前支持以下事件:

名称

说明

执行时机

client.connect

处理连接报文

服务端收到客户端的连接报文时

client.connack

下发连接应答

服务端准备下发连接应答报文时

client.connected

成功接入

客户端认证完成并成功接入系统后

client.disconnected

连接断开

客户端连接层在准备关闭时

client.subscribe

订阅主题

收到订阅报文后,执行 ​​client.check_acl​​ 鉴权前

client.unsubscribe

取消订阅

收到取消订阅报文后

session.subscribed

会话订阅主题

完成订阅操作后

session.unsubscribed

会话取消订阅

完成取消订阅操作后

message.publish

消息发布

服务端在发布(路由)消息前

message.delivered

消息投递

消息准备投递到客户端前

message.acked

消息回执

服务端在收到客户端发回的消息 ACK 后

message.dropped

消息丢弃

发布出的消息被丢弃后

5 Number

同一个事件可以配置多个触发规则,配置相同的事件应当依次递增。

6 Rule

触发规则,其值为一个 JSON 字符串,其中可用的 Key 有:

  • action:字符串,取固定值
  • topic:字符串,表示一个主题过滤器,操作的主题只有与该主题匹配才能触发事件的转发

例如,我们只将与 ​​a/b/c​​​ 和 ​​foo/#​​ 主题匹配的消息转发到 Web 服务器上,其配置应该为:

web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}

这样 Webhook 仅会转发与 ​​a/b/c​​​ 和 ​​foo/#​​​ 主题匹配的消息,例如 ​​foo/bar​​​ 等,而不是转发 ​​a/b/d​​​ 或 ​​fo/bar​​。

7 Webhook 事件参数

事件触发时 Webhook 会按照配置将每个事件组成一个 HTTP 请求发送到 ​​url​​ 所配置的 Web 服务器上。其请求格式为:

URL: <url>      # 来自于配置中的 `url` 字段
Method: POST # 固定为 POST 方法

Body: <JSON> # Body 为 JSON 格式字符串

对于不同的事件,请求 Body 体内容有所不同,下表列举了各个事件中 Body 的参数列表:

7.1 client.connect

Key

类型

说明

action

string

事件名称 固定为:“client_connect”

clientid

string

客户端 ClientId

username

string

客户端 Username,不存在时该值为 “undefined”

ipaddress

string

客户端源 IP 地址

keepalive

integer

客户端申请的心跳保活时间

proto_ver

integer

协议版本号

7.2 client.connack

Key

类型

说明

action

string

事件名称 固定为:“client_connack”

clientid

string

客户端 ClientId

username

string

客户端 Username,不存在时该值为 “undefined”

ipaddress

string

客户端源 IP 地址

keepalive

integer

客户端申请的心跳保活时间

proto_ver

integer

协议版本号

conn_ack

string

“success” 表示成功,其它表示失败的原因

7.3 client.connected

Key

类型

说明

action

string

事件名称 固定为:“client_connected”

clientid

string

客户端 ClientId

username

string

客户端 Username,不存在时该值为 “undefined”

ipaddress

string

客户端源 IP 地址

keepalive

integer

客户端申请的心跳保活时间

proto_ver

integer

协议版本号

connected_at

integer

时间戳(秒)

7.4 client.disconnected

Key

类型

说明

action

string

事件名称 固定为:“client_disconnected”

clientid

string

客户端 ClientId

username

string

客户端 Username,不存在时该值为 “undefined”

reason

string

错误原因

7.5 client.subscribe

Key

类型

说明

action

string

事件名称 固定为:“client_subscribe”

clientid

string

客户端 ClientId

username

string

客户端 Username,不存在时该值为 “undefined”

topic

string

将订阅的主题

opts

json

订阅参数

7.6 opts 包含

Key

类型

说明

qos

enum

QoS 等级,可取 ​​0​​​ ​​1​​​ ​​2​

7.7 client.unsubscribe

Key

类型

说明

action

string

事件名称 固定为:“client_unsubscribe”

clientid

string

客户端 ClientId

username

string

客户端 Username,不存在时该值为 “undefined”

topic

string

取消订阅的主题

session.subscribed:同 ​​client.subscribe​​​,action 为 ​​session_subscribed​

session.unsubscribed:同 ​​client.unsubscribe​​​,action 为 ​​session_unsubscribe​

session.terminated: 同 ​​client.disconnected​​​,action 为 ​​session_terminated​

7.8 message.publish

Key

类型

说明

action

string

事件名称 固定为:“message_publish”

from_client_id

string

发布端 ClientId

from_username

string

发布端 Username,不存在时该值为 “undefined”

topic

string

取消订阅的主题

qos

enum

QoS 等级,可取 ​​0​​​ ​​1​​​ ​​2​

retain

bool

是否为 Retain 消息

payload

string

消息 Payload

ts

integer

消息的时间戳(毫秒)

7.8 message.delivered

Key

类型

说明

action

string

事件名称 固定为:“message_delivered”

clientid

string

接收端 ClientId

username

string

接收端 Username,不存在时该值为 “undefined”

from_client_id

string

发布端 ClientId

from_username

string

发布端 Username,不存在时该值为 “undefined”

topic

string

取消订阅的主题

qos

enum

QoS 等级,可取 ​​0​​​ ​​1​​​ ​​2​

retain

bool

是否为 Retain 消息

payload

string

消息 Payload

ts

integer

消息时间戳(毫秒)

7.9 message.acked

Key

类型

说明

action

string

事件名称 固定为:“message_acked”

clientid

string

接收端 ClientId

from_client_id

string

发布端 ClientId

from_username

string

发布端 Username,不存在时该值为 “undefined”

topic

string

取消订阅的主题

qos

enum

QoS 等级,可取 ​​0​​​ ​​1​​​ ​​2​

retain

bool

是否为 Retain 消息

payload

string

消息 Payload

ts

integer

消息时间戳(毫秒)

8 案例

8.1 设备上线和下线时,能够在第三方系统中查询

  • ​​EMQ-WebHook简介和使用_小哇666的博客-CSDN博客_emqx webhook​​
  1. 修改 etc/plugins/emqx_web_hook.conf 文件,设置事件转发的url和地址和触发规则
# 事件需要转发的目的服务器地址
web.hook.api.url = http://127.0.0.1:8991/mqtt/webhook

# 触发规则
web.hook.rule.client.connected.1 = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"}
# 以下可以不开启,测试用用
web.hook.rule.client.subscribe.1 = {"action": "on_client_subscribe"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish","topic":"img/#"}
  1. 在EMQ的控制台开启​​emqx_web_hook​​ 插件
  2. 编写springboot应用
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;

@RequestMapping("/mqtt")
@RestController
public class Controller_5 {

private final static Logger logger = LoggerFactory.getLogger(Controller_5.class);

private Map<String,Boolean> ztList=new HashMap<>();

@PostMapping("/webhook")
public void webhook(@RequestBody() Map<String,Object> params)
{
logger.info("参数列表 {}",params);
/**
* 注意 action,clientid,事件名 的名称不能修改,否则匹配不上
*/
String action = (String)params.get("action");
String clientid = (String)params.get("clientid");
if(action.equals("client_connected"))
{
logger.info("client:{} 上线",clientid);
ztList.put(clientid,true);
}
if(action.equals("client_disconnected"))
{
logger.info("client:{} 下线",clientid);
ztList.put(clientid,false);
}
}

@RequestMapping("/ztList")
public Map<String,Boolean> getZtList()
{
return ztList;
}

}

8.2 Webhook实现客户端断连监控

  • ​​EMQ高级功能使用_yemuxiaweiliang的博客-CSDN博客​​

8.2.1 断连监控需求

系统需要知道所有客户端当前的连接状态,方便在后台管理系统中进行直观展示

8.2.2 代码实现

通过EMQX 的webhook将客户端的连接断开等事件通知到我们自建的服务上,通过事件类型获取客户端的连接状态,然后将客户端的连接状态进行存储,并且提供HTTP API供后台系统查询所有客户端的状态。

package com.itheima.controller.mqtt;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;
import java.util.Map;

/**
*
*/
@RestController
@RequestMapping("/mqtt")
public class WebHookController {
private static final Logger log = LoggerFactory.getLogger(WebHookController.class);

private Map<String,Boolean> clientStatus = new HashMap<>();

@PostMapping("/webhook")
public void hook(@RequestBody Map<String,Object> params){
log.info("emqx 触发 webhook,请求体数据={}",params);

String action = (String) params.get("action");
String clientId = (String) params.get("clientid");
if(action.equals("client_connected")){
log.info("客户端{}接入本系统",clientId);
clientStatus.put(clientId,true);
}

if(action.equals("client_disconnected")){
log.info("客户端{}下线",clientId);
clientStatus.put(clientId,false);
}

}

@GetMapping("/allStatus")
public Map getStatus(){
return this.clientStatus;
}
}

hook方法用来接收EMQ X传入过来的请求,将客户端Id的连接状态记录到map中,getAllStatus方法用来返回所有客户端状态。

然后通过客户端连接/断开EMQ X之后,通过访问 all 接口就能得到这些客户端得状态了。

当然了,在实际的项目中肯定就不会这么简单,我们会将这些客户端的状态存入类似redis这样的分布式缓存中,方便整个系统进行存取随时获取客户端状态。



举报

相关推荐

学习EMQX

emqx(docker)

emqx 安装

redmine添加webhook

emqx 集群搭建

gitlab webhook报500

Docker部署EMQX

kubebuilder实战之七:webhook

0 条评论