提交 011ca825 编写于 作者: S Shengliang Guan

TD-1310 Refactor the mqtt module codes

上级 60416be4
......@@ -42,6 +42,16 @@ IF (${MEM_CHECK} MATCHES "true")
MESSAGE(STATUS "build with memory check")
ENDIF ()
IF (${MQTT} MATCHES "false")
SET(TD_MQTT FALSE)
MESSAGE(STATUS "build without mqtt module")
ENDIF ()
IF (${SYNC} MATCHES "false")
SET(TD_SYNC FALSE)
MESSAGE(STATUS "build without sync module")
ENDIF ()
IF (${RANDOM_FILE_FAIL} MATCHES "true")
SET(TD_RANDOM_FILE_FAIL TRUE)
MESSAGE(STATUS "build with random-file-fail enabled")
......
......@@ -142,7 +142,7 @@ char tsMqttPort[TSDB_MQTT_PORT_LEN] = "1883";
char tsMqttUser[TSDB_MQTT_USER_LEN] = {0};
char tsMqttPass[TSDB_MQTT_PASS_LEN] = {0};
char tsMqttClientId[TSDB_MQTT_CLIENT_ID_LEN] = "TDengineMqttSubscriber";
char tsMqttTopic[TSDB_MQTT_TOPIC_LEN] = "/weather/loop";
char tsMqttTopic[TSDB_MQTT_TOPIC_LEN] = "/test"; // #
// monitor
int32_t tsEnableMonitorModule = 1;
......@@ -774,7 +774,7 @@ static void doInitGlobalConfig(void) {
cfg.option = "mqttHostName";
cfg.ptr = tsMqttHostName;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_MQTT_HOSTNAME_LEN;
......@@ -784,7 +784,7 @@ static void doInitGlobalConfig(void) {
cfg.option = "mqttPort";
cfg.ptr = tsMqttPort;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_MQTT_PORT_LEN;
......@@ -794,7 +794,7 @@ static void doInitGlobalConfig(void) {
cfg.option = "mqttTopic";
cfg.ptr = tsMqttTopic;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_MQTT_TOPIC_LEN;
......
......@@ -611,7 +611,7 @@ static bool dnodeReadMnodeInfos() {
}
for (int i = 0; i < size; ++i) {
cJSON* nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
if (nodeInfo == NULL) continue;
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
......@@ -627,7 +627,7 @@ static bool dnodeReadMnodeInfos() {
goto PARSE_OVER;
}
strncpy(tsDMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN);
}
}
ret = true;
......
......@@ -19,11 +19,11 @@
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
int32_t mqttInitSystem();
int32_t mqttStartSystem();
void mqttStopSystem();
void mqttCleanUpSystem();
void mqttStopSystem();
void mqttCleanUpSystem();
#ifdef __cplusplus
}
......
......@@ -23,11 +23,12 @@ extern "C" {
* @file
* A simple subscriber program that performs automatic reconnections.
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "mqtt.h"
#include "taos.h"
#define QOS 1
#define TIMEOUT 10000L
#define MQTT_SEND_BUF_SIZE 102400
#define MQTT_RECV_BUF_SIZE 102400
/**
* @brief A structure that I will use to keep track of some data needed
......@@ -36,12 +37,12 @@ extern "C" {
* An instance of this struct will be created in my \c main(). Then, whenever
* \ref mqttReconnectClient is called, this instance will be passed.
*/
struct reconnect_state_t {
typedef struct SMqttReconnectState {
uint8_t* sendbuf;
size_t sendbufsz;
uint8_t* recvbuf;
size_t recvbufsz;
};
} SMqttReconnectState;
/**
* @brief My reconnect callback. It will reestablish the connection whenever
......@@ -52,7 +53,7 @@ void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr
/**
* @brief The function will be called whenever a PUBLISH message is received.
*/
void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published);
void mqttPublishCallback(void** unused, struct mqtt_response_publish* published);
/**
* @brief The client's refresher. This function triggers back-end routines to
......@@ -67,12 +68,7 @@ void* mqttClientRefresher(void* client);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void mqttCleanup(int status, int sockfd, pthread_t* client_daemon);
void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code);
void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code);
#define QOS 1
#define TIMEOUT 10000L
void mqttCleanupRes(int status, int sockfd, pthread_t* client_daemon);
#ifdef __cplusplus
}
......
......@@ -15,11 +15,13 @@
#ifndef TDENGINE_MQTT_PLYLOAD_H
#define TDENGINE_MQTT_PLYLOAD_H
#ifdef __cplusplus
extern "C" {
#endif
char split(char str[], char delims[], char** p_p_cmd_part, int max);
char* converJsonToSql(char* json, char* _dbname, char* _tablename);
char* mqttConverJsonToSql(char* json, int maxSize);
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_MQTT_SYSTEM_H
#define TDENGINE_MQTT_SYSTEM_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
int32_t mqttInitSystem();
int32_t mqttStartSystem();
void mqttStopSystem();
void mqttCleanUpSystem();
#ifdef __cplusplus
}
#endif
#endif
......@@ -14,52 +14,142 @@
*/
#define _DEFAULT_SOURCE
#include "mqttPayload.h"
#include "os.h"
#include "cJSON.h"
#include "string.h"
#include "taos.h"
#include "mqttLog.h"
#include "os.h"
char split(char str[], char delims[], char** p_p_cmd_part, int max) {
char* token = strtok(str, delims);
char part_index = 0;
char** tmp_part = p_p_cmd_part;
while (token) {
*tmp_part++ = token;
token = strtok(NULL, delims);
part_index++;
if (part_index >= max) break;
}
return part_index;
#include "mqttPayload.h"
// subscribe message like this
/*
/test {
"timestamp": 1599121290,
"gateway": {
"name": "AcuLink 810 Gateway",
"model": "AcuLink810-868",
"serial": "S8P20200207"
},
"device": {
"name": "Acuvim L V3 .221",
"model": "Acuvim-L-V3",
"serial": "221",
"online": true,
"readings": [
{
"param": "Freq_Hz",
"value": "59.977539",
"unit": "Hz"
},
{
"param": "Va_V",
"value": "122.002907",
"unit": "V"
},
{
"param": "DI4",
"value": "5.000000",
"unit": ""
}
]
}
}
*/
/*
* This is an example, this function needs to be implemented in order to parse the json file into a sql statement
* Note that you need to create a super table and database before writing data
* In this case:
* create database mqttdb;
* create table mqttdb.devices(ts timestamp, value bigint) tags(name binary(32), model binary(32), serial binary(16), param binary(16), unit binary(16));
*/
char* mqttConverJsonToSql(char* json, int maxSize) {
// const int32_t maxSize = 10240;
char* sql = malloc(maxSize);
cJSON* root = cJSON_Parse(json);
if (root == NULL) {
mqttError("failed to parse msg, invalid json format");
goto MQTT_PARSE_OVER;
}
cJSON* timestamp = cJSON_GetObjectItem(root, "timestamp");
if (!timestamp || timestamp->type != cJSON_Number) {
mqttError("failed to parse msg, timestamp not found");
goto MQTT_PARSE_OVER;
}
cJSON* device = cJSON_GetObjectItem(root, "device");
if (!device) {
mqttError("failed to parse msg, device not found");
goto MQTT_PARSE_OVER;
}
char* converJsonToSql(char* json, char* _dbname, char* _tablename) {
cJSON* jPlayload = cJSON_Parse(json);
char _names[102400] = {0};
char _values[102400] = {0};
int i = 0;
int count = cJSON_GetArraySize(jPlayload);
for (; i < count; i++)
{
cJSON* item = cJSON_GetArrayItem(jPlayload, i);
if (cJSON_Object == item->type) {
mqttInfo("The item '%s' is not supported", item->string);
} else {
strcat(_names, item->string);
if (i < count - 1) {
strcat(_names, ",");
}
char* __value_json = cJSON_Print(item);
strcat(_values, __value_json);
free(__value_json);
if (i < count - 1) {
strcat(_values, ",");
}
cJSON* name = cJSON_GetObjectItem(device, "name");
if (!name || name->type != cJSON_String) {
mqttError("failed to parse msg, name not found");
goto MQTT_PARSE_OVER;
}
cJSON* model = cJSON_GetObjectItem(device, "model");
if (!model || model->type != cJSON_String) {
mqttError("failed to parse msg, model not found");
goto MQTT_PARSE_OVER;
}
cJSON* serial = cJSON_GetObjectItem(device, "serial");
if (!serial || serial->type != cJSON_String) {
mqttError("failed to parse msg, serial not found");
goto MQTT_PARSE_OVER;
}
cJSON* readings = cJSON_GetObjectItem(device, "readings");
if (!readings || readings->type != cJSON_Array) {
mqttError("failed to parse msg, readings not found");
goto MQTT_PARSE_OVER;
}
int count = cJSON_GetArraySize(readings);
if (count <= 0) {
mqttError("failed to parse msg, readings size smaller than 0");
goto MQTT_PARSE_OVER;
}
int len = snprintf(sql, maxSize, "insert into");
for (int i = 0; i < count; ++i) {
cJSON* reading = cJSON_GetArrayItem(readings, i);
if (reading == NULL) continue;
cJSON* param = cJSON_GetObjectItem(reading, "param");
if (!param || param->type != cJSON_String) {
mqttError("failed to parse msg, param not found");
goto MQTT_PARSE_OVER;
}
cJSON* value = cJSON_GetObjectItem(reading, "value");
if (!value || value->type != cJSON_String) {
mqttError("failed to parse msg, value not found");
goto MQTT_PARSE_OVER;
}
cJSON* unit = cJSON_GetObjectItem(reading, "unit");
if (!unit || unit->type != cJSON_String) {
mqttError("failed to parse msg, unit not found");
goto MQTT_PARSE_OVER;
}
len += snprintf(sql, maxSize - len,
" mqttdb.%s using mqttdb.devices tags('%s', '%s', '%s', '%s', '%s') values(%" PRId64 ", %s)",
serial->valuestring, name->valuestring, model->valuestring, serial->valuestring, param->valuestring,
unit->valuestring, timestamp->valueint * 1000, value->valuestring);
}
cJSON_free(jPlayload);
int sqllen = strlen(_names) + strlen(_values) + strlen(_dbname) + strlen(_tablename) + 1024;
char* _sql = calloc(1, sqllen);
sprintf(_sql, "INSERT INTO %s.%s (%s) VALUES(%s);", _dbname, _tablename, _names, _values);
return _sql;
cJSON_free(root);
return sql;
MQTT_PARSE_OVER:
cJSON_free(root);
free(sql);
return NULL;
}
\ No newline at end of file
......@@ -14,35 +14,21 @@
*/
#define _DEFAULT_SOURCE
#include "cJSON.h"
#include "os.h"
#include "mqtt.h"
#include "mqttInit.h"
#include "mqttLog.h"
#include "mqttPayload.h"
#include "os.h"
#include "tmqtt.h"
#include "posix_sockets.h"
#include "string.h"
#include "taos.h"
#include "tglobal.h"
#include "tmqtt.h"
#include "tsclient.h"
#include "tsocket.h"
#include "ttimer.h"
#include "mqttSystem.h"
#define MQTT_SEND_BUF_SIZE 102400
#define MQTT_RECV_BUF_SIZE 102400
struct mqtt_client tsMqttClient = {0};
struct reconnect_state_t tsMqttStatus = {0};
static pthread_t tsMqttClientDaemonThread = {0};
static void* tsMqttConnect = NULL;
static bool mqttIsRuning = false;
void mqttPublishCallback(void** unused, struct mqtt_response_publish* published);
void mqttCleanupRes(int status, int sockfd, pthread_t* client_daemon);
void* mqttClientRefresher(void* client);
struct mqtt_client tsMqttClient = {0};
struct SMqttReconnectState tsMqttStatus = {0};
static pthread_t tsMqttClientDaemonThread = {0};
static void* tsMqttConnect = NULL;
static bool tsMqttIsRuning = false;
int32_t mqttInitSystem() { return 0; }
......@@ -51,22 +37,22 @@ int32_t mqttStartSystem() {
tsMqttStatus.recvbufsz = MQTT_RECV_BUF_SIZE;
tsMqttStatus.sendbuf = malloc(MQTT_SEND_BUF_SIZE);
tsMqttStatus.recvbuf = malloc(MQTT_RECV_BUF_SIZE);
mqttIsRuning = true;
tsMqttIsRuning = true;
mqtt_init_reconnect(&tsMqttClient, mqttReconnectClient, &tsMqttStatus, mqttPublishCallback);
if (pthread_create(&tsMqttClientDaemonThread, NULL, mqttClientRefresher, &tsMqttClient)) {
mqttError("mqtt client failed to start daemon.");
mqttError("mqtt failed to start daemon.");
mqttCleanupRes(EXIT_FAILURE, -1, NULL);
return -1;
}
mqttInfo("mqtt client listening for %s messages", tsMqttTopic);
mqttInfo("mqtt listening for topic:%s messages", tsMqttTopic);
return 0;
}
void mqttStopSystem() {
if (mqttIsRuning) {
mqttIsRuning = false;
if (tsMqttIsRuning) {
tsMqttIsRuning = false;
tsMqttClient.error = MQTT_ERROR_SOCKET_ERROR;
taosMsleep(300);
......@@ -82,52 +68,41 @@ void mqttCleanUpSystem() {
}
void mqttPublishCallback(void** unused, struct mqtt_response_publish* published) {
/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
char* topic_name = (char*)malloc(published->topic_name_size + 1);
memcpy(topic_name, published->topic_name, published->topic_name_size);
topic_name[published->topic_name_size] = '\0';
mqttInfo("received publish('%s'): %s", topic_name, (const char*)published->application_message);
char _token[128] = {0};
char _dbname[128] = {0};
char _tablename[128] = {0};
const char* content = published->application_message;
mqttDebug("receive message size:%d", (int)published->application_message_size);
if (tsMqttConnect == NULL) {
mqttInfo("connect database");
taos_connect_a(NULL, "_root", tsInternalPass, "", 0, mqttInitConnCb, &tsMqttClient, &tsMqttConnect);
}
if (topic_name[1] == '/' && strncmp((char*)&topic_name[1], tsMqttTopic, strlen(tsMqttTopic)) == 0) {
char* p_p_cmd_part[5] = {0};
char copystr[1024] = {0};
strncpy(copystr, topic_name, MIN(1024, published->topic_name_size));
char part_index = split(copystr, "/", p_p_cmd_part, 10);
if (part_index < 4) {
mqttError("The topic %s is't format '/path/token/dbname/table name/'. for expmle: '/taos/token/db/t'",
topic_name);
tsMqttConnect = taos_connect(NULL, "_root", tsInternalPass, "", 0);
if (tsMqttConnect == NULL) {
mqttError("failed to connect tdengine");
return;
} else {
strncpy(_token, p_p_cmd_part[1], 127);
strncpy(_dbname, p_p_cmd_part[2], 127);
strncpy(_tablename, p_p_cmd_part[3], 127);
mqttInfo("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname,
_tablename);
if (tsMqttConnect != NULL) {
char* _sql = converJsonToSql((char*)published->application_message, _dbname, _tablename);
mqttInfo("query:%s", _sql);
taos_query_a(tsMqttConnect, _sql, mqttQueryInsertCallback, &tsMqttClient);
mqttInfo("free sql:%s", _sql);
free(_sql);
}
mqttInfo("successed to connect tdengine");
}
}
mqttTrace("receive message content:%s", content);
char* sql = mqttConverJsonToSql((char*)content, (int)published->application_message_size);
if (sql != NULL) {
void* res = taos_query(tsMqttConnect, sql);
int code = taos_errno(res);
if (code != 0) {
mqttError("failed to exec sql%s", sql);
}
taos_free_result(res);
} else {
mqttDebug("failed to parse mqtt message");
}
free(topic_name);
}
void* mqttClientRefresher(void* client) {
while (mqttIsRuning) {
while (tsMqttIsRuning) {
mqtt_sync((struct mqtt_client*)client);
taosMsleep(100);
}
mqttDebug("mqtt client quit refresher");
mqttDebug("mqtt quit refresher");
return NULL;
}
......@@ -142,28 +117,8 @@ void mqttCleanupRes(int status, int sockfd, pthread_t* client_daemon) {
}
}
void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) {
if (code < 0) {
mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code));
taos_close(tsMqttConnect);
tsMqttConnect = NULL;
return;
}
mqttDebug("mqtt:%d, connect to database success, reason:%s", code, tstrerror(code));
}
void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code) {
if (code < 0) {
mqttError("mqtt:%d, save data failed, code:%s", code, tstrerror(code));
} else if (code == 0) {
mqttError("mqtt:%d, save data failed, affect rows:%d", code, code);
} else {
mqttInfo("mqtt:%d, save data success, code:%s", code, tstrerror(code));
}
}
void mqttReconnectClient(struct mqtt_client* client, void** unused) {
mqttInfo("mqtt client tries to connect to the server");
mqttInfo("mqtt tries to connect to the mqtt server");
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
close(client->socketfd);
......@@ -173,17 +128,14 @@ void mqttReconnectClient(struct mqtt_client* client, void** unused) {
mqttError("mqtt client was in error state %s", mqtt_error_str(client->error));
}
int sockfd = open_nb_socket("test.mosquitto.org", "1883");
int sockfd = open_nb_socket(tsMqttHostName, tsMqttPort);
if (sockfd < 0) {
mqttError("mqtt client failed to open socket %s:%s", tsMqttHostName, tsMqttPort);
mqttCleanupRes(EXIT_FAILURE, sockfd, NULL);
//mqttCleanupRes(EXIT_FAILURE, sockfd, NULL);
return;
}
// mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz);
// mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400);
// mqtt_subscribe(client, tsMqttTopic, 0);
mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz);
mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, NULL, NULL, MQTT_CONNECT_CLEAN_SESSION, 400);
mqtt_subscribe(client, "datetime", 0);
mqtt_connect(client, "tsMqttClientId", NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400);
mqtt_subscribe(client, tsMqttTopic, 0);
}
\ No newline at end of file
......@@ -6,6 +6,14 @@ system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 100000
system sh/cfg.sh -n dnode1 -c http -v 1
system sh/cfg.sh -n dnode1 -c http -v 1
system sh/cfg.sh -n dnode1 -c mqttBrokerAddress -v mqtt://test.mosquitto.org:1883/#
system sh/cfg.sh -n dnode1 -c mqttBrokerClientId -v taosmqtt
\ No newline at end of file
system sh/cfg.sh -n dnode1 -c mqtt -v 1
system sh/exec.sh -n dnode1 -s start
sql sleep 3000
sql connect
sql create database mqttdb;
sql create table mqttdb.devices(ts timestamp, value bigint) tags(name binary(32), model binary(32), serial binary(16), param binary(16), unit binary(16));
sql sleep 1000
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册