提交 10a94132 编写于 作者: D dingbo 提交者: gccgdb1234

docs: emqx doucmentation

上级 e9ec7b13
......@@ -8,31 +8,24 @@ MQTT 是流行的物联网数据传输协议,[EMQX](https://github.com/emqx/em
## 前置条件
要让 EMQX 能正常添加 TDengine 数据源,需要以下几方面的准备工作。
- TDengine 集群已经部署并正常运行
- taosAdapter 已经安装并正常运行。具体细节请参考 [taosAdapter 的使用手册](/reference/taosadapter)
- 如果使用后文介绍的模拟写入程序,需要安装合适版本的 Node.js,推荐安装 v12
- 如果使用后文介绍的模拟写入程序,需要安装合适版本的 Node.js,推荐安装 v12
## 安装并启动 EMQX
用户可以根据当前的操作系统,到 EMQX 官网下载安装包,并执行安装。下载地址如下:<https://www.emqx.io/zh/downloads>。安装后使用 `sudo emqx start``sudo systemctl start emqx` 启动 EMQX 服务。
## 在 TDengine 中为接收 MQTT 数据创建相应数据库和表结构
### 以 Docker 安装 TDengine 为例
```bash
docker exec -it tdengine bash
taos
```
## 创建数据库和表
### 创建数据库和表
在 TDengine 中为接收 MQTT 数据创建相应数据库和表结构。进入 TDengine CLI 复制并执行以下 SQL 语句:
```sql
create database test;
use test;
create table:
CREATE TABLE sensor_data (ts timestamp, temperature float, humidity float, volume float, PM10 float, pm25 float, SO2 float, NO2 float, CO float, sensor_id NCHAR(255), area TINYINT, coll_time timestamp);
CREATE DATABASE test;
USE test;
CREATE TABLE sensor_data (ts TIMESTAMP, temperature FLOAT, humidity FLOAT, volume FLOAT, pm10 FLOAT, pm25 FLOAT, so2 FLOAT, no2 FLOAT, co FLOAT, sensor_id NCHAR(255), area TINYINT, coll_time TIMESTAMP);
```
注:表结构以博客[数据传输、存储、展现,EMQX + TDengine 搭建 MQTT 物联网数据可视化平台](https://www.taosdata.com/blog/2020/08/04/1722.html)为例。后续操作均以此博客场景为例进行,请你根据实际应用场景进行修改。
......@@ -43,7 +36,7 @@ MQTT 是流行的物联网数据传输协议,[EMQX](https://github.com/emqx/em
### 登录 EMQX Dashboard
使用浏览器打开网址 http://IP:18083 并登录 EMQX Dashboard。初次安装用户名为 `admin` 密码为:`public`
使用浏览器打开网址 http://IP:18083 并登录 EMQX Dashboard。初次安装用户名为 `admin` 密码为:`public`
![TDengine Database EMQX login dashboard](./emqx/login-dashboard.webp)
......@@ -55,6 +48,17 @@ MQTT 是流行的物联网数据传输协议,[EMQX](https://github.com/emqx/em
### 编辑 SQL 字段
复制以下内容输入到 SQL 编辑框:
```sql
SELECT
payload
FROM
"sensor/data"
```
其中 `payload` 代表整个消息体, `sensor/data` 为本规则选取的消息主题。
![TDengine Database EMQX create rule](./emqx/create-rule.webp)
### 新增“动作(action handler)”
......@@ -65,101 +69,54 @@ MQTT 是流行的物联网数据传输协议,[EMQX](https://github.com/emqx/em
![TDengine Database EMQX create resource](./emqx/create-resource.webp)
选择“发送数据到 Web 服务并点击“新建资源”按钮:
选择“发送数据到 Web 服务并点击“新建资源”按钮:
### 编辑“资源(Resource)”
选择“发送数据到 Web 服务“并填写 请求 URL 为 运行 taosAdapter 的服务器地址和端口(默认为 6041)。其他属性请保持默认值。
选择“WebHook”并填写“请求 URL”为 taosAdapter 提供 REST 服务的地址,如果是本地启动的 taosadapter, 那么默认地址为:
```
http://127.0.0.1:6041/rest/sql
```
其他属性请保持默认值。
![TDengine Database EMQX edit resource](./emqx/edit-resource.webp)
### 编辑“动作(action)”
编辑资源配置,增加 Authorization 认证的键/值配对项,相关文档请参考[ TDengine REST API 文档](https://docs.taosdata.com/reference/rest-api/)。在消息体中输入规则引擎替换模板。
编辑资源配置,增加 Authorization 认证的键/值配对项。默认用户名和密码对应的 Authorization 值为:
```
Basic cm9vdDp0YW9zZGF0YQ==
```
相关文档请参考[ TDengine REST API 文档](/reference/rest-api/)
在消息体中输入规则引擎替换模板:
```sql
INSERT INTO test.sensor_data VALUES(
now,
${payload.temperature},
${payload.humidity},
${payload.volume},
${payload.PM10},
${payload.pm25},
${payload.SO2},
${payload.NO2},
${payload.CO},
'${payload.id}',
${payload.area},
${payload.ts}
)
```
![TDengine Database EMQX edit action](./emqx/edit-action.webp)
最后点击左下方的 “Create” 按钮,保存规则。
## 编写模拟测试程序
```javascript
// mock.js
const mqtt = require('mqtt')
const Mock = require('mockjs')
const EMQX_SERVER = 'mqtt://localhost:1883'
const CLIENT_NUM = 10
const STEP = 5000 // 模拟采集时间间隔 ms
const AWAIT = 5000 // 每次发送完后休眠时间,防止消息速率过快 ms
const CLIENT_POOL = []
startMock()
function sleep(timer = 100) {
return new Promise(resolve => {
setTimeout(resolve, timer)
})
}
async function startMock() {
const now = Date.now()
for (let i = 0; i < CLIENT_NUM; i++) {
const client = await createClient(`mock_client_${i}`)
CLIENT_POOL.push(client)
}
// last 24h every 5s
const last = 24 * 3600 * 1000
for (let ts = now - last; ts <= now; ts += STEP) {
for (const client of CLIENT_POOL) {
const mockData = generateMockData()
const data = {
...mockData,
id: client.clientId,
area: 0,
ts,
}
client.publish('sensor/data', JSON.stringify(data))
}
const dateStr = new Date(ts).toLocaleTimeString()
console.log(`${dateStr} send success.`)
await sleep(AWAIT)
}
console.log(`Done, use ${(Date.now() - now) / 1000}s`)
}
/**
* Init a virtual mqtt client
* @param {string} clientId ClientID
*/
function createClient(clientId) {
return new Promise((resolve, reject) => {
const client = mqtt.connect(EMQX_SERVER, {
clientId,
})
client.on('connect', () => {
console.log(`client ${clientId} connected`)
resolve(client)
})
client.on('reconnect', () => {
console.log('reconnect')
})
client.on('error', (e) => {
console.error(e)
reject(e)
})
})
}
/**
* Generate mock data
*/
function generateMockData() {
return {
"temperature": parseFloat(Mock.Random.float(22, 100).toFixed(2)),
"humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)),
"volume": parseFloat(Mock.Random.float(20, 200).toFixed(2)),
"PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
"pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
"SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
"NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
"CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
"area": Mock.Random.integer(0, 20),
"ts": 1596157444170,
}
}
${{#include docs-examples/other/mock.js}}
```
注意:代码中 CLIENT_NUM 在开始测试中可以先设置一个较小的值,避免硬件性能不能完全处理较大并发客户端数量。
......@@ -189,4 +146,3 @@ node mock.js
TDengine 详细使用方法请参考 [TDengine 官方文档](https://docs.taosdata.com/)
EMQX 详细使用方法请参考 [EMQX 官方文档](https://www.emqx.io/docs/zh/v4.4/rule/rule-engine.html)
......@@ -16,22 +16,15 @@ The following preparations are required for EMQX to add TDengine data sources co
Depending on the current operating system, users can download the installation package from the [EMQX official website](https://www.emqx.io/downloads) and execute the installation. After installation, use `sudo emqx start` or `sudo systemctl start emqx` to start the EMQX service.
## Create the appropriate database and table schema in TDengine for receiving MQTT data
### Take the Docker installation of TDengine as an example
## Create Database and Table
```bash
docker exec -it tdengine bash
taos
```
### Create Database and Table
In this step we create the appropriate database and table schema in TDengine for receiving MQTT data. Open TDengine CLI and execute SQL bellow:
```sql
CREATE DATABASE test;
USE test;
CREATE TABLE sensor_data (ts timestamp, temperature float, humidity float, volume float, PM10 float, pm25 float, SO2 float, NO2 float, CO float, sensor_id NCHAR(255), area TINYINT, coll_time timestamp);
CREATE DATABASE test;
USE test;
CREATE TABLE sensor_data (ts TIMESTAMP, temperature FLOAT, humidity FLOAT, volume FLOAT, pm10 FLOAT, pm25 FLOAT, so2 FLOAT, no2 FLOAT, co FLOAT, sensor_id NCHAR(255), area TINYINT, coll_time TIMESTAMP);
```
Note: The table schema is based on the blog [(In Chinese) Data Transfer, Storage, Presentation, EMQX + TDengine Build MQTT IoT Data Visualization Platform](https://www.taosdata.com/blog/2020/08/04/1722.html) as an example. Subsequent operations are carried out with this blog scenario too. Please modify it according to your actual application scenario.
......@@ -54,6 +47,15 @@ Select "Rule" in the "Rule Engine" on the left and click the "Create" button: !
### Edit SQL fields
Copy SQL bellow and paste it to the SQL edit area:
```sql
SELECT
payload
FROM
"sensor/data"
```
![TDengine Database EMQX create rule](./emqx/create-rule.webp)
### Add "action handler"
......@@ -68,97 +70,45 @@ Select "Data to Web Service" and click the "New Resource" button.
### Edit "Resource"
Select "Data to Web Service" and fill in the request URL as the address and port of the server running taosAdapter (default is 6041). Leave the other properties at their default values.
Select "WebHook" and fill in the request URL as the address and port of the server running taosAdapter (default is 6041). Leave the other properties at their default values.
![TDengine Database EMQX edit resource](./emqx/edit-resource.webp)
### Edit "action"
Edit the resource configuration to add the key/value pairing for Authorization. Please refer to the [ TDengine REST API documentation ](https://docs.taosdata.com/reference/rest-api/) for the authorization in details. Enter the rule engine replacement template in the message body.
Edit the resource configuration to add the key/value pairing for Authorization. If you use the default TDengine username and password then the value of key Authorization is:
```
Basic cm9vdDp0YW9zZGF0YQ==
```
Please refer to the [ TDengine REST API documentation ](/reference/rest-api/) for the authorization in details.
Enter the rule engine replacement template in the message body:
```sql
INSERT INTO test.sensor_data VALUES(
now,
${payload.temperature},
${payload.humidity},
${payload.volume},
${payload.PM10},
${payload.pm25},
${payload.SO2},
${payload.NO2},
${payload.CO},
'${payload.id}',
${payload.area},
${payload.ts}
)
```
![TDengine Database EMQX edit action](./emqx/edit-action.webp)
Finally, click the "Create" button at bottom left corner saving the rule.
## Compose program to mock data
```javascript
// mock.js
const mqtt = require('mqtt')
const Mock = require('mockjs')
const EMQX_SERVER = 'mqtt://localhost:1883'
const CLIENT_NUM = 10
const STEP = 5000 // Data interval in ms
const AWAIT = 5000 // Sleep time after data be written once to avoid data writing too fast
const CLIENT_POOL = []
startMock()
function sleep(timer = 100) {
return new Promise(resolve => {
setTimeout(resolve, timer)
})
}
async function startMock() {
const now = Date.now()
for (let i = 0; i < CLIENT_NUM; i++) {
const client = await createClient(`mock_client_${i}`)
CLIENT_POOL.push(client)
}
// last 24h every 5s
const last = 24 * 3600 * 1000
for (let ts = now - last; ts <= now; ts += STEP) {
for (const client of CLIENT_POOL) {
const mockData = generateMockData()
const data = {
...mockData,
id: client.clientId,
area: 0,
ts,
}
client.publish('sensor/data', JSON.stringify(data))
}
const dateStr = new Date(ts).toLocaleTimeString()
console.log(`${dateStr} send success.`)
await sleep(AWAIT)
}
console.log(`Done, use ${(Date.now() - now) / 1000}s`)
}
/**
* Init a virtual mqtt client
* @param {string} clientId ClientID
*/
function createClient(clientId) {
return new Promise((resolve, reject) => {
const client = mqtt.connect(EMQX_SERVER, {
clientId,
})
client.on('connect', () => {
console.log(`client ${clientId} connected`)
resolve(client)
})
client.on('reconnect', () => {
console.log('reconnect')
})
client.on('error', (e) => {
console.error(e)
reject(e)
})
})
}
/**
* Generate mock data
*/
function generateMockData() {
return {
"temperature": parseFloat(Mock.Random.float(22, 100).toFixed(2)),
"humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)),
"volume": parseFloat(Mock.Random.float(20, 200).toFixed(2)),
"PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
"pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
"SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
"NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
"CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
"area": Mock.Random.integer(0, 20),
"ts": 1596157444170,
}
}
${{#include docs-examples/other/mock.js}}
```
Note: `CLIENT_NUM` in the code can be set to a smaller value at the beginning of the test to avoid hardware performance be not capable to handle a more significant number of concurrent clients.
......
// mock.js
const mqtt = require('mqtt')
const Mock = require('mockjs')
const EMQX_SERVER = 'mqtt://localhost:1883'
const CLIENT_NUM = 10
const STEP = 5000 // Data interval in ms
const AWAIT = 5000 // Sleep time after data be written once to avoid data writing too fast
const CLIENT_POOL = []
startMock()
function sleep(timer = 100) {
return new Promise(resolve => {
setTimeout(resolve, timer)
})
}
async function startMock() {
const now = Date.now()
for (let i = 0; i < CLIENT_NUM; i++) {
const client = await createClient(`mock_client_${i}`)
CLIENT_POOL.push(client)
}
// last 24h every 5s
const last = 24 * 3600 * 1000
for (let ts = now - last; ts <= now; ts += STEP) {
for (const client of CLIENT_POOL) {
const mockData = generateMockData()
const data = {
...mockData,
id: client.clientId,
area: 0,
ts,
}
client.publish('sensor/data', JSON.stringify(data))
}
const dateStr = new Date(ts).toLocaleTimeString()
console.log(`${dateStr} send success.`)
await sleep(AWAIT)
}
console.log(`Done, use ${(Date.now() - now) / 1000}s`)
}
/**
* Init a virtual mqtt client
* @param {string} clientId ClientID
*/
function createClient(clientId) {
return new Promise((resolve, reject) => {
const client = mqtt.connect(EMQX_SERVER, {
clientId,
})
client.on('connect', () => {
console.log(`client ${clientId} connected`)
resolve(client)
})
client.on('reconnect', () => {
console.log('reconnect')
})
client.on('error', (e) => {
console.error(e)
reject(e)
})
})
}
/**
* Generate mock data
*/
function generateMockData() {
return {
"temperature": parseFloat(Mock.Random.float(22, 100).toFixed(2)),
"humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)),
"volume": parseFloat(Mock.Random.float(20, 200).toFixed(2)),
"PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
"pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
"SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
"NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
"CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
"area": Mock.Random.integer(0, 20),
"ts": 1596157444170,
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册