From afdab2bd7d983f8fdb049c6dd61905e3026e4d82 Mon Sep 17 00:00:00 2001 From: MysticBoy Date: Sat, 23 May 2020 01:55:47 +0800 Subject: [PATCH] Add mqtt plugin to subscribe to telemetry data --- .gitignore | 24 +++++++++++++++++ CMakeSettings.json | 25 ++++++++++++++++++ src/common/inc/tglobal.h | 2 ++ src/common/src/tglobal.c | 25 ++++++++++++++++++ src/dnode/CMakeLists.txt | 2 +- src/dnode/src/dnodeModule.c | 11 ++++++++ src/inc/mqtt.h | 35 +++++++++++++++++++++++++ src/inc/taosdef.h | 1 + src/plugins/CMakeLists.txt | 1 + src/plugins/mqtt/CMakeLists.txt | 22 ++++++++++++++++ src/plugins/mqtt/inc/mqttLog.h | 42 ++++++++++++++++++++++++++++++ src/plugins/mqtt/inc/mqttSystem.h | 34 ++++++++++++++++++++++++ src/plugins/mqtt/src/mqttSystem.c | 43 +++++++++++++++++++++++++++++++ 13 files changed, 266 insertions(+), 1 deletion(-) create mode 100644 CMakeSettings.json create mode 100644 src/inc/mqtt.h create mode 100644 src/plugins/mqtt/CMakeLists.txt create mode 100644 src/plugins/mqtt/inc/mqttLog.h create mode 100644 src/plugins/mqtt/inc/mqttSystem.h create mode 100644 src/plugins/mqtt/src/mqttSystem.c diff --git a/.gitignore b/.gitignore index 47eae4dc03..67cc2929b4 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,27 @@ pysim/ # Doxygen Generated files html/ +/.vs +/CMakeFiles/3.10.2 +/CMakeCache.txt +/Makefile +/*.cmake +/deps +/src/cq/test/CMakeFiles/cqtest.dir/*.cmake +*.cmake +/src/cq/test/CMakeFiles/cqtest.dir/*.make +*.make +link.txt +*.internal +*.includecache +*.marks +Makefile +CMakeError.log +*.log +/CMakeFiles/CMakeRuleHashes.txt +/CMakeFiles/Makefile2 +/CMakeFiles/TargetDirectories.txt +/CMakeFiles/cmake.check_cache +/out/isenseconfig/WSL-Clang-Debug +/out/isenseconfig/WSL-GCC-Debug +/test/cfg diff --git a/CMakeSettings.json b/CMakeSettings.json new file mode 100644 index 0000000000..4b54f10f2f --- /dev/null +++ b/CMakeSettings.json @@ -0,0 +1,25 @@ +{ + "configurations": [ + { + "name": "WSL-GCC-Debug", + "generator": "Unix Makefiles", + "configurationType": "Debug", + "buildRoot": "${projectDir}\\build\\", + "installRoot": "${projectDir}\\out\\install\\${name}", + "cmakeExecutable": "/usr/bin/cmake", + "cmakeCommandArgs": "", + "buildCommandArgs": "", + "ctestCommandArgs": "", + "inheritEnvironments": [ "linux_x64" ], + "wslPath": "${defaultWSLPath}", + "addressSanitizerRuntimeFlags": "detect_leaks=0", + "variables": [ + { + "name": "CMAKE_INSTALL_PREFIX", + "value": "/mnt/d/TDengine/TDengine/build", + "type": "PATH" + } + ] + } + ] +} \ No newline at end of file diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index da8f3cd1e1..319772b606 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -103,6 +103,7 @@ extern int32_t tsOfflineThreshold; extern int32_t tsMgmtEqualVnodeNum; extern int32_t tsEnableHttpModule; +extern int32_t tsEnableMqttModule; extern int32_t tsEnableMonitorModule; extern int32_t tsRestRowLimit; @@ -147,6 +148,7 @@ extern int32_t jniDebugFlag; extern int32_t tmrDebugFlag; extern int32_t sdbDebugFlag; extern int32_t httpDebugFlag; +extern int32_t mqttDebugFlag; extern int32_t monitorDebugFlag; extern int32_t uDebugFlag; extern int32_t rpcDebugFlag; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 6b348b7fc7..445cf4d52b 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -120,6 +120,7 @@ int32_t tsOfflineThreshold = 86400*100; // seconds 10days int32_t tsMgmtEqualVnodeNum = 4; int32_t tsEnableHttpModule = 1; +int32_t tsEnableMqttModule = 1; int32_t tsEnableMonitorModule = 0; int32_t tsRestRowLimit = 10240; @@ -134,6 +135,7 @@ int32_t cDebugFlag = 135; int32_t jniDebugFlag = 131; int32_t odbcDebugFlag = 131; int32_t httpDebugFlag = 131; +int32_t mqttDebugFlag = 131; int32_t monitorDebugFlag = 131; int32_t qDebugFlag = 131; int32_t rpcDebugFlag = 135; @@ -212,6 +214,7 @@ void taosSetAllDebugFlag() { jniDebugFlag = debugFlag; odbcDebugFlag = debugFlag; httpDebugFlag = debugFlag; + mqttDebugFlag = debugFlag; monitorDebugFlag = debugFlag; rpcDebugFlag = debugFlag; uDebugFlag = debugFlag; @@ -890,6 +893,17 @@ static void doInitGlobalConfig() { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + + cfg.option = "mqtt"; + cfg.ptr = &tsEnableMqttModule; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 1; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + cfg.option = "monitor"; cfg.ptr = &tsEnableMonitorModule; cfg.valType = TAOS_CFG_VTYPE_INT32; @@ -1112,6 +1126,17 @@ static void doInitGlobalConfig() { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "mqttDebugFlag"; + cfg.ptr = &mqttDebugFlag; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG; + cfg.minValue = 0; + cfg.maxValue = 255; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + + cfg.option = "monitorDebugFlag"; cfg.ptr = &monitorDebugFlag; cfg.valType = TAOS_CFG_VTYPE_INT32; diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index af2dc2d777..de6e15e6b9 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -16,7 +16,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) AUX_SOURCE_DIRECTORY(src SRC) ADD_EXECUTABLE(taosd ${SRC}) - TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http tsdb twal vnode cJson lz4) + TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http mqtt tsdb twal vnode cJson lz4) IF (TD_ACCOUNT) TARGET_LINK_LIBRARIES(taosd account) diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index f5c28c9573..2f3008c33e 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -20,6 +20,7 @@ #include "trpc.h" #include "mnode.h" #include "http.h" +#include "mqtt.h" #include "monitor.h" #include "dnodeInt.h" #include "dnodeModule.h" @@ -62,6 +63,16 @@ static void dnodeAllocModules() { dnodeSetModuleStatus(TSDB_MOD_HTTP); } + tsModule[TSDB_MOD_MQTT].enable = (tsEnableMqttModule == 1); + tsModule[TSDB_MOD_MQTT].name = "mqtt"; + tsModule[TSDB_MOD_MQTT].initFp = mqttInitSystem; + tsModule[TSDB_MOD_MQTT].cleanUpFp = mqttCleanUpSystem; + tsModule[TSDB_MOD_MQTT].startFp = mqttStartSystem; + tsModule[TSDB_MOD_MQTT].stopFp = mqttStopSystem; + if (tsEnableMqttModule) { + dnodeSetModuleStatus(TSDB_MOD_MQTT); + } + tsModule[TSDB_MOD_MONITOR].enable = (tsEnableMonitorModule == 1); tsModule[TSDB_MOD_MONITOR].name = "monitor"; tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem; diff --git a/src/inc/mqtt.h b/src/inc/mqtt.h new file mode 100644 index 0000000000..710737e79a --- /dev/null +++ b/src/inc/mqtt.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_MQTT_H +#define TDENGINE_MQTT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +int32_t mqttGetReqCount(); +int32_t mqttInitSystem(); +int32_t mqttStartSystem(); +void mqttStopSystem(); +void mqttCleanUpSystem(); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index b6a37d85a2..6914961463 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -376,6 +376,7 @@ typedef enum { TSDB_MOD_MGMT, TSDB_MOD_HTTP, TSDB_MOD_MONITOR, + TSDB_MOD_MQTT, TSDB_MOD_MAX } EModuleType; diff --git a/src/plugins/CMakeLists.txt b/src/plugins/CMakeLists.txt index 562de72410..2bc6bf54bf 100644 --- a/src/plugins/CMakeLists.txt +++ b/src/plugins/CMakeLists.txt @@ -3,3 +3,4 @@ PROJECT(TDengine) ADD_SUBDIRECTORY(monitor) ADD_SUBDIRECTORY(http) +ADD_SUBDIRECTORY(mqtt) diff --git a/src/plugins/mqtt/CMakeLists.txt b/src/plugins/mqtt/CMakeLists.txt new file mode 100644 index 0000000000..6224a0cad4 --- /dev/null +++ b/src/plugins/mqtt/CMakeLists.txt @@ -0,0 +1,22 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/zlib-1.2.11/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc) + INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) + INCLUDE_DIRECTORIES(inc) + AUX_SOURCE_DIRECTORY(src SRC) + ADD_LIBRARY(mqtt ${SRC}) + TARGET_LINK_LIBRARIES(mqtt taos_static z) + + IF (TD_ADMIN) + TARGET_LINK_LIBRARIES(mqtt admin) + ENDIF () +ENDIF () diff --git a/src/plugins/mqtt/inc/mqttLog.h b/src/plugins/mqtt/inc/mqttLog.h new file mode 100644 index 0000000000..735678a326 --- /dev/null +++ b/src/plugins/mqtt/inc/mqttLog.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_MQTT_LOG_H +#define TDENGINE_MQTT_LOG_H + +#include "tlog.h" + +extern int32_t mqttDebugFlag; + +#define mqttError(...) \ + if (mqttDebugFlag & DEBUG_ERROR) { \ + taosPrintLog("ERROR MQT ", 255, __VA_ARGS__); \ + } +#define mqttWarn(...) \ + if ( mqttDebugFlag & DEBUG_WARN) { \ + taosPrintLog("WARN MQT ", mqttDebugFlag, __VA_ARGS__); \ + } +#define mqttTrace(...) \ + if ( mqttDebugFlag & DEBUG_TRACE) { \ + taosPrintLog("MQT ", mqttDebugFlag, __VA_ARGS__); \ + } +#define mqttDump(...) \ + if ( mqttDebugFlag & DEBUG_TRACE) { \ + taosPrintLongString("MQT ", mqttDebugFlag, __VA_ARGS__); \ + } +#define mqttPrint(...) \ + { taosPrintLog("MQT ", 255, __VA_ARGS__); } + +#endif diff --git a/src/plugins/mqtt/inc/mqttSystem.h b/src/plugins/mqtt/inc/mqttSystem.h new file mode 100644 index 0000000000..c61318806d --- /dev/null +++ b/src/plugins/mqtt/inc/mqttSystem.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_MQTT_SYSTEM_H +#define TDENGINE_MQTT_SYSTEM_H +#ifdef __cplusplus +extern "C" { +#endif + +#include + +int32_t mqttGetReqCount(); +int32_t mqttInitSystem(); +int32_t mqttStartSystem(); +void mqttStopSystem(); +void mqttCleanUpSystem(); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c new file mode 100644 index 0000000000..ccf6cfb3e3 --- /dev/null +++ b/src/plugins/mqtt/src/mqttSystem.c @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "mqttSystem.h" +#include "mqtt.h" +#include "mqttLog.h" +#include "os.h" +#include "taos.h" +#include "tglobal.h" +#include "tsocket.h" +#include "ttimer.h" + +int32_t mqttGetReqCount() { return 0; } +int mqttInitSystem() { + mqttPrint("mqttInitSystem"); + return 0; +} + +int mqttStartSystem() { + mqttPrint("mqttStartSystem"); + return 0; +} + +void mqttStopSystem() { + mqttPrint("mqttStopSystem"); +} + +void mqttCleanUpSystem() { + mqttPrint("mqttCleanUpSystem"); +} -- GitLab