0
点赞
收藏
分享

微信扫一扫

EMQX+HStreamDB 实现物联网流数据高效持久化

mafa1993 2023-03-17 阅读 31

EMQX+HStreamDB 实现物联网流数据高效持久化_MQTT

在 IoT 场景中,通常面临设备数量庞大、数据产生速率高、累积数据量巨大等挑战。因此,如何接入、存储和处理这些海量设备数据就成为了一个关键的问题。

EMQX 作为一款强大的​​物联网 MQTT 消息服务器​​,单个集群可处理上亿设备连接,同时提供了丰富的数据集成功能。​​HStreamDB​​ 作为一款分布式流数据库,不仅可以高效存储来自 EMQX 的海量设备数据,而且提供实时处理分析能力。EMQX 与 HStreamDB 都具备高可扩展性和可靠性,两者结合不仅能够满足大规模 IoT 应用的性能和稳定性需求,同时能够提升应用的实时性。

EMQX+HStreamDB 实现物联网流数据高效持久化_IoT_02

近期 ​​EMQX Enterprise 4.4.15​​ 发布,更新了对 HStreamDB 最新版本的支持,本文将具体介绍如何通过 EMQX 规则引擎将数据持久化到 HStreamDB,实现 MQTT 数据流的存储与实时处理。

:本文介绍的集成步骤基于 EMQX 4.4.15 和 HStreamDB 0.14.0 以上版本。

连接到 HStreamDB 集群

在下面的教程中,我们假设有一个正在运行的 EMQX Enterprise 集群和正在运行的 HStreamDB 集群。如需部署 EMQX Enterprise 集群,请参考 ​​EMQX Enterprise docs​​。如需部署 HStreamDB 集群,请参考 ​​HStreamDB docs​​,其中包含关于如何用 Docker 快速部署的说明。

我们可以通过 Docker 来部署 HStreamDB 客户端并连接到 HStreamDB 集群:

# 获取帮助信息
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream --help

我们在此使用 ​​hstream stream​​ 命令创建一个 stream,供接下来的示例使用:

# 使用 hstream stream 命令创建 streams
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream stream create basic_condition_info_0 -r 3 -b $(( 7 * 24 * 60 * 60 ))

接下来,连接到 HStreamDB 集群,启动交互式 HStream SQL shell:

docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql --service-url "<<YOUR-SERVICE-URL>>"
# 如果要使用安全连接,还需要填写 --tls-ca, --tls-key, --tls-cert 参数

如果连接成功,将会出现

__  _________________  _________    __  ___
/ / / / ___/_ __/ __ \/ ____/ | / |/ /
/ /_/ /\__ \ / / / /_/ / __/ / /| | / /|_/ /
/ __ /___/ // / / _, _/ /___/ ___ |/ / / /
/_/ /_//____//_/ /_/ |_/_____/_/ |_/_/ /_/

Command
:h To show these help info
:q To exit command line interface
:help [sql_operation] To show full usage of sql statement

SQL STATEMENTS:
To create a simplest stream:
CREATE STREAM stream_name;
To create a query select all fields from a stream:
SELECT * FROM stream_name EMIT CHANGES;
To insert values to a stream:
INSERT INTO stream_name (field1, field2) VALUES (1, 2);

可以使用 ​​show streams;​​ 来查看已经创建的 streams 的信息:

> show streams;
+-------------------------------------------+---------+----------------+-------------+
| Stream Name | Replica | Retention Time | Shard Count |
+-------------------------------------------+---------+----------------+-------------+
| basic_condition_info_0 | 3 | 604800 seconds | 1 |
+-------------------------------------------+---------+----------------+-------------+

创建 HStreamDB 资源

在利用 EMQX 规则引擎将数据持久化到 HStreamDB 之前,需要创建一个 HStreamDB 资源。

为此,请访问 EMQX Dashboard,单击 ​​规则引擎​​ -> ​​资源​​ → ​​创建​​ ,选择 ​​HStreamDB 资源​​,输入 HStreamDB 地址并填写必要的选项。可用选项如下表:

EMQX+HStreamDB 实现物联网流数据高效持久化_物联网_03

