diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 319772b60652d6e567e9aa70d06cbffb8167b218..a8838a25255ea5faedcf916f4fef3bd6e98f1a6a 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -94,6 +94,7 @@ extern int32_t tsMaxTables; extern char tsDefaultDB[]; extern char tsDefaultUser[]; extern char tsDefaultPass[]; +extern char tsMqttBrokerAddress[]; extern int32_t tsMaxMeterConnections; extern int32_t tsMaxVnodeConnections; extern int32_t tsMaxMgmtConnections; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 250b79febe1886cda66209082121fbc5f1cbb213..ef72e2596fa973574336682af68fba3d15ed8ac7 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -201,6 +201,8 @@ int32_t tsMonitorInterval = 30; // seconds char tsTimezone[64] = {0}; char tsLocale[TSDB_LOCALE_LEN] = {0}; char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string +char tsMqttBrokerAddress[128] = {0}; + static pthread_once_t tsInitGlobalCfgOnce = PTHREAD_ONCE_INIT; @@ -729,6 +731,16 @@ static void doInitGlobalConfig() { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "mqttBrokerAddress"; + cfg.ptr = tsMqttBrokerAddress; + cfg.valType = TAOS_CFG_VTYPE_STRING; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT; + cfg.minValue = 0; + cfg.maxValue = 0; + cfg.ptrLength = 126; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + // socket type; udp by default cfg.option = "sockettype"; cfg.ptr = tsSocketType; diff --git a/src/inc/mqtt.h b/src/inc/mqtt.h index 710737e79a320989ad1ebf0a669246e8352a4f1d..401aac16c676f08496057e1a99be071cee1d7568 100644 --- a/src/inc/mqtt.h +++ b/src/inc/mqtt.h @@ -19,10 +19,7 @@ #ifdef __cplusplus extern "C" { #endif - #include - -int32_t mqttGetReqCount(); int32_t mqttInitSystem(); int32_t mqttStartSystem(); void mqttStopSystem(); diff --git a/src/plugins/mqtt/CMakeLists.txt b/src/plugins/mqtt/CMakeLists.txt index aeb8309deaf8515ae7fd1afe3cc4556d305b2de0..3c1dfa7625832db5246e018908b2039109a33269 100644 --- a/src/plugins/mqtt/CMakeLists.txt +++ b/src/plugins/mqtt/CMakeLists.txt @@ -13,9 +13,9 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(mqtt ${SRC}) - TARGET_LINK_LIBRARIES( mqtt pahomqtt taos_static ) + TARGET_LINK_LIBRARIES(mqtt pahomqtt taos_static cJson) IF (TD_ADMIN) - TARGET_LINK_LIBRARIES(mqtt pahomqtt admin) + TARGET_LINK_LIBRARIES(mqtt pahomqtt admin cJson) ENDIF () ENDIF () diff --git a/src/plugins/mqtt/inc/mqttSystem.h b/src/plugins/mqtt/inc/mqttSystem.h index ba46b0809f4839603f5b40a33b6a35901499809f..923de8be3838bda7332dccc73943c4f0b3ca33f9 100644 --- a/src/plugins/mqtt/inc/mqttSystem.h +++ b/src/plugins/mqtt/inc/mqttSystem.h @@ -27,11 +27,11 @@ extern "C" { #include "tsocket.h" #include "ttimer.h" #include "tsclient.h" -int32_t mqttGetReqCount(); int32_t mqttInitSystem(); int32_t mqttStartSystem(); void mqttStopSystem(); void mqttCleanUpSystem(); +char split(char str[], char delims[], char** p_p_cmd_part, int max); void connlost(void* context, char* cause); int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message); void mqtt_query_insert_callback(void* param, TAOS_RES* result, int32_t code); diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c index 62a026d3db57f4096e716e9d3e999d30487eeb98..30c7bcc688e5e7e54e1cbaafb660058ffc901dd3 100644 --- a/src/plugins/mqtt/src/mqttSystem.c +++ b/src/plugins/mqtt/src/mqttSystem.c @@ -15,18 +15,20 @@ #define _DEFAULT_SOURCE #include "mqttSystem.h" +#include "MQTTAsync.h" +#include "cJson.h" #include "mqtt.h" #include "mqttLog.h" #include "os.h" +#include "string.h" #include "taos.h" #include "tglobal.h" +#include "tsclient.h" #include "tsocket.h" #include "ttimer.h" -#include "MQTTAsync.h" -#include "tsclient.h" -#define ADDRESS "tcp://mqtt.eclipse.org:1883" -#define CLIENTID "ExampleClientSub" -#define TOPIC "MQTT Examples" + +#define CLIENTID "taos" +#define TOPIC "/taos/+/+/+/" // taos//// #define PAYLOAD "Hello World!" #define QOS 1 #define TIMEOUT 10000L @@ -34,50 +36,107 @@ MQTTAsync client; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; -void* mqtt_conn=NULL; -int disc_finished = 0; -int subscribed = 0; -int finished = 0; -int can_exit = 0; +void* mqtt_conn = NULL; +int disc_finished = 0; +int subscribed = 0; +int finished = 0; +int can_exit = 0; void connlost(void* context, char* cause) { MQTTAsync client = (MQTTAsync)context; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; int rc; - mqttError("\nConnection lost\n"); - if (cause) mqttError(" cause: %s\n", cause); + mqttError("\nConnection lost"); + if (cause) mqttError(" cause: %s", cause); - mqttPrint("Reconnecting\n"); + mqttPrint("Reconnecting"); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { - mqttError("Failed to start connect, return code %d\n", rc); + mqttError("Failed to start connect, return code %d", rc); finished = 1; } } +char split(char str[], char delims[], char** p_p_cmd_part, int max) { + char* token = strtok(str, delims); + char part_index = 0; + char** tmp_part = p_p_cmd_part; + while (token) { + *tmp_part++ = token; + token = strtok(NULL, delims); + part_index++; + if (part_index >= max) break; + } + return part_index; +} int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message) { - mqttPrint("Message arrived\n"); - mqttPrint(" topic: %s\n", topicName); - mqttPrint(" message: %.*s\n", message->payloadlen, (char*)message->payload); + mqttTrace("Message arrived,topic is %s,message is %.*s", topicName, message->payloadlen, (char*)message->payload); + char _token[128] = {0}; + char _dbname[128] = {0}; + char _tablename[128] = {0}; if (mqtt_conn == NULL) { + mqttPrint("connect database"); taos_connect_a(NULL, "monitor", tsInternalPass, "", 0, mqttInitConnCb, &client, &mqtt_conn); } - if (mqtt_conn != NULL) { - taos_query_a(mqtt_conn, (char*)message->payload, mqtt_query_insert_callback, &client); + if (strncmp(topicName, "/taos/", 6) == 0) { + char* p_p_cmd_part[5] = {0}; + char copystr[1024] = {0}; + strncpy(copystr, topicName, MIN(1024, strlen(topicName))); + char part_index = split(copystr, "/", p_p_cmd_part, 10); + if (part_index < 4) { + mqttError("The topic %s is't format '%s'.", topicName, TOPIC); + } else { + strncpy(_token, p_p_cmd_part[1], 127); + strncpy(_dbname, p_p_cmd_part[2], 127); + strncpy(_tablename, p_p_cmd_part[3], 127); + mqttPrint("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname, + _tablename); + cJSON* jPlayload = cJSON_Parse((char*)message->payload); + char _names[102400] = {0}; + char _values[102400] = {0}; + int i = 0; + int count = cJSON_GetArraySize(jPlayload); + for (; i < count; i++) //遍历最外层json键值对 + { + cJSON* item = cJSON_GetArrayItem(jPlayload, i); + if (cJSON_Object == item->type) { + mqttPrint("The item '%s' is not supported", item->string); + } else { + strcat(_names, item->string); + if (i < count - 1) { + strcat(_names, ","); + } + char* __value_json = cJSON_Print(item); + strcat(_values, __value_json); + free(__value_json); + if (i < count - 1) { + strcat(_values, ","); + } + } + } + cJSON_free(jPlayload); + char _sql[102400] = {0}; + sprintf(_sql, "INSERT INTO %s.%s (%s) VALUES(%s);", _dbname, _tablename, _names, _values); + + if (mqtt_conn != NULL) { + mqttPrint("query:%s", _sql); + taos_query_a(mqtt_conn, _sql, mqtt_query_insert_callback, &client); + } + } } MQTTAsync_freeMessage(&message); MQTTAsync_free(topicName); return 1; } - void mqtt_query_insert_callback(void* param, TAOS_RES* result, int32_t code) { +void mqtt_query_insert_callback(void* param, TAOS_RES* result, int32_t code) { if (code < 0) { - mqttError("mqtt:%p, save data failed, code:%s", mqtt_conn, tstrerror(code)); + mqttError("mqtt:%d, save data failed, code:%s", code, tstrerror(code)); } else if (code == 0) { - mqttError("mqtt:%p, save data failed, affect rows:%d", mqtt_conn, code); + mqttError("mqtt:%d, save data failed, affect rows:%d", code, code); } else { - mqttError("mqtt:%p, save data success, code:%s", mqtt_conn, tstrerror(code)); + mqttPrint("mqtt:%d, save data success, code:%s", code, tstrerror(code)); } } @@ -89,7 +148,7 @@ void onDisconnectFailure(void* context, MQTTAsync_failureData* response) { void onDisconnect(void* context, MQTTAsync_successData* response) { mqttError("Successful disconnection\n"); if (mqtt_conn != NULL) { - taos_close(&(mqtt_conn)); + taos_close(mqtt_conn); mqtt_conn = NULL; } disc_finished = 1; @@ -98,16 +157,11 @@ void onDisconnect(void* context, MQTTAsync_successData* response) { void onSubscribe(void* context, MQTTAsync_successData* response) { mqttPrint("Subscribe succeeded\n"); subscribed = 1; - } void onSubscribeFailure(void* context, MQTTAsync_failureData* response) { mqttError("Subscribe failed, rc %d\n", response->code); finished = 1; - if (mqtt_conn != NULL) { - taos_close(mqtt_conn); - mqtt_conn = NULL; - } } void onConnectFailure(void* context, MQTTAsync_failureData* response) { @@ -122,7 +176,7 @@ void onConnect(void* context, MQTTAsync_successData* response) { mqttPrint("Successful connection\n"); - mqttPrint("Subscribing to topic %s\nfor client %s using QoS%d\n\n",TOPIC, CLIENTID, QOS); + mqttPrint("Subscribing to topic %s\nfor client %s using QoS%d", TOPIC, CLIENTID, QOS); opts.onSuccess = onSubscribe; opts.onFailure = onSubscribeFailure; opts.context = client; @@ -132,26 +186,29 @@ void onConnect(void* context, MQTTAsync_successData* response) { } } - - -int32_t mqttGetReqCount() { return 0; } int32_t mqttInitSystem() { int rc = 0; - if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTASYNC_SUCCESS) { - mqttError("Failed to create client, return code %d\n", rc); + if (strnlen(tsMqttBrokerAddress, 128) == 0) { rc = EXIT_FAILURE; - + mqttError("Can't to create client, mqtt broker address is empty %d", rc); } else { - if ((rc = MQTTAsync_setCallbacks(client, client, connlost, msgarrvd, NULL)) != MQTTASYNC_SUCCESS) { - mqttError("Failed to set callbacks, return code %d\n", rc); + if ((rc = MQTTAsync_create(&client, tsMqttBrokerAddress, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != + MQTTASYNC_SUCCESS) { + mqttError("Failed to create client, return code %d", rc); rc = EXIT_FAILURE; + } else { - conn_opts.keepAliveInterval = 20; - conn_opts.cleansession = 1; - conn_opts.onSuccess = onConnect; - conn_opts.onFailure = onConnectFailure; - conn_opts.context = client; - taos_init(); + if ((rc = MQTTAsync_setCallbacks(client, client, connlost, msgarrvd, NULL)) != MQTTASYNC_SUCCESS) { + mqttError("Failed to set callbacks, return code %d", rc); + rc = EXIT_FAILURE; + } else { + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + conn_opts.onSuccess = onConnect; + conn_opts.onFailure = onConnectFailure; + conn_opts.context = client; + taos_init(); + } } } return rc; @@ -161,34 +218,31 @@ int32_t mqttStartSystem() { int rc = 0; mqttPrint("mqttStartSystem"); if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { - mqttError("Failed to start connect, return code %d\n", rc); + mqttError("Failed to start connect, return code %d", rc); rc = EXIT_FAILURE; } else { while (!subscribed && !finished) usleep(10000L); disc_opts.onSuccess = onDisconnect; disc_opts.onFailure = onDisconnectFailure; mqttPrint("Successful started\n"); - } return rc; } - - void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) { +void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) { if (code < 0) { - mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code)); + mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code)); taos_close(mqtt_conn); mqtt_conn = NULL; return; } mqttTrace("mqtt:%d, connect to database success, reason:%s", code, tstrerror(code)); - } - +} void mqttStopSystem() { - int rc=0; + int rc = 0; if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) { - mqttError("Failed to start disconnect, return code %d\n", rc); + mqttError("Failed to start disconnect, return code %d", rc); rc = EXIT_FAILURE; } else { while (!disc_finished) {