diff --git a/.gitignore b/.gitignore index 67cc2929b45049b7bb7ccf377b02bdaad70c3315..178450e1240b50c6079415b1926e1bd0f3ff2bc6 100644 --- a/.gitignore +++ b/.gitignore @@ -46,7 +46,6 @@ html/ /CMakeCache.txt /Makefile /*.cmake -/deps /src/cq/test/CMakeFiles/cqtest.dir/*.cmake *.cmake /src/cq/test/CMakeFiles/cqtest.dir/*.make diff --git a/.gitmodules b/.gitmodules index 35aea3658b464000a4132a6b4a83df286d0f4933..8e91be866de8309d0b291e9cb009221f06ac02f8 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,7 @@ [submodule "src/connector/go"] path = src/connector/go url = https://github.com/taosdata/driver-go +[submodule "deps/MQTT-C"] + path = deps/MQTT-C + url = https://github.com/LiamBindle/MQTT-C.git + branch = master diff --git a/deps/CMakeLists.txt b/deps/CMakeLists.txt index de4ff4ef249786bc071a417e3e34e5517df9fe25..3794a0a228ce9f5643159f7cd04d818af2b06e9f 100644 --- a/deps/CMakeLists.txt +++ b/deps/CMakeLists.txt @@ -7,4 +7,4 @@ ADD_SUBDIRECTORY(regex) ADD_SUBDIRECTORY(iconv) ADD_SUBDIRECTORY(lz4) ADD_SUBDIRECTORY(cJson) -ADD_SUBDIRECTORY(paho.mqtt.c) +ADD_SUBDIRECTORY(MQTT-C) diff --git a/deps/MQTT-C b/deps/MQTT-C new file mode 160000 index 0000000000000000000000000000000000000000..79c1b887856332fc12278e52c29532e9ebad2a8a --- /dev/null +++ b/deps/MQTT-C @@ -0,0 +1 @@ +Subproject commit 79c1b887856332fc12278e52c29532e9ebad2a8a diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index 2f3008c33e52b2c2265fd80d786e1312705468d7..bfb9a77aa76a2e463df811fbe3ecdd5f60a2f661 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -20,7 +20,7 @@ #include "trpc.h" #include "mnode.h" #include "http.h" -#include "mqtt.h" +#include "tmqtt.h" #include "monitor.h" #include "dnodeInt.h" #include "dnodeModule.h" diff --git a/src/inc/mqtt.h b/src/inc/tmqtt.h similarity index 100% rename from src/inc/mqtt.h rename to src/inc/tmqtt.h diff --git a/src/plugins/mqtt/CMakeLists.txt b/src/plugins/mqtt/CMakeLists.txt index 3c1dfa7625832db5246e018908b2039109a33269..cc88bd95a45cf0f09ac8024f256685837335935d 100644 --- a/src/plugins/mqtt/CMakeLists.txt +++ b/src/plugins/mqtt/CMakeLists.txt @@ -8,14 +8,15 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/paho.mqtt.c/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MQTT-C/include) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MQTT-C/examples/templates) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(mqtt ${SRC}) - TARGET_LINK_LIBRARIES(mqtt pahomqtt taos_static cJson) + TARGET_LINK_LIBRARIES(mqtt taos_static cJson mqttc) IF (TD_ADMIN) - TARGET_LINK_LIBRARIES(mqtt pahomqtt admin cJson) + TARGET_LINK_LIBRARIES(mqtt admin cJson) ENDIF () ENDIF () diff --git a/src/plugins/mqtt/inc/mqttInit.h b/src/plugins/mqtt/inc/mqttInit.h index a183cd2f5fc2d04ae9aea677e3fcd904aeac36ab..828ff343e44a802b8b9ebdd7ae45e38d109ff0a8 100644 --- a/src/plugins/mqtt/inc/mqttInit.h +++ b/src/plugins/mqtt/inc/mqttInit.h @@ -19,25 +19,62 @@ extern "C" { #endif -#include -#include "MQTTAsync.h" -#include "os.h" + +/** + * @file + * A simple subscriber program that performs automatic reconnections. + */ +#include +#include +#include +#include "mqtt.h" #include "taos.h" -#include "tglobal.h" -#include "tsocket.h" -#include "ttimer.h" -#include "tsclient.h" -char split(char str[], char delims[], char** p_p_cmd_part, int max); -void mqttConnnectLost(void* context, char* cause); -int mqttMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message); -void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code); -void onDisconnectFailure(void* context, MQTTAsync_failureData* response); -void onDisconnect(void* context, MQTTAsync_successData* response); -void onSubscribe(void* context, MQTTAsync_successData* response); -void onSubscribeFailure(void* context, MQTTAsync_failureData* response); -void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code); +/** + * @brief A structure that I will use to keep track of some data needed + * to setup the connection to the broker. + * + * An instance of this struct will be created in my \c main(). Then, whenever + * \ref mqttReconnectClient is called, this instance will be passed. + */ +struct reconnect_state_t { + const char* hostname; + const char* port; + const char* topic; + uint8_t* sendbuf; + size_t sendbufsz; + uint8_t* recvbuf; + size_t recvbufsz; +}; + +/** + * @brief My reconnect callback. It will reestablish the connection whenever + * an error occurs. + */ +void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr); + +/** + * @brief The function will be called whenever a PUBLISH message is received. + */ +void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published); + +/** + * @brief The client's refresher. This function triggers back-end routines to + * handle ingress/egress traffic to the broker. + * + * @note All this function needs to do is call \ref __mqtt_recv and + * \ref __mqtt_send every so often. I've picked 100 ms meaning that + * client ingress/egress traffic will be handled every 100 ms. + */ +void* mqttClientRefresher(void* client); + +/** + * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. + */ +void mqttCleanup(int status, int sockfd, pthread_t* client_daemon); +void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code); +void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code); #define CLIENTID "taos" #define TOPIC "/taos/+/+/+/" // taos//// #define PAYLOAD "Hello World!" diff --git a/src/plugins/mqtt/src/mqttPayload.c b/src/plugins/mqtt/src/mqttPayload.c index b8f5ec3135ae0a348268392cbc4ae3c6a5edcbed..5f1d792dab92c8296e20923f99fb56f67e5b39ed 100644 --- a/src/plugins/mqtt/src/mqttPayload.c +++ b/src/plugins/mqtt/src/mqttPayload.c @@ -39,7 +39,7 @@ char* converJsonToSql(char* json, char* _dbname, char* _tablename) { char _values[102400] = {0}; int i = 0; int count = cJSON_GetArraySize(jPlayload); - for (; i < count; i++) //遍历最外层json键值对 + for (; i < count; i++) { cJSON* item = cJSON_GetArrayItem(jPlayload, i); if (cJSON_Object == item->type) { @@ -58,7 +58,8 @@ char* converJsonToSql(char* json, char* _dbname, char* _tablename) { } } cJSON_free(jPlayload); - char* _sql = calloc(0, strlen(_names) + strlen(_values) + strlen(_dbname) + strlen(_tablename) + 1024); + int sqllen = strlen(_names) + strlen(_values) + strlen(_dbname) + strlen(_tablename) + 1024; + char* _sql = calloc(1, sqllen); sprintf(_sql, "INSERT INTO %s.%s (%s) VALUES(%s);", _dbname, _tablename, _names, _values); return _sql; } \ No newline at end of file diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c index 97fbcfba80349320996001fb205007ba8eb2b0b0..10ca73dc7c0dd8537b21b305cf33bf99825d2484 100644 --- a/src/plugins/mqtt/src/mqttSystem.c +++ b/src/plugins/mqtt/src/mqttSystem.c @@ -14,50 +14,78 @@ */ #define _DEFAULT_SOURCE -#include "mqttSystem.h" -#include "MQTTAsync.h" + #include "cJSON.h" #include "mqtt.h" +#include "mqttInit.h" #include "mqttLog.h" +#include "mqttPayload.h" #include "os.h" +#include "posix_sockets.h" #include "string.h" #include "taos.h" #include "tglobal.h" +#include "tmqtt.h" #include "tsclient.h" #include "tsocket.h" #include "ttimer.h" -#include "mqttInit.h" -#include "mqttPayload.h" +#include "mqttSystem.h" +struct mqtt_client client; +pthread_t client_daemon; +void* mqtt_conn; +struct reconnect_state_t reconnect_state; + +int32_t mqttInitSystem() { + int rc = 0; + const char* addr; + const char* port; + addr = tsMqttBrokerAddress; + port = "1883"; + reconnect_state.hostname = addr; + reconnect_state.port = port; + reconnect_state.topic = TOPIC; + uint8_t sendbuf[2048]; + uint8_t recvbuf[1024]; + reconnect_state.sendbuf = sendbuf; + reconnect_state.sendbufsz = sizeof(sendbuf); + reconnect_state.recvbuf = recvbuf; + reconnect_state.recvbufsz = sizeof(recvbuf); + taos_init(); + mqttPrint("mqttInitSystem %s", tsMqttBrokerAddress); + return rc; +} -MQTTAsync client; -MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; -MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; -void* mqtt_conn = NULL; -int disc_finished = 0; -int subscribed = 0; -int finished = 0; -int can_exit = 0; - -void mqttConnnectLost(void* context, char* cause) { - MQTTAsync client = (MQTTAsync)context; - MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; - int rc; - - mqttError("\nConnection lost"); - if (cause) mqttError(" cause: %s", cause); - - mqttPrint("Reconnecting"); - conn_opts.keepAliveInterval = 20; - conn_opts.cleansession = 1; - if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { - mqttError("Failed to start connect, return code %d", rc); - finished = 1; +int32_t mqttStartSystem() { + int rc = 0; + mqtt_conn = NULL; + mqtt_init_reconnect(&client, mqttReconnectClient, &reconnect_state, mqtt_PublishCallback); + if (pthread_create(&client_daemon, NULL, mqttClientRefresher, &client)) { + mqttError("Failed to start client daemon."); + mqttCleanup(EXIT_FAILURE, -1, NULL); + rc = -1; } + mqttPrint("listening for '%s' messages.", TOPIC); + return rc; +} + +void mqttStopSystem() { + mqttError("Injecting error: \"MQTT_ERROR_SOCKET_ERROR\""); + client.error = MQTT_ERROR_SOCKET_ERROR; } +void mqttCleanUpSystem() { + mqttPrint("mqttCleanUpSystem"); + mqttCleanup(EXIT_SUCCESS, client.socketfd, &client_daemon); + taos_cleanup(mqtt_conn); +} -int mqttMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message) { - mqttTrace("Message arrived,topic is %s,message is %.*s", topicName, message->payloadlen, (char*)message->payload); +void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published) { + mqttPrint("mqtt_PublishCallback"); + /* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */ + char* topic_name = (char*)malloc(published->topic_name_size + 1); + memcpy(topic_name, published->topic_name, published->topic_name_size); + topic_name[published->topic_name_size] = '\0'; + mqttPrint("Received publish('%s'): %s", topic_name, (const char*)published->application_message); char _token[128] = {0}; char _dbname[128] = {0}; char _tablename[128] = {0}; @@ -65,162 +93,98 @@ int mqttMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_m mqttPrint("connect database"); taos_connect_a(NULL, "monitor", tsInternalPass, "", 0, mqttInitConnCb, &client, &mqtt_conn); } - if (strncmp(topicName, "/taos/", 6) == 0) { + if (strncmp(topic_name, "/taos/", 6) == 0) { char* p_p_cmd_part[5] = {0}; char copystr[1024] = {0}; - strncpy(copystr, topicName, MIN(1024, strlen(topicName))); + strncpy(copystr, topic_name, MIN(1024, published->topic_name_size)); char part_index = split(copystr, "/", p_p_cmd_part, 10); if (part_index < 4) { - mqttError("The topic %s is't format '%s'.", topicName, TOPIC); + mqttError("The topic %s is't format '%s'.", topic_name, TOPIC); } else { strncpy(_token, p_p_cmd_part[1], 127); strncpy(_dbname, p_p_cmd_part[2], 127); strncpy(_tablename, p_p_cmd_part[3], 127); mqttPrint("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname, _tablename); - + if (mqtt_conn != NULL) { - char* _sql = converJsonToSql((char*)message->payload, _dbname, _tablename); + char* _sql = converJsonToSql((char*)published->application_message, _dbname, _tablename); mqttPrint("query:%s", _sql); taos_query_a(mqtt_conn, _sql, mqttQueryInsertCallback, &client); + mqttPrint("free sql:%s", _sql); free(_sql); } } } - MQTTAsync_freeMessage(&message); - MQTTAsync_free(topicName); - return 1; + free(topic_name); } -void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code) { - if (code < 0) { - mqttError("mqtt:%d, save data failed, code:%s", code, tstrerror(code)); - } else if (code == 0) { - mqttError("mqtt:%d, save data failed, affect rows:%d", code, code); - } else { - mqttPrint("mqtt:%d, save data success, code:%s", code, tstrerror(code)); + +void* mqttClientRefresher(void* client) { + while (1) { + mqtt_sync((struct mqtt_client*)client); + usleep(100000U); } + return NULL; } -void onDisconnectFailure(void* context, MQTTAsync_failureData* response) { - mqttError("Disconnect failed, rc %d", response->code); - disc_finished = 1; +void mqttCleanup(int status, int sockfd, pthread_t* client_daemon) { + mqttPrint("mqttCleanup"); + if (sockfd != -1) close(sockfd); + if (client_daemon != NULL) pthread_cancel(*client_daemon); } -void onDisconnect(void* context, MQTTAsync_successData* response) { - mqttError("Successful disconnection"); - if (mqtt_conn != NULL) { +void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) { + if (code < 0) { + mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code)); taos_close(mqtt_conn); mqtt_conn = NULL; + return; } - disc_finished = 1; -} - -void onSubscribe(void* context, MQTTAsync_successData* response) { - mqttPrint("Subscribe succeeded"); - subscribed = 1; -} - -void onSubscribeFailure(void* context, MQTTAsync_failureData* response) { - mqttError("Subscribe failed, rc %d", response->code); - finished = 1; + mqttTrace("mqtt:%d, connect to database success, reason:%s", code, tstrerror(code)); } -void onConnectFailure(void* context, MQTTAsync_failureData* response) { - mqttError("Connect failed, rc %d,,Retry later", response->code); - finished = 1; - taosMsleep(1000); - int rc = 0; - if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { - mqttError("Failed to start connect, return code %d", rc); - finished = 1; +void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code) { + if (code < 0) { + mqttError("mqtt:%d, save data failed, code:%s", code, tstrerror(code)); + } else if (code == 0) { + mqttError("mqtt:%d, save data failed, affect rows:%d", code, code); + } else { + mqttPrint("mqtt:%d, save data success, code:%s", code, tstrerror(code)); } } -void onConnect(void* context, MQTTAsync_successData* response) { - MQTTAsync client = (MQTTAsync)context; - MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; - int rc; +void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr) { + mqttPrint("mqttReconnectClient"); + struct reconnect_state_t* reconnect_state = *((struct reconnect_state_t**)reconnect_state_vptr); - mqttPrint("Successful connection\n"); - - mqttPrint("Subscribing to topic %s\nfor client %s using QoS%d", TOPIC, CLIENTID, QOS); - opts.onSuccess = onSubscribe; - opts.onFailure = onSubscribeFailure; - opts.context = client; - if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS) { - mqttError("Failed to start subscribe, return code %d\n", rc); - finished = 1; + /* Close the clients socket if this isn't the initial reconnect call */ + if (client->error != MQTT_ERROR_INITIAL_RECONNECT) { + close(client->socketfd); } -} -int32_t mqttInitSystem() { - int rc = 0; - if (strnlen(tsMqttBrokerAddress, 128) == 0) { - rc = EXIT_FAILURE; - mqttError("Can't to create client, mqtt broker address is empty %d", rc); - } else { - if ((rc = MQTTAsync_create(&client, tsMqttBrokerAddress, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != - MQTTASYNC_SUCCESS) { - mqttError("Failed to create client, return code %d", rc); - rc = EXIT_FAILURE; - - } else { - if ((rc = MQTTAsync_setCallbacks(client, client, mqttConnnectLost, mqttMessageArrived, NULL)) != MQTTASYNC_SUCCESS) { - mqttError("Failed to set callbacks, return code %d", rc); - rc = EXIT_FAILURE; - } else { - conn_opts.keepAliveInterval = 20; - conn_opts.cleansession = 1; - conn_opts.onSuccess = onConnect; - conn_opts.onFailure = onConnectFailure; - conn_opts.context = client; - taos_init(); - } - } + /* Perform error handling here. */ + if (client->error != MQTT_ERROR_INITIAL_RECONNECT) { + mqttError("mqttReconnectClient: called while client was in error state \"%s\"", mqtt_error_str(client->error)); } - return rc; -} -int32_t mqttStartSystem() { - int rc = 0; - mqttPrint("mqttStartSystem"); - if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { - mqttError("Failed to start connect, return code %d", rc); - rc = EXIT_FAILURE; - } else { - while (!subscribed && !finished) usleep(10000L); - disc_opts.onSuccess = onDisconnect; - disc_opts.onFailure = onDisconnectFailure; - mqttPrint("Successful started\n"); + /* Open a new socket. */ + int sockfd = open_nb_socket(reconnect_state->hostname, reconnect_state->port); + if (sockfd == -1) { + mqttError("Failed to open socket: "); + mqttCleanup(EXIT_FAILURE, sockfd, NULL); } - return rc; -} -void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) { - if (code < 0) { - mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code)); - taos_close(mqtt_conn); - mqtt_conn = NULL; - return; - } - mqttTrace("mqtt:%d, connect to database success, reason:%s", code, tstrerror(code)); -} + /* Reinitialize the client. */ + mqtt_reinit(client, sockfd, reconnect_state->sendbuf, reconnect_state->sendbufsz, reconnect_state->recvbuf, + reconnect_state->recvbufsz); -void mqttStopSystem() { - int rc = 0; - if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) { - mqttError("Failed to start disconnect, return code %d", rc); - rc = EXIT_FAILURE; - } else { - while (!disc_finished) { - usleep(10000L); - } - } - taos_close(mqtt_conn); -} + /* Create an anonymous session */ + const char* client_id = NULL; + /* Ensure we have a clean session */ + uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION; + /* Send connection request to the broker. */ + mqtt_connect(client, client_id, NULL, NULL, 0, NULL, NULL, connect_flags, 400); -void mqttCleanUpSystem() { - mqttPrint("mqttCleanUpSystem"); - MQTTAsync_destroy(&client); - taos_cleanup(mqtt_conn); -} + /* Subscribe to the topic. */ + mqtt_subscribe(client, reconnect_state->topic, 0); +} \ No newline at end of file