在选择开启 SSL 时,会出现额外的 SSL 配置界面,可以粘贴所需配置内容或上传文件。

EMQX+HStreamDB 实现物联网流数据高效持久化_规则引擎_04

EMQX+HStreamDB 实现物联网流数据高效持久化_MQTT_05

创建数据持久化到 HStreamDB 的规则

点击 ​​规则引擎​​ -> ​​规则​​ -> ​​创建​​。

EMQX+HStreamDB 实现物联网流数据高效持久化_SQL_06

编辑 SQL 规则并添加操作,您可以在字符串模板中使用 SQL 变量。

请注意,本文档中介绍的 SQL 规则仅供演示,实际的 SQL 应根据业务设计进行编写。

单击 ​​添加操作​​,选择「数据持久化」以将数据保存到 HStreamDB 中。选择上一步创建的资源并输入参数。可用参数如下表:

EMQX+HStreamDB 实现物联网流数据高效持久化_SQL_07

EMQX+HStreamDB 实现物联网流数据高效持久化_IoT_08

点击 ​​确定​​ 来确认添加行为。

EMQX+HStreamDB 实现物联网流数据高效持久化_物联网_09

在 HStream SQL Shell 中获取实时的数据更新

从 EMQX 规则引擎持久化到 HStreamDB 的数据可以使用 HStream SQL Shell 实时读出新写入 stream 的内容。现在,数据已经被写入 HStreamDB,可以使用任何消费方式来消费消息。文档使用了一个简单的消费方法:使用 HStream SQL shell 进行查询。此外,读者可以自由选择使用自己​​喜欢的编程语言 SDK​​ 编写消费端。

# docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql
> select * from basic_condition_info_0 emit changes;

当前的 ​​select​​ 查询没有结果可供打印出,这是因为还没有数据通过 EMQX 的规则引擎向 HStreamDB 写入。一旦有数据写入,便可以在 HStream SQL shell 观察到数据的即时更新。目前在 HStreamDB 使用 SQL 对 streams 做查询,只会打印出创建查询后的结果。如果在 EMQX 停止向 HStreamDB 写入后创建查询,可能观察不到产生的结果。

向 EMQX 写入消息测试规则引擎

可以使用跨平台的桌面客户端 ​​MQTT X​​ 来连接到 EMQX 并发送消息:

EMQX+HStreamDB 实现物联网流数据高效持久化_物联网_10

从 EMQX Dashboard 获取规则引擎的运行数据指标

访问对应的规则引擎界面:

EMQX+HStreamDB 实现物联网流数据高效持久化_MQTT_11

如果规则引擎运行数据指标正常,则代表 EMQX 会将数据持久化到 HStreamDB。一旦写入成功,便可以在前面步骤启动的 HStream SQL Shell 中看到实时的数据更新。

# docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql
> select * from basic_condition_info_0 emit changes;
{"current-number-of-people":247.0,"device-health":true,"number-of-people-in-line":14.0,"submitter":"admin-07","temperature":27.0}
{"current-number-of-people":220.0,"device-health":true,"number-of-people-in-line":13.0,"submitter":"admin-07","temperature":27.2}
{"current-number-of-people":135.0,"device-health":true,"number-of-people-in-line":2.0,"submitter":"admin-01","temperature":26.9}
{"current-number-of-people":137.0,"device-health":true,"number-of-people-in-line":0.0,"submitter":"admin-01","temperature":26.9}

结语

至此,我们就完成了通过 EMQX 规则引擎将数据持久化到 HStreamDB 的主要流程。

将 EMQX 采集到的数据存储到 HStreamDB 后,可以对这些数据进行实时处理与分析,为上层 AI、大数据等应用提供支撑,进一步发掘和利用数据价值。作为首个专为流数据设计的云原生流数据库,HStreamDB 与 EMQX 结合可以实现一站式存储和实时处理海量物联网数据,精简物联网应用数据栈,加速企业的物联网应用开发。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:​​https://www.emqx.com/zh/blog/integration-practice-of-emqx-and-hstreamdb​​

举报

相关推荐

0 条评论