09-emq-broker.md 6.6 KB
Newer Older
B
Bo Ding 已提交
1
---
sangshuduo's avatar
sangshuduo 已提交
2 3
sidebar_label: EMQX Broker
title: EMQX Broker 写入
B
Bo Ding 已提交
4 5
---

sangshuduo's avatar
sangshuduo 已提交
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
MQTT 是流行的物联网数据传输协议,[EMQX](https://github.com/emqx/emqx)是一开源的 MQTT Broker 软件,无需任何代码,只需要在 EMQX Dashboard 里使用“规则”做简单配置,即可将 MQTT 的数据直接写入 TDengine。EMQX 支持通过 发送到 Web 服务的方式保存数据到 TDengine,也在企业版上提供原生的 TDengine 驱动实现直接保存。

## 前置条件

要让 EMQX 能正常添加 TDengine 数据源,需要以下几方面的准备工作。
- TDengine 集群已经部署并正常运行
- taosAdapter 已经安装并正常运行。具体细节请参考 [taosAdapter 的使用手册](/reference/taosadapter)
- 如果使用后文介绍的模拟写入程序,需要安装合适版本的 Node.js,推荐安装 v12。

## 安装并启动 EMQX

用户可以根据当前的操作系统,到 EMQX 官网下载安装包,并执行安装。下载地址如下:<https://www.emqx.io/zh/downloads>。安装后使用 `sudo emqx start``sudo systemctl start emqx` 启动 EMQX 服务。

## 在 TDengine 中为接收 MQTT 数据创建相应数据库和表结构

### 以 Docker 安装 TDengine 为例

```bash
    docker exec -it tdengine bash
    taos
```

### 创建数据库和表

```sql
    create database test;
    use test;
    create table:

    CREATE TABLE sensor_data (ts timestamp, temperature float, humidity float, volume float, PM10 float, pm25 float, SO2 float, NO2 float, CO float, sensor_id NCHAR(255), area TINYINT, coll_time timestamp);
```

注:表结构以博客[数据传输、存储、展现,EMQ X + TDengine 搭建 MQTT 物联网数据可视化平台](https://www.taosdata.com/blog/2020/08/04/1722.html)为例。后续操作均以此博客场景为例进行,请你根据实际应用场景进行修改。

## 配置 EMQX 规则

由于 EMQX 不同版本配置界面所有不同,这里仅以 v4.4.3 为例,其他版本请参考相应官网文档。

### 登录 EMQX Dashboard

使用浏览器打开网址 http://IP:18083 并登录 EMQX Dashboard。初次安装用户名为 `admin` 密码为:`public`

![img](./emqx/login-dashboard.png)

### 创建规则(Rule)

选择左侧“规则引擎(Rule Engine)”中的“规则(Rule)”并点击“创建(Create)”按钮:

![img](./emqx/rule-engine.png)

### 编辑 SQL 字段

![img](./emqx/create-rule.png)

### 新增“动作(action handler)”

![img](./emqx/add-action-handler.png)

### 新增“资源(Resource)”

![img](./emqx/create-resource.png)

选择“发送数据到 Web 服务“并点击“新建资源”按钮:

### 编辑“资源(Resource)”

选择“发送数据到 Web 服务“并填写 请求 URL 为 运行 taosAdapter 的服务器地址和端口(默认为 6041)。其他属性请保持默认值。

![img](./emqx/edit-resource.png)

### 编辑“动作(action)”

编辑资源配置,增加 Authorization 认证的键/值配对项,相关文档请参考[ TDengine REST API 文档](https://docs.taosdata.com/reference/rest-api/)。在消息体中输入规则引擎替换模板。

![img](./emqx/edit-action.png)

## 编写模拟测试程序

```javascript
    // mock.js
    const mqtt = require('mqtt')
    const Mock = require('mockjs')
    const EMQX_SERVER = 'mqtt://localhost:1883'
    const CLIENT_NUM = 10
    const STEP = 5000 // 模拟采集时间间隔 ms
    const AWAIT = 5000 // 每次发送完后休眠时间,防止消息速率过快 ms
    const CLIENT_POOL = []
    startMock()
    function sleep(timer = 100) {
      return new Promise(resolve => {
        setTimeout(resolve, timer)
      })
    }
    async function startMock() {
      const now = Date.now()
      for (let i = 0; i < CLIENT_NUM; i++) {
        const client = await createClient(`mock_client_${i}`)
        CLIENT_POOL.push(client)
      }
      // last 24h every 5s
      const last = 24 * 3600 * 1000
      for (let ts = now - last; ts <= now; ts += STEP) {
        for (const client of CLIENT_POOL) {
          const mockData = generateMockData()
          const data = {
            ...mockData,
            id: client.clientId,
            area: 0,
            ts,
          }
          client.publish('sensor/data', JSON.stringify(data))
        }
        const dateStr = new Date(ts).toLocaleTimeString()
        console.log(`${dateStr} send success.`)
        await sleep(AWAIT)
      }
      console.log(`Done, use ${(Date.now() - now) / 1000}s`)
    }
    /**
     * Init a virtual mqtt client
     * @param {string} clientId ClientID
     */
    function createClient(clientId) {
      return new Promise((resolve, reject) => {
        const client = mqtt.connect(EMQX_SERVER, {
          clientId,
        })
        client.on('connect', () => {
          console.log(`client ${clientId} connected`)
          resolve(client)
        })
        client.on('reconnect', () => {
          console.log('reconnect')
        })
        client.on('error', (e) => {
          console.error(e)
          reject(e)
        })
      })
    }
    /**
    * Generate mock data
    */
    function generateMockData() {
     return {
       "temperature": parseFloat(Mock.Random.float(22, 100).toFixed(2)),
       "humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)),
       "volume": parseFloat(Mock.Random.float(20, 200).toFixed(2)),
       "PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
       "pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
       "SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
       "NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
       "CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
       "area": Mock.Random.integer(0, 20),
       "ts": 1596157444170,
     }
    }
```

注意:代码中 CLIENT_NUM 在开始测试中可以先设置一个较小的值,避免硬件性能不能完全处理较大并发客户端数量。

![img](./emqx/client-num.png)

## 执行测试模拟发送 MQTT 数据

```
npm install mqtt mockjs --save --registry=https://registry.npm.taobao.org
node mock.js
```

![img](./emqx/run-mock.png)

## 验证 EMQX 接收到数据

在 EMQX Dashboard 规则引擎界面进行刷新,可以看到有多少条记录被正确接收到:

![img](./emqx/check-rule-matched.png)

## 验证数据写入到 TDengine

使用 TDengine CLI 程序登录并查询相应数据库和表,验证数据是否被正确写入到 TDengine 中:

![img](./emqx/check-result-in-taos.png)

TDengine 详细使用方法请参考 [TDengine 官方文档](https://docs.taosdata.com/)
EMQX 详细使用方法请参考 [EMQ 官方文档](https://www.emqx.io/docs/zh/v4.4/rule/rule-engine.html)