From 011ca825f9f91d3f10b2f9b8d5abcdcb326c04ab Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 3 Sep 2020 14:29:40 +0000 Subject: [PATCH] TD-1310 Refactor the mqtt module codes --- cmake/input.inc | 10 ++ src/common/src/tglobal.c | 8 +- src/dnode/src/dnodeMgmt.c | 4 +- src/inc/tmqtt.h | 6 +- src/plugins/mqtt/inc/mqttInit.h | 22 ++- src/plugins/mqtt/inc/mqttPayload.h | 6 +- src/plugins/mqtt/inc/mqttSystem.h | 30 ---- src/plugins/mqtt/src/mqttPayload.c | 174 +++++++++++++++++------ src/plugins/mqtt/src/mqttSystem.c | 132 ++++++----------- tests/script/general/connection/mqtt.sim | 14 +- 10 files changed, 217 insertions(+), 189 deletions(-) delete mode 100644 src/plugins/mqtt/inc/mqttSystem.h diff --git a/cmake/input.inc b/cmake/input.inc index f90b10a087..1ef2045f57 100755 --- a/cmake/input.inc +++ b/cmake/input.inc @@ -42,6 +42,16 @@ IF (${MEM_CHECK} MATCHES "true") MESSAGE(STATUS "build with memory check") ENDIF () +IF (${MQTT} MATCHES "false") + SET(TD_MQTT FALSE) + MESSAGE(STATUS "build without mqtt module") +ENDIF () + +IF (${SYNC} MATCHES "false") + SET(TD_SYNC FALSE) + MESSAGE(STATUS "build without sync module") +ENDIF () + IF (${RANDOM_FILE_FAIL} MATCHES "true") SET(TD_RANDOM_FILE_FAIL TRUE) MESSAGE(STATUS "build with random-file-fail enabled") diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 9549ccf607..b232b41296 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -142,7 +142,7 @@ char tsMqttPort[TSDB_MQTT_PORT_LEN] = "1883"; char tsMqttUser[TSDB_MQTT_USER_LEN] = {0}; char tsMqttPass[TSDB_MQTT_PASS_LEN] = {0}; char tsMqttClientId[TSDB_MQTT_CLIENT_ID_LEN] = "TDengineMqttSubscriber"; -char tsMqttTopic[TSDB_MQTT_TOPIC_LEN] = "/weather/loop"; +char tsMqttTopic[TSDB_MQTT_TOPIC_LEN] = "/test"; // # // monitor int32_t tsEnableMonitorModule = 1; @@ -774,7 +774,7 @@ static void doInitGlobalConfig(void) { cfg.option = "mqttHostName"; cfg.ptr = tsMqttHostName; cfg.valType = TAOS_CFG_VTYPE_STRING; - cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_NOT_PRINT; cfg.minValue = 0; cfg.maxValue = 0; cfg.ptrLength = TSDB_MQTT_HOSTNAME_LEN; @@ -784,7 +784,7 @@ static void doInitGlobalConfig(void) { cfg.option = "mqttPort"; cfg.ptr = tsMqttPort; cfg.valType = TAOS_CFG_VTYPE_STRING; - cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_NOT_PRINT; cfg.minValue = 0; cfg.maxValue = 0; cfg.ptrLength = TSDB_MQTT_PORT_LEN; @@ -794,7 +794,7 @@ static void doInitGlobalConfig(void) { cfg.option = "mqttTopic"; cfg.ptr = tsMqttTopic; cfg.valType = TAOS_CFG_VTYPE_STRING; - cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_NOT_PRINT; cfg.minValue = 0; cfg.maxValue = 0; cfg.ptrLength = TSDB_MQTT_TOPIC_LEN; diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 1f41bc23eb..c968246a68 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -611,7 +611,7 @@ static bool dnodeReadMnodeInfos() { } for (int i = 0; i < size; ++i) { - cJSON* nodeInfo = cJSON_GetArrayItem(nodeInfos, i); + cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i); if (nodeInfo == NULL) continue; cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); @@ -627,7 +627,7 @@ static bool dnodeReadMnodeInfos() { goto PARSE_OVER; } strncpy(tsDMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN); - } + } ret = true; diff --git a/src/inc/tmqtt.h b/src/inc/tmqtt.h index 401aac16c6..256e61fbae 100644 --- a/src/inc/tmqtt.h +++ b/src/inc/tmqtt.h @@ -19,11 +19,11 @@ #ifdef __cplusplus extern "C" { #endif -#include + int32_t mqttInitSystem(); int32_t mqttStartSystem(); -void mqttStopSystem(); -void mqttCleanUpSystem(); +void mqttStopSystem(); +void mqttCleanUpSystem(); #ifdef __cplusplus } diff --git a/src/plugins/mqtt/inc/mqttInit.h b/src/plugins/mqtt/inc/mqttInit.h index af8c5069ad..81a9a39a2c 100644 --- a/src/plugins/mqtt/inc/mqttInit.h +++ b/src/plugins/mqtt/inc/mqttInit.h @@ -23,11 +23,12 @@ extern "C" { * @file * A simple subscriber program that performs automatic reconnections. */ -#include -#include -#include #include "mqtt.h" -#include "taos.h" + +#define QOS 1 +#define TIMEOUT 10000L +#define MQTT_SEND_BUF_SIZE 102400 +#define MQTT_RECV_BUF_SIZE 102400 /** * @brief A structure that I will use to keep track of some data needed @@ -36,12 +37,12 @@ extern "C" { * An instance of this struct will be created in my \c main(). Then, whenever * \ref mqttReconnectClient is called, this instance will be passed. */ -struct reconnect_state_t { +typedef struct SMqttReconnectState { uint8_t* sendbuf; size_t sendbufsz; uint8_t* recvbuf; size_t recvbufsz; -}; +} SMqttReconnectState; /** * @brief My reconnect callback. It will reestablish the connection whenever @@ -52,7 +53,7 @@ void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr /** * @brief The function will be called whenever a PUBLISH message is received. */ -void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published); +void mqttPublishCallback(void** unused, struct mqtt_response_publish* published); /** * @brief The client's refresher. This function triggers back-end routines to @@ -67,12 +68,7 @@ void* mqttClientRefresher(void* client); /** * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. */ - -void mqttCleanup(int status, int sockfd, pthread_t* client_daemon); -void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code); -void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code); -#define QOS 1 -#define TIMEOUT 10000L +void mqttCleanupRes(int status, int sockfd, pthread_t* client_daemon); #ifdef __cplusplus } diff --git a/src/plugins/mqtt/inc/mqttPayload.h b/src/plugins/mqtt/inc/mqttPayload.h index b7e7abbd96..12a714afac 100644 --- a/src/plugins/mqtt/inc/mqttPayload.h +++ b/src/plugins/mqtt/inc/mqttPayload.h @@ -15,11 +15,13 @@ #ifndef TDENGINE_MQTT_PLYLOAD_H #define TDENGINE_MQTT_PLYLOAD_H + #ifdef __cplusplus extern "C" { #endif -char split(char str[], char delims[], char** p_p_cmd_part, int max); -char* converJsonToSql(char* json, char* _dbname, char* _tablename); + +char* mqttConverJsonToSql(char* json, int maxSize); + #ifdef __cplusplus } #endif diff --git a/src/plugins/mqtt/inc/mqttSystem.h b/src/plugins/mqtt/inc/mqttSystem.h deleted file mode 100644 index a79fac33b5..0000000000 --- a/src/plugins/mqtt/inc/mqttSystem.h +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_MQTT_SYSTEM_H -#define TDENGINE_MQTT_SYSTEM_H -#ifdef __cplusplus -extern "C" { -#endif -#include -int32_t mqttInitSystem(); -int32_t mqttStartSystem(); -void mqttStopSystem(); -void mqttCleanUpSystem(); -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/plugins/mqtt/src/mqttPayload.c b/src/plugins/mqtt/src/mqttPayload.c index 96c8e71edd..ab3cd4c633 100644 --- a/src/plugins/mqtt/src/mqttPayload.c +++ b/src/plugins/mqtt/src/mqttPayload.c @@ -14,52 +14,142 @@ */ #define _DEFAULT_SOURCE -#include "mqttPayload.h" +#include "os.h" #include "cJSON.h" -#include "string.h" -#include "taos.h" #include "mqttLog.h" -#include "os.h" -char split(char str[], char delims[], char** p_p_cmd_part, int max) { - char* token = strtok(str, delims); - char part_index = 0; - char** tmp_part = p_p_cmd_part; - while (token) { - *tmp_part++ = token; - token = strtok(NULL, delims); - part_index++; - if (part_index >= max) break; - } - return part_index; +#include "mqttPayload.h" + +// subscribe message like this + +/* +/test { + "timestamp": 1599121290, + "gateway": { + "name": "AcuLink 810 Gateway", + "model": "AcuLink810-868", + "serial": "S8P20200207" + }, + "device": { + "name": "Acuvim L V3 .221", + "model": "Acuvim-L-V3", + "serial": "221", + "online": true, + "readings": [ + { + "param": "Freq_Hz", + "value": "59.977539", + "unit": "Hz" + }, + { + "param": "Va_V", + "value": "122.002907", + "unit": "V" + }, + { + "param": "DI4", + "value": "5.000000", + "unit": "" + } + ] + } } +*/ + +/* + * This is an example, this function needs to be implemented in order to parse the json file into a sql statement + * Note that you need to create a super table and database before writing data + * In this case: + * create database mqttdb; + * create table mqttdb.devices(ts timestamp, value bigint) tags(name binary(32), model binary(32), serial binary(16), param binary(16), unit binary(16)); + */ + +char* mqttConverJsonToSql(char* json, int maxSize) { + // const int32_t maxSize = 10240; + char* sql = malloc(maxSize); + + cJSON* root = cJSON_Parse(json); + if (root == NULL) { + mqttError("failed to parse msg, invalid json format"); + goto MQTT_PARSE_OVER; + } + + cJSON* timestamp = cJSON_GetObjectItem(root, "timestamp"); + if (!timestamp || timestamp->type != cJSON_Number) { + mqttError("failed to parse msg, timestamp not found"); + goto MQTT_PARSE_OVER; + } + + cJSON* device = cJSON_GetObjectItem(root, "device"); + if (!device) { + mqttError("failed to parse msg, device not found"); + goto MQTT_PARSE_OVER; + } -char* converJsonToSql(char* json, char* _dbname, char* _tablename) { - cJSON* jPlayload = cJSON_Parse(json); - char _names[102400] = {0}; - char _values[102400] = {0}; - int i = 0; - int count = cJSON_GetArraySize(jPlayload); - for (; i < count; i++) - { - cJSON* item = cJSON_GetArrayItem(jPlayload, i); - if (cJSON_Object == item->type) { - mqttInfo("The item '%s' is not supported", item->string); - } else { - strcat(_names, item->string); - if (i < count - 1) { - strcat(_names, ","); - } - char* __value_json = cJSON_Print(item); - strcat(_values, __value_json); - free(__value_json); - if (i < count - 1) { - strcat(_values, ","); - } + cJSON* name = cJSON_GetObjectItem(device, "name"); + if (!name || name->type != cJSON_String) { + mqttError("failed to parse msg, name not found"); + goto MQTT_PARSE_OVER; + } + + cJSON* model = cJSON_GetObjectItem(device, "model"); + if (!model || model->type != cJSON_String) { + mqttError("failed to parse msg, model not found"); + goto MQTT_PARSE_OVER; + } + + cJSON* serial = cJSON_GetObjectItem(device, "serial"); + if (!serial || serial->type != cJSON_String) { + mqttError("failed to parse msg, serial not found"); + goto MQTT_PARSE_OVER; + } + + cJSON* readings = cJSON_GetObjectItem(device, "readings"); + if (!readings || readings->type != cJSON_Array) { + mqttError("failed to parse msg, readings not found"); + goto MQTT_PARSE_OVER; + } + + int count = cJSON_GetArraySize(readings); + if (count <= 0) { + mqttError("failed to parse msg, readings size smaller than 0"); + goto MQTT_PARSE_OVER; + } + + int len = snprintf(sql, maxSize, "insert into"); + + for (int i = 0; i < count; ++i) { + cJSON* reading = cJSON_GetArrayItem(readings, i); + if (reading == NULL) continue; + + cJSON* param = cJSON_GetObjectItem(reading, "param"); + if (!param || param->type != cJSON_String) { + mqttError("failed to parse msg, param not found"); + goto MQTT_PARSE_OVER; } + + cJSON* value = cJSON_GetObjectItem(reading, "value"); + if (!value || value->type != cJSON_String) { + mqttError("failed to parse msg, value not found"); + goto MQTT_PARSE_OVER; + } + + cJSON* unit = cJSON_GetObjectItem(reading, "unit"); + if (!unit || unit->type != cJSON_String) { + mqttError("failed to parse msg, unit not found"); + goto MQTT_PARSE_OVER; + } + + len += snprintf(sql, maxSize - len, + " mqttdb.%s using mqttdb.devices tags('%s', '%s', '%s', '%s', '%s') values(%" PRId64 ", %s)", + serial->valuestring, name->valuestring, model->valuestring, serial->valuestring, param->valuestring, + unit->valuestring, timestamp->valueint * 1000, value->valuestring); } - cJSON_free(jPlayload); - int sqllen = strlen(_names) + strlen(_values) + strlen(_dbname) + strlen(_tablename) + 1024; - char* _sql = calloc(1, sqllen); - sprintf(_sql, "INSERT INTO %s.%s (%s) VALUES(%s);", _dbname, _tablename, _names, _values); - return _sql; + + cJSON_free(root); + return sql; + +MQTT_PARSE_OVER: + cJSON_free(root); + free(sql); + return NULL; } \ No newline at end of file diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c index 84dc5eea2a..0779fd6d72 100644 --- a/src/plugins/mqtt/src/mqttSystem.c +++ b/src/plugins/mqtt/src/mqttSystem.c @@ -14,35 +14,21 @@ */ #define _DEFAULT_SOURCE - -#include "cJSON.h" +#include "os.h" #include "mqtt.h" #include "mqttInit.h" #include "mqttLog.h" #include "mqttPayload.h" -#include "os.h" +#include "tmqtt.h" #include "posix_sockets.h" -#include "string.h" #include "taos.h" #include "tglobal.h" -#include "tmqtt.h" -#include "tsclient.h" -#include "tsocket.h" -#include "ttimer.h" -#include "mqttSystem.h" - -#define MQTT_SEND_BUF_SIZE 102400 -#define MQTT_RECV_BUF_SIZE 102400 -struct mqtt_client tsMqttClient = {0}; -struct reconnect_state_t tsMqttStatus = {0}; -static pthread_t tsMqttClientDaemonThread = {0}; -static void* tsMqttConnect = NULL; -static bool mqttIsRuning = false; - -void mqttPublishCallback(void** unused, struct mqtt_response_publish* published); -void mqttCleanupRes(int status, int sockfd, pthread_t* client_daemon); -void* mqttClientRefresher(void* client); +struct mqtt_client tsMqttClient = {0}; +struct SMqttReconnectState tsMqttStatus = {0}; +static pthread_t tsMqttClientDaemonThread = {0}; +static void* tsMqttConnect = NULL; +static bool tsMqttIsRuning = false; int32_t mqttInitSystem() { return 0; } @@ -51,22 +37,22 @@ int32_t mqttStartSystem() { tsMqttStatus.recvbufsz = MQTT_RECV_BUF_SIZE; tsMqttStatus.sendbuf = malloc(MQTT_SEND_BUF_SIZE); tsMqttStatus.recvbuf = malloc(MQTT_RECV_BUF_SIZE); - mqttIsRuning = true; + tsMqttIsRuning = true; mqtt_init_reconnect(&tsMqttClient, mqttReconnectClient, &tsMqttStatus, mqttPublishCallback); if (pthread_create(&tsMqttClientDaemonThread, NULL, mqttClientRefresher, &tsMqttClient)) { - mqttError("mqtt client failed to start daemon."); + mqttError("mqtt failed to start daemon."); mqttCleanupRes(EXIT_FAILURE, -1, NULL); return -1; } - mqttInfo("mqtt client listening for %s messages", tsMqttTopic); + mqttInfo("mqtt listening for topic:%s messages", tsMqttTopic); return 0; } void mqttStopSystem() { - if (mqttIsRuning) { - mqttIsRuning = false; + if (tsMqttIsRuning) { + tsMqttIsRuning = false; tsMqttClient.error = MQTT_ERROR_SOCKET_ERROR; taosMsleep(300); @@ -82,52 +68,41 @@ void mqttCleanUpSystem() { } void mqttPublishCallback(void** unused, struct mqtt_response_publish* published) { - /* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */ - char* topic_name = (char*)malloc(published->topic_name_size + 1); - memcpy(topic_name, published->topic_name, published->topic_name_size); - topic_name[published->topic_name_size] = '\0'; - mqttInfo("received publish('%s'): %s", topic_name, (const char*)published->application_message); - char _token[128] = {0}; - char _dbname[128] = {0}; - char _tablename[128] = {0}; + const char* content = published->application_message; + mqttDebug("receive message size:%d", (int)published->application_message_size); + if (tsMqttConnect == NULL) { - mqttInfo("connect database"); - taos_connect_a(NULL, "_root", tsInternalPass, "", 0, mqttInitConnCb, &tsMqttClient, &tsMqttConnect); - } - if (topic_name[1] == '/' && strncmp((char*)&topic_name[1], tsMqttTopic, strlen(tsMqttTopic)) == 0) { - char* p_p_cmd_part[5] = {0}; - char copystr[1024] = {0}; - strncpy(copystr, topic_name, MIN(1024, published->topic_name_size)); - char part_index = split(copystr, "/", p_p_cmd_part, 10); - if (part_index < 4) { - mqttError("The topic %s is't format '/path/token/dbname/table name/'. for expmle: '/taos/token/db/t'", - topic_name); + tsMqttConnect = taos_connect(NULL, "_root", tsInternalPass, "", 0); + if (tsMqttConnect == NULL) { + mqttError("failed to connect tdengine"); + return; } else { - strncpy(_token, p_p_cmd_part[1], 127); - strncpy(_dbname, p_p_cmd_part[2], 127); - strncpy(_tablename, p_p_cmd_part[3], 127); - mqttInfo("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname, - _tablename); - - if (tsMqttConnect != NULL) { - char* _sql = converJsonToSql((char*)published->application_message, _dbname, _tablename); - mqttInfo("query:%s", _sql); - taos_query_a(tsMqttConnect, _sql, mqttQueryInsertCallback, &tsMqttClient); - mqttInfo("free sql:%s", _sql); - free(_sql); - } + mqttInfo("successed to connect tdengine"); + } + } + + mqttTrace("receive message content:%s", content); + + char* sql = mqttConverJsonToSql((char*)content, (int)published->application_message_size); + if (sql != NULL) { + void* res = taos_query(tsMqttConnect, sql); + int code = taos_errno(res); + if (code != 0) { + mqttError("failed to exec sql%s", sql); } + taos_free_result(res); + } else { + mqttDebug("failed to parse mqtt message"); } - free(topic_name); } void* mqttClientRefresher(void* client) { - while (mqttIsRuning) { + while (tsMqttIsRuning) { mqtt_sync((struct mqtt_client*)client); taosMsleep(100); } - mqttDebug("mqtt client quit refresher"); + mqttDebug("mqtt quit refresher"); return NULL; } @@ -142,28 +117,8 @@ void mqttCleanupRes(int status, int sockfd, pthread_t* client_daemon) { } } -void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) { - if (code < 0) { - mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code)); - taos_close(tsMqttConnect); - tsMqttConnect = NULL; - return; - } - mqttDebug("mqtt:%d, connect to database success, reason:%s", code, tstrerror(code)); -} - -void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code) { - if (code < 0) { - mqttError("mqtt:%d, save data failed, code:%s", code, tstrerror(code)); - } else if (code == 0) { - mqttError("mqtt:%d, save data failed, affect rows:%d", code, code); - } else { - mqttInfo("mqtt:%d, save data success, code:%s", code, tstrerror(code)); - } -} - void mqttReconnectClient(struct mqtt_client* client, void** unused) { - mqttInfo("mqtt client tries to connect to the server"); + mqttInfo("mqtt tries to connect to the mqtt server"); if (client->error != MQTT_ERROR_INITIAL_RECONNECT) { close(client->socketfd); @@ -173,17 +128,14 @@ void mqttReconnectClient(struct mqtt_client* client, void** unused) { mqttError("mqtt client was in error state %s", mqtt_error_str(client->error)); } - int sockfd = open_nb_socket("test.mosquitto.org", "1883"); + int sockfd = open_nb_socket(tsMqttHostName, tsMqttPort); if (sockfd < 0) { mqttError("mqtt client failed to open socket %s:%s", tsMqttHostName, tsMqttPort); - mqttCleanupRes(EXIT_FAILURE, sockfd, NULL); + //mqttCleanupRes(EXIT_FAILURE, sockfd, NULL); + return; } - // mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz); - // mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400); - // mqtt_subscribe(client, tsMqttTopic, 0); - mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz); - mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, NULL, NULL, MQTT_CONNECT_CLEAN_SESSION, 400); - mqtt_subscribe(client, "datetime", 0); + mqtt_connect(client, "tsMqttClientId", NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400); + mqtt_subscribe(client, tsMqttTopic, 0); } \ No newline at end of file diff --git a/tests/script/general/connection/mqtt.sim b/tests/script/general/connection/mqtt.sim index 6533e414aa..f003252c5a 100644 --- a/tests/script/general/connection/mqtt.sim +++ b/tests/script/general/connection/mqtt.sim @@ -6,6 +6,14 @@ system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1 system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4 system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 100000 system sh/cfg.sh -n dnode1 -c http -v 1 -system sh/cfg.sh -n dnode1 -c http -v 1 -system sh/cfg.sh -n dnode1 -c mqttBrokerAddress -v mqtt://test.mosquitto.org:1883/# -system sh/cfg.sh -n dnode1 -c mqttBrokerClientId -v taosmqtt \ No newline at end of file +system sh/cfg.sh -n dnode1 -c mqtt -v 1 + +system sh/exec.sh -n dnode1 -s start + +sql sleep 3000 +sql connect +sql create database mqttdb; +sql create table mqttdb.devices(ts timestamp, value bigint) tags(name binary(32), model binary(32), serial binary(16), param binary(16), unit binary(16)); + +sql sleep 1000 +system sh/exec.sh -n dnode1 -s stop -x SIGINT -- GitLab