diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c index 33d83228a75226ae924eb32e0a4f92133ef20e2f..aed71ed761006684f5f80ad0d29e6c0111182484 100644 --- a/src/plugins/mqtt/src/mqttSystem.c +++ b/src/plugins/mqtt/src/mqttSystem.c @@ -30,86 +30,90 @@ #include "tsocket.h" #include "ttimer.h" #include "mqttSystem.h" -struct mqtt_client client; -pthread_t client_daemon; -void* mqtt_conn; -struct reconnect_state_t recnt_status; -char* topicPath; -int isStop = 1; +struct mqtt_client mqttClient = {0}; +pthread_t clientDaemonThread = {0}; +void* mqttConnect=NULL; +struct reconnect_state_t recntStatus = {0}; +char* topicPath=NULL; +int mttIsRuning = 1; + int32_t mqttInitSystem() { int rc = 0; - - return rc; -} - -int32_t mqttStartSystem() { - int rc = 0; uint8_t sendbuf[2048]; uint8_t recvbuf[1024]; - recnt_status.sendbuf = sendbuf; - recnt_status.sendbufsz = sizeof(sendbuf); - recnt_status.recvbuf = recvbuf; - recnt_status.recvbufsz = sizeof(recvbuf); + recntStatus.sendbuf = sendbuf; + recntStatus.sendbufsz = sizeof(sendbuf); + recntStatus.recvbuf = recvbuf; + recntStatus.recvbufsz = sizeof(recvbuf); char* url = tsMqttBrokerAddress; - recnt_status.user_name = strstr(url, "@") != NULL ? strbetween(url, "//", ":") : NULL; - recnt_status.password = strstr(url, "@") != NULL ? strbetween(strstr(url, recnt_status.user_name), ":", "@") : NULL; + recntStatus.user_name = strstr(url, "@") != NULL ? strbetween(url, "//", ":") : NULL; + recntStatus.password = strstr(url, "@") != NULL ? strbetween(strstr(url, recntStatus.user_name), ":", "@") : NULL; if (strstr(url, "@") != NULL) { - recnt_status.hostname = strbetween(url, "@", ":"); + recntStatus.hostname = strbetween(url, "@", ":"); } else if (strstr(strstr(url, "://") + 3, ":") != NULL) { - recnt_status.hostname = strbetween(url, "//", ":"); + recntStatus.hostname = strbetween(url, "//", ":"); } else { - recnt_status.hostname = strbetween(url, "//", "/"); + recntStatus.hostname = strbetween(url, "//", "/"); } - char* _begin_hostname = strstr(url, recnt_status.hostname); + char* _begin_hostname = strstr(url, recntStatus.hostname); if (strstr(_begin_hostname, ":") != NULL) { - recnt_status.port = strbetween(_begin_hostname, ":", "/"); + recntStatus.port = strbetween(_begin_hostname, ":", "/"); } else { - recnt_status.port = strbetween("'1883'", "'", "'"); + recntStatus.port = strbetween("'1883'", "'", "'"); } - topicPath = strbetween(strstr(url, strstr(_begin_hostname, ":") != NULL ? recnt_status.port : recnt_status.hostname), + topicPath = strbetween(strstr(url, strstr(_begin_hostname, ":") != NULL ? recntStatus.port : recntStatus.hostname), "/", "/"); char* _topic = "+/+/+/"; int _tpsize = strlen(topicPath) + strlen(_topic) + 1; - recnt_status.topic = calloc(1, _tpsize); - sprintf(recnt_status.topic, "/%s/%s", topicPath, _topic); - recnt_status.client_id = strlen(tsMqttBrokerClientId) < 3 ? tsMqttBrokerClientId : "taos_mqtt"; - - taos_init(); - mqttPrint("mqttInitSystem mqtt://%s:%s@%s:%s/%s/", recnt_status.user_name, recnt_status.password, - recnt_status.hostname, recnt_status.port, topicPath); - mqtt_conn = NULL; - mqtt_init_reconnect(&client, mqttReconnectClient, &recnt_status, mqtt_PublishCallback); - if (pthread_create(&client_daemon, NULL, mqttClientRefresher, &client)) { + recntStatus.topic = calloc(1, _tpsize); + sprintf(recntStatus.topic, "/%s/%s", topicPath, _topic); + recntStatus.client_id = strlen(tsMqttBrokerClientId) < 3 ? tsMqttBrokerClientId : "taos_mqtt"; + mqttConnect = NULL; + return rc; +} + +int32_t mqttStartSystem() { + int rc = 0; + if (recntStatus.user_name != NULL && recntStatus.password != NULL) { + mqttPrint("connecting to mqtt://%s:%s@%s:%s/%s/", recntStatus.user_name, recntStatus.password, + recntStatus.hostname, recntStatus.port, topicPath); + } + else if (recntStatus.user_name != NULL && recntStatus.password == NULL) + { + mqttPrint("connecting to mqtt://%s@%s:%s/%s/", recntStatus.user_name,recntStatus.hostname, recntStatus.port, topicPath); + } + + mqtt_init_reconnect(&mqttClient, mqttReconnectClient, &recntStatus, mqtt_PublishCallback); + if (pthread_create(&clientDaemonThread, NULL, mqttClientRefresher, &mqttClient)) { mqttError("Failed to start client daemon."); mqttCleanup(EXIT_FAILURE, -1, NULL); rc = -1; + } else { + mqttPrint("listening for '%s' messages.", recntStatus.topic); } - mqttPrint("listening for '%s' messages.", recnt_status.topic); return rc; } void mqttStopSystem() { - mqttError("Injecting error: \"MQTT_ERROR_SOCKET_ERROR\""); - client.error = MQTT_ERROR_SOCKET_ERROR; - isStop = 0; + mqttClient.error = MQTT_ERROR_SOCKET_ERROR; + mttIsRuning = 0; usleep(300000U); + mqttCleanup(EXIT_SUCCESS, mqttClient.socketfd, &clientDaemonThread); + mqttPrint("mqtt is stoped"); } void mqttCleanUpSystem() { - mqttPrint("starting to clean up mqtt"); - mqttCleanup(EXIT_SUCCESS, client.socketfd, &client_daemon); - taos_cleanup(mqtt_conn); - free(recnt_status.user_name); - free(recnt_status.password); - free(recnt_status.hostname); - free(recnt_status.port); - free(recnt_status.topic); + mqttPrint("starting to clean up mqtt"); + free(recntStatus.user_name); + free(recntStatus.password); + free(recntStatus.hostname); + free(recntStatus.port); + free(recntStatus.topic); free(topicPath); - mqttPrint("mqtt is cleaned up"); } @@ -123,9 +127,9 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published char _token[128] = {0}; char _dbname[128] = {0}; char _tablename[128] = {0}; - if (mqtt_conn == NULL) { + if (mqttConnect == NULL) { mqttPrint("connect database"); - taos_connect_a(NULL, "_root", tsInternalPass, "", 0, mqttInitConnCb, &client, &mqtt_conn); + taos_connect_a(NULL, "_root", tsInternalPass, "", 0, mqttInitConnCb, &mqttClient, &mqttConnect); } if (topic_name[1]=='/' && strncmp((char*)&topic_name[1], topicPath, strlen(topicPath)) == 0) { char* p_p_cmd_part[5] = {0}; @@ -141,10 +145,10 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published mqttPrint("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname, _tablename); - if (mqtt_conn != NULL) { + if (mqttConnect != NULL) { char* _sql = converJsonToSql((char*)published->application_message, _dbname, _tablename); mqttPrint("query:%s", _sql); - taos_query_a(mqtt_conn, _sql, mqttQueryInsertCallback, &client); + taos_query_a(mqttConnect, _sql, mqttQueryInsertCallback, &mqttClient); mqttPrint("free sql:%s", _sql); free(_sql); } @@ -154,7 +158,7 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published } void* mqttClientRefresher(void* client) { - while (isStop) { + while (mttIsRuning) { mqtt_sync((struct mqtt_client*)client); usleep(100000U); } @@ -171,8 +175,8 @@ void mqttCleanup(int status, int sockfd, pthread_t* client_daemon) { void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) { if (code < 0) { mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code)); - taos_close(mqtt_conn); - mqtt_conn = NULL; + taos_close(mqttConnect); + mqttConnect = NULL; return; } mqttTrace("mqtt:%d, connect to database success, reason:%s", code, tstrerror(code));