---
sidebar_label: EMQX Broker
title: EMQX Broker writing
---

MQTT is a popular IoT data transfer protocol, and [EMQX](https://github.com/emqx/emqx) is an open-source MQTT broker. Without writing any code, you can write MQTT data directly to TDengine by configuring "rules" in the EMQX Dashboard. EMQX supports saving data to TDengine by sending it to a web service, and the Enterprise Edition also provides a native TDengine driver for direct saving. Please refer to the [EMQX official documentation](https://www.emqx.io/docs/en/v4.4/rule/rule-engine.html) for details on how to use it.

## Prerequisites

The following preparations are required for EMQX to add a TDengine data source correctly:
- The TDengine cluster is deployed and working properly.
- taosAdapter is installed and running properly. Please refer to the [taosAdapter manual](/reference/taosadapter) for details.
- If you use the simulated writer described later, Node.js must be installed. v12 is recommended.

## Install and start EMQX

Depending on the current operating system, users can download the installation package from the [EMQX official website](https://www.emqx.io/downloads) and execute the installation. After installation, use `sudo emqx start` or `sudo systemctl start emqx` to start the EMQX service.

## Create the appropriate database and table schema in TDengine for receiving MQTT data

### Take the Docker installation of TDengine as an example

```bash
    docker exec -it tdengine bash
    taos
```

### Create Database and Table

```sql
    CREATE DATABASE test;
    USE test;

    CREATE TABLE sensor_data (ts timestamp, temperature float, humidity float, volume float, PM10 float, pm25 float, SO2 float, NO2 float, CO float, sensor_id NCHAR(255), area TINYINT, coll_time timestamp);
```

Note: The table schema follows the example in the blog [(In Chinese) Data Transfer, Storage, Presentation, EMQX + TDengine Build MQTT IoT Data Visualization Platform](https://www.taosdata.com/blog/2020/08/04/1722.html). The subsequent steps use the same scenario. Please modify the schema according to your actual application.

## Configuring EMQX Rules

The EMQX configuration interface differs from version to version. The steps below use v4.4.3 as an example. For other versions, please refer to the corresponding official documentation.

### Log in to EMQX Dashboard

Open `http://IP:18083` in your browser, where `IP` is the address of the EMQX server, and log in to EMQX Dashboard. After initial installation, the username is `admin` and the password is `public`.

![img](./emqx/login-dashboard.webp)

### Creating Rule

Select "Rule" in the "Rule Engine" on the left and click the "Create" button:

![img](./emqx/rule-engine.webp)

### Edit SQL fields

![img](./emqx/create-rule.webp)

### Add "action handler"

![img](./emqx/add-action-handler.webp)

### Add "Resource"

![img](./emqx/create-resource.webp)

Select "Data to Web Service" and click the "New Resource" button.

### Edit "Resource"

Select "Data to Web Service" and fill in the request URL as the address and port of the server running taosAdapter (default is 6041). Leave the other properties at their default values.

![img](./emqx/edit-resource.webp)

### Edit "action"

Edit the resource configuration to add a key/value pair for Authorization. Please refer to the [TDengine REST API documentation](https://docs.taosdata.com/reference/rest-api/) for details on authorization. Then enter the rule engine replacement template in the message body.
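
As a rough illustration of the Authorization value, the sketch below builds an HTTP Basic token from a username and password. It assumes the TDengine default credentials `root` / `taosdata` are still in use (replace them with your own), and the helper name `basicAuthHeader` is hypothetical.

```javascript
// Sketch: build the value for the "Authorization" header.
// Assumption: the TDengine default credentials (root / taosdata) are unchanged.
function basicAuthHeader(user, password) {
  const token = Buffer.from(`${user}:${password}`).toString('base64')
  return `Basic ${token}`
}

console.log(basicAuthHeader('root', 'taosdata'))
// Basic cm9vdDp0YW9zZGF0YQ==
```

The printed string is what would go into the value field of the Authorization key under these assumptions.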

![img](./emqx/edit-action.webp)
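
To make the replacement template more concrete, the following sketch imitates the substitution in plain JavaScript. The real substitution is done by EMQX itself. The template text, the `${payload.field}` placeholder layout, and the helper `renderTemplate` are assumptions for illustration, chosen to match the `sensor_data` schema created earlier; check the screenshot and the EMQX documentation for the exact syntax your version expects.

```javascript
// Sketch of placeholder substitution (EMQX performs the real one).
// Assumption: the rule exposes the MQTT message body as `payload`.
const TEMPLATE =
  "INSERT INTO test.sensor_data VALUES (now, ${payload.temperature}, " +
  "${payload.humidity}, ${payload.volume}, ${payload.PM10}, ${payload.pm25}, " +
  "${payload.SO2}, ${payload.NO2}, ${payload.CO}, '${payload.id}', " +
  "${payload.area}, ${payload.ts})"

// Replace every ${a.b} placeholder with the value found at that path in `context`.
// Placeholders with no matching value are left untouched.
function renderTemplate(template, context) {
  return template.replace(/\$\{([^}]+)\}/g, (placeholder, path) => {
    const value = path
      .split('.')
      .reduce((obj, key) => (obj == null ? undefined : obj[key]), context)
    return value === undefined ? placeholder : String(value)
  })
}

// Hypothetical sample message with the fields the table expects.
const sample = {
  payload: {
    temperature: 25.5, humidity: 40.1, volume: 60.2, PM10: 12.3, pm25: 8.4,
    SO2: 1.1, NO2: 2.2, CO: 0.3, id: 'mock_client_0', area: 0, ts: 1596157444170,
  },
}

console.log(renderTemplate(TEMPLATE, sample))
```

The twelve values line up with the twelve columns of `sensor_data`: `now` fills `ts`, the quoted `id` fills `sensor_id`, and the message's own `ts` fills `coll_time`.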

## Write a program to generate mock data

```javascript
    // mock.js
    const mqtt = require('mqtt')
    const Mock = require('mockjs')
    const EMQX_SERVER = 'mqtt://localhost:1883'
    const CLIENT_NUM = 10
    const STEP = 5000 // Data interval in ms
    const AWAIT = 5000 // Pause in ms after each batch so that data is not written too fast
    const CLIENT_POOL = []
    startMock()
    function sleep(timer = 100) {
      return new Promise(resolve => {
        setTimeout(resolve, timer)
      })
    }
    async function startMock() {
      const now = Date.now()
      for (let i = 0; i < CLIENT_NUM; i++) {
        const client = await createClient(`mock_client_${i}`)
        CLIENT_POOL.push(client)
      }
      // last 24h every 5s
      const last = 24 * 3600 * 1000
      for (let ts = now - last; ts <= now; ts += STEP) {
        for (const client of CLIENT_POOL) {
          const mockData = generateMockData()
          const data = {
            ...mockData,
            id: client.options.clientId, // MQTT.js keeps the client ID in client.options
            area: 0,
            ts,
          }
          client.publish('sensor/data', JSON.stringify(data))
        }
        const dateStr = new Date(ts).toLocaleTimeString()
        console.log(`${dateStr} send success.`)
        await sleep(AWAIT)
      }
      console.log(`Done, use ${(Date.now() - now) / 1000}s`)
    }
    /**
     * Init a virtual mqtt client
     * @param {string} clientId ClientID
     */
    function createClient(clientId) {
      return new Promise((resolve, reject) => {
        const client = mqtt.connect(EMQX_SERVER, {
          clientId,
        })
        client.on('connect', () => {
          console.log(`client ${clientId} connected`)
          resolve(client)
        })
        client.on('reconnect', () => {
          console.log('reconnect')
        })
        client.on('error', (e) => {
          console.error(e)
          reject(e)
        })
      })
    }
    /**
    * Generate mock data
    */
    function generateMockData() {
     return {
       "temperature": parseFloat(Mock.Random.float(22, 100).toFixed(2)),
       "humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)),
       "volume": parseFloat(Mock.Random.float(20, 200).toFixed(2)),
       "PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
       "pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
       "SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
       "NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
       "CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
       "area": Mock.Random.integer(0, 20),
       "ts": 1596157444170,
     }
    }
```

Note: `CLIENT_NUM` in the code can be set to a smaller value at the beginning of the test, in case the hardware cannot handle a large number of concurrent clients.
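
To judge how heavy a given configuration is before running it, the sketch below estimates the number of messages and a lower bound on the runtime from the constants in `mock.js`. The helper name `estimateMockLoad` is hypothetical. With the values from the listing it comes to 17,281 loop iterations and 172,810 messages, and the sleeps alone add up to roughly 24 hours.

```javascript
// Sketch: estimate the load produced by mock.js for a given configuration.
function estimateMockLoad({ clientNum, stepMs, awaitMs, windowMs }) {
  // The loop runs for ts = now - windowMs, ..., now (inclusive), every stepMs.
  const iterations = Math.floor(windowMs / stepMs) + 1
  return {
    iterations,
    messages: iterations * clientNum,
    // Lower bound: only the sleep after each iteration is counted.
    minRuntimeHours: (iterations * awaitMs) / 3600000,
  }
}

const estimate = estimateMockLoad({
  clientNum: 10,               // CLIENT_NUM
  stepMs: 5000,                // STEP
  awaitMs: 5000,               // AWAIT
  windowMs: 24 * 3600 * 1000,  // "last 24h"
})
console.log(estimate)
```

Lowering `AWAIT` shortens the run, and lowering `CLIENT_NUM` reduces the number of messages per iteration.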

![img](./emqx/client-num.webp)

## Execute tests to simulate sending MQTT data

```
npm install mqtt mockjs --save --registry=https://registry.npm.taobao.org
node mock.js
```

![img](./emqx/run-mock.webp)

## Verify that EMQX is receiving data

Refresh the EMQX Dashboard rules engine interface to see how many records were received correctly:

![img](./emqx/check-rule-matched.webp)

## Verify that data is written to TDengine

Use the TDengine CLI program to log in and query the appropriate databases and tables to verify that the data is being written to TDengine correctly:
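
The same check could also be scripted through taosAdapter instead of the CLI. The sketch below is one possible way to do it with Node's built-in `http` module. It assumes taosAdapter listens on `localhost:6041`, that its REST endpoint is `/rest/sql`, and that the default credentials `root` / `taosdata` are in use. The helper names `buildCountRequest` and `countRows` are hypothetical.

```javascript
const http = require('http')

// Assumptions: taosAdapter on localhost:6041, REST endpoint /rest/sql,
// default credentials root / taosdata.
function buildCountRequest(host = 'localhost', port = 6041) {
  return {
    options: {
      host,
      port,
      path: '/rest/sql',
      method: 'POST',
      auth: 'root:taosdata', // Node turns this into a Basic Authorization header
    },
    body: 'SELECT COUNT(*) FROM test.sensor_data',
  }
}

// Send the query and resolve with the raw response body as text.
function countRows(host, port) {
  const { options, body } = buildCountRequest(host, port)
  return new Promise((resolve, reject) => {
    const req = http.request(options, (res) => {
      let text = ''
      res.setEncoding('utf8')
      res.on('data', (chunk) => { text += chunk })
      res.on('end', () => resolve(text))
    })
    req.on('error', reject)
    req.end(body)
  })
}

// Usage (requires a running taosAdapter):
// countRows().then(console.log).catch(console.error)
```

The response is printed as raw text because the JSON layout depends on the TDengine version.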

![img](./emqx/check-result-in-taos.webp)

Please refer to the [TDengine official documentation](https://docs.taosdata.com/) for more details on how to use TDengine.
Please refer to the [EMQX official documentation](https://www.emqx.io/docs/en/v4.4/rule/rule-engine.html) for details on how to use EMQX.