diff --git a/src/plugins/mqtt/CMakeLists.txt b/src/plugins/mqtt/CMakeLists.txt deleted file mode 100644 index 90d91e8bcbcb0cd26ba0a472469aed48b6049e39..0000000000000000000000000000000000000000 --- a/src/plugins/mqtt/CMakeLists.txt +++ /dev/null @@ -1,32 +0,0 @@ -CMAKE_MINIMUM_REQUIRED(VERSION 3.0...3.20) -PROJECT(TDengine) - -INCLUDE_DIRECTORIES(inc) -INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) -INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) -INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MQTT-C/include) -INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MQTT-C/examples/templates) -AUX_SOURCE_DIRECTORY(src SRC) - -IF (TD_LINUX) - ADD_LIBRARY(mqtt ${SRC}) - TARGET_LINK_LIBRARIES(mqtt cJson mqttc) - - IF (TD_SOMODE_STATIC) - TARGET_LINK_LIBRARIES(mqtt taos_static) - ELSE () - TARGET_LINK_LIBRARIES(mqtt taos) - ENDIF () -ENDIF () - -IF (TD_DARWIN) - ADD_LIBRARY(mqtt ${SRC}) - TARGET_LINK_LIBRARIES(mqtt cJson mqttc) - - IF (TD_SOMODE_STATIC) - TARGET_LINK_LIBRARIES(mqtt taos_static) - ELSE () - TARGET_LINK_LIBRARIES(mqtt taos) - ENDIF () -ENDIF () - diff --git a/src/plugins/mqtt/inc/mqttInit.h b/src/plugins/mqtt/inc/mqttInit.h deleted file mode 100644 index 81a9a39a2ce04c66934c4bd25ed67bdb51c77ba3..0000000000000000000000000000000000000000 --- a/src/plugins/mqtt/inc/mqttInit.h +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_MQTT_INIT_H -#define TDENGINE_MQTT_INIT_H -#ifdef __cplusplus -extern "C" { -#endif - -/** - * @file - * A simple subscriber program that performs automatic reconnections. - */ -#include "mqtt.h" - -#define QOS 1 -#define TIMEOUT 10000L -#define MQTT_SEND_BUF_SIZE 102400 -#define MQTT_RECV_BUF_SIZE 102400 - -/** - * @brief A structure that I will use to keep track of some data needed - * to setup the connection to the broker. - * - * An instance of this struct will be created in my \c main(). Then, whenever - * \ref mqttReconnectClient is called, this instance will be passed. - */ -typedef struct SMqttReconnectState { - uint8_t* sendbuf; - size_t sendbufsz; - uint8_t* recvbuf; - size_t recvbufsz; -} SMqttReconnectState; - -/** - * @brief My reconnect callback. It will reestablish the connection whenever - * an error occurs. - */ -void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr); - -/** - * @brief The function will be called whenever a PUBLISH message is received. - */ -void mqttPublishCallback(void** unused, struct mqtt_response_publish* published); - -/** - * @brief The client's refresher. This function triggers back-end routines to - * handle ingress/egress traffic to the broker. - * - * @note All this function needs to do is call \ref __mqtt_recv and - * \ref __mqtt_send every so often. I've picked 100 ms meaning that - * client ingress/egress traffic will be handled every 100 ms. - */ -void* mqttClientRefresher(void* client); - -/** - * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. - */ -void mqttCleanupRes(int status, int sockfd, pthread_t* client_daemon); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/plugins/mqtt/inc/mqttLog.h b/src/plugins/mqtt/inc/mqttLog.h deleted file mode 100644 index e186b8111220bd83f2f6833d84d1fdc19f744520..0000000000000000000000000000000000000000 --- a/src/plugins/mqtt/inc/mqttLog.h +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_MQTT_LOG_H -#define TDENGINE_MQTT_LOG_H - -#include "tlog.h" - -extern int32_t mqttDebugFlag; - -#define mqttFatal(...) { if (mqttDebugFlag & DEBUG_FATAL) { taosPrintLog("MQT FATAL ", 255, __VA_ARGS__); }} -#define mqttError(...) { if (mqttDebugFlag & DEBUG_ERROR) { taosPrintLog("MQT ERROR ", 255, __VA_ARGS__); }} -#define mqttWarn(...) { if (mqttDebugFlag & DEBUG_WARN) { taosPrintLog("MQT WARN ", 255, __VA_ARGS__); }} -#define mqttInfo(...) { if (mqttDebugFlag & DEBUG_INFO) { taosPrintLog("MQT ", 255, __VA_ARGS__); }} -#define mqttDebug(...) { if (mqttDebugFlag & DEBUG_DEBUG) { taosPrintLog("MQT ", mqttDebugFlag, __VA_ARGS__); }} -#define mqttTrace(...) { if (mqttDebugFlag & DEBUG_TRACE) { taosPrintLog("MQT ", mqttDebugFlag, __VA_ARGS__); }} - -#endif diff --git a/src/plugins/mqtt/inc/mqttPayload.h b/src/plugins/mqtt/inc/mqttPayload.h deleted file mode 100644 index 12a714afac52d1814f3cbf82ade610ec8af07953..0000000000000000000000000000000000000000 --- a/src/plugins/mqtt/inc/mqttPayload.h +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_MQTT_PLYLOAD_H -#define TDENGINE_MQTT_PLYLOAD_H - -#ifdef __cplusplus -extern "C" { -#endif - -char* mqttConverJsonToSql(char* json, int maxSize); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/plugins/mqtt/src/mqttPayload.c b/src/plugins/mqtt/src/mqttPayload.c deleted file mode 100644 index 1af8b02fad7492da5664628b534b890732acbe3a..0000000000000000000000000000000000000000 --- a/src/plugins/mqtt/src/mqttPayload.c +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "cJSON.h" -#include "mqttLog.h" -#include "mqttPayload.h" - -// subscribe message like this - -/* -/test { - "timestamp": 1599121290, - "gateway": { - "name": "AcuLink 810 Gateway", - "model": "AcuLink810-868", - "serial": "S8P20200207" - }, - "device": { - "name": "Acuvim L V3 .221", - "model": "Acuvim-L-V3", - "serial": "221", - "online": true, - "readings": [ - { - "param": "Freq_Hz", - "value": "59.977539", - "unit": "Hz" - }, - { - "param": "Va_V", - "value": "122.002907", - "unit": "V" - }, - { - "param": "DI4", - "value": "5.000000", - "unit": "" - } - ] - } -} -*/ - -// send msg cmd -// mosquitto_pub -h test.mosquitto.org -t "/test" -m '{"timestamp": 1599121290,"gateway": {"name": "AcuLink 810 Gateway","model": "AcuLink810-868","serial": "S8P20200207"},"device": {"name": "Acuvim L V3 .221","model": "Acuvim-L-V3","serial": "221","online": true,"readings": [{"param": "Freq_Hz","value": "59.977539","unit": "Hz"},{"param": "Va_V","value": "122.002907","unit": "V"},{"param": "DI4","value": "5.000000","unit": ""}]}}' - -/* - * This is an example, this function needs to be implemented in order to parse the json file into a sql statement - * Note that you need to create a super table and database before writing data - * In this case: - * create database mqttdb; - * create table mqttdb.devices(ts timestamp, value bigint) tags(name binary(32), model binary(32), serial binary(16), param binary(16), unit binary(16)); - */ - -char* mqttConverJsonToSql(char* json, int maxSize) { - // const int32_t maxSize = 10240; - maxSize *= 5; - char* sql = malloc(maxSize); - - cJSON* root = cJSON_Parse(json); - if (root == NULL) { - mqttError("failed to parse msg, invalid json format"); - goto MQTT_PARSE_OVER; - } - - cJSON* timestamp = cJSON_GetObjectItem(root, "timestamp"); - if (!timestamp || timestamp->type != cJSON_Number) { - mqttError("failed to parse msg, timestamp not found"); - goto MQTT_PARSE_OVER; - } - - cJSON* device = cJSON_GetObjectItem(root, "device"); - if (!device) { - mqttError("failed to parse msg, device not found"); - goto MQTT_PARSE_OVER; - } - - cJSON* name = cJSON_GetObjectItem(device, "name"); - if (!name || name->type != cJSON_String) { - mqttError("failed to parse msg, name not found"); - goto MQTT_PARSE_OVER; - } - - cJSON* model = cJSON_GetObjectItem(device, "model"); - if (!model || model->type != cJSON_String) { - mqttError("failed to parse msg, model not found"); - goto MQTT_PARSE_OVER; - } - - cJSON* serial = cJSON_GetObjectItem(device, "serial"); - if (!serial || serial->type != cJSON_String) { - mqttError("failed to parse msg, serial not found"); - goto MQTT_PARSE_OVER; - } - - cJSON* readings = cJSON_GetObjectItem(device, "readings"); - if (!readings || readings->type != cJSON_Array) { - mqttError("failed to parse msg, readings not found"); - goto MQTT_PARSE_OVER; - } - - int count = cJSON_GetArraySize(readings); - if (count <= 0) { - mqttError("failed to parse msg, readings size smaller than 0"); - goto MQTT_PARSE_OVER; - } - - int len = snprintf(sql, maxSize, "insert into"); - - for (int i = 0; i < count; ++i) { - cJSON* reading = cJSON_GetArrayItem(readings, i); - if (reading == NULL) continue; - - cJSON* param = cJSON_GetObjectItem(reading, "param"); - if (!param || param->type != cJSON_String) { - mqttError("failed to parse msg, param not found"); - goto MQTT_PARSE_OVER; - } - - cJSON* value = cJSON_GetObjectItem(reading, "value"); - if (!value || value->type != cJSON_String) { - mqttError("failed to parse msg, value not found"); - goto MQTT_PARSE_OVER; - } - - cJSON* unit = cJSON_GetObjectItem(reading, "unit"); - if (!unit || unit->type != cJSON_String) { - mqttError("failed to parse msg, unit not found"); - goto MQTT_PARSE_OVER; - } - - len += snprintf(sql + len, maxSize - len, - " mqttdb.serial_%s_%s using mqttdb.devices tags('%s', '%s', '%s', '%s', '%s') values(%" PRId64 ", %s)", - serial->valuestring, param->valuestring, name->valuestring, model->valuestring, serial->valuestring, - param->valuestring, unit->valuestring, timestamp->valueint * 1000, value->valuestring); - } - - cJSON_free(root); - return sql; - -MQTT_PARSE_OVER: - cJSON_free(root); - free(sql); - return NULL; -} \ No newline at end of file diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c deleted file mode 100644 index e0f2f393bb10706daadca35715d7e0ae68d7c436..0000000000000000000000000000000000000000 --- a/src/plugins/mqtt/src/mqttSystem.c +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "mqtt.h" -#include "mqttInit.h" -#include "mqttLog.h" -#include "mqttPayload.h" -#include "tmqtt.h" -#include "posix_sockets.h" -#include "taos.h" -#include "tglobal.h" -#include "taoserror.h" - -struct SMqttReconnectState tsMqttStatus = {0}; -struct mqtt_client tsMqttClient = {0}; -static pthread_t tsMqttClientDaemonThread = {0}; -static void* tsMqttConnect = NULL; -static bool tsMqttIsRuning = false; - -int32_t mqttInitSystem() { return 0; } - -int32_t mqttStartSystem() { - tsMqttStatus.sendbufsz = MQTT_SEND_BUF_SIZE; - tsMqttStatus.recvbufsz = MQTT_RECV_BUF_SIZE; - tsMqttStatus.sendbuf = malloc(MQTT_SEND_BUF_SIZE); - tsMqttStatus.recvbuf = malloc(MQTT_RECV_BUF_SIZE); - tsMqttIsRuning = true; - - mqtt_init_reconnect(&tsMqttClient, mqttReconnectClient, &tsMqttStatus, mqttPublishCallback); - if (pthread_create(&tsMqttClientDaemonThread, NULL, mqttClientRefresher, &tsMqttClient)) { - mqttError("mqtt failed to start daemon."); - mqttCleanupRes(EXIT_FAILURE, -1, NULL); - return -1; - } - - mqttInfo("mqtt listening for topic:%s messages", tsMqttTopic); - return 0; -} - -void mqttStopSystem() { - if (tsMqttIsRuning) { - tsMqttIsRuning = false; - tsMqttClient.error = MQTT_ERROR_SOCKET_ERROR; - - taosMsleep(300); - mqttCleanupRes(EXIT_SUCCESS, tsMqttClient.socketfd, &tsMqttClientDaemonThread); - - mqttInfo("mqtt is stopped"); - } -} - -void mqttCleanUpSystem() { - mqttStopSystem(); - mqttInfo("mqtt is cleaned up"); -} - -void mqttPublishCallback(void** unused, struct mqtt_response_publish* published) { - const char* content = published->application_message; - mqttDebug("receive mqtt message, size:%d", (int)published->application_message_size); - - if (tsMqttConnect == NULL) { - tsMqttConnect = taos_connect(NULL, "_root", tsInternalPass, "", 0); - if (tsMqttConnect == NULL) { - mqttError("failed to connect to tdengine, reason:%s", tstrerror(terrno)); - return; - } else { - mqttInfo("successfully connected to the tdengine"); - } - } - - mqttTrace("receive mqtt message, content:%s", content); - - char* sql = mqttConverJsonToSql((char*)content, (int)published->application_message_size); - if (sql != NULL) { - void* res = taos_query(tsMqttConnect, sql); - int code = taos_errno(res); - if (code != 0) { - mqttError("failed to exec sql, reason:%s sql:%s", tstrerror(code), sql); - } else { - mqttTrace("successfully to exec sql:%s", sql); - } - taos_free_result(res); - } else { - mqttError("failed to parse mqtt message"); - } -} - -void* mqttClientRefresher(void* client) { - setThreadName("mqttCliRefresh"); - - while (tsMqttIsRuning) { - mqtt_sync((struct mqtt_client*)client); - taosMsleep(100); - } - - mqttDebug("mqtt quit refresher"); - return NULL; -} - -void mqttCleanupRes(int status, int sockfd, pthread_t* client_daemon) { - mqttInfo("clean up mqtt module"); - if (sockfd != -1) { - close(sockfd); - } - - if (client_daemon != NULL) { - pthread_cancel(*client_daemon); - } -} - -void mqttReconnectClient(struct mqtt_client* client, void** unused) { - mqttInfo("mqtt tries to connect to the mqtt server"); - - if (client->error != MQTT_ERROR_INITIAL_RECONNECT) { - close(client->socketfd); - } - - if (client->error != MQTT_ERROR_INITIAL_RECONNECT) { - mqttError("mqtt client was in error state %s", mqtt_error_str(client->error)); - } - - int sockfd = open_nb_socket(tsMqttHostName, tsMqttPort); - if (sockfd < 0) { - mqttError("mqtt client failed to open socket %s:%s", tsMqttHostName, tsMqttPort); - //mqttCleanupRes(EXIT_FAILURE, sockfd, NULL); - return; - } - - mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz); - mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400); - mqtt_subscribe(client, tsMqttTopic, 0); -}