提交 afdab2bd 编写于 作者: 麦壳饼's avatar 麦壳饼

Add mqtt plugin to subscribe to telemetry data

上级 9a8a0285
...@@ -41,3 +41,27 @@ pysim/ ...@@ -41,3 +41,27 @@ pysim/
# Doxygen Generated files # Doxygen Generated files
html/ html/
/.vs
/CMakeFiles/3.10.2
/CMakeCache.txt
/Makefile
/*.cmake
/deps
/src/cq/test/CMakeFiles/cqtest.dir/*.cmake
*.cmake
/src/cq/test/CMakeFiles/cqtest.dir/*.make
*.make
link.txt
*.internal
*.includecache
*.marks
Makefile
CMakeError.log
*.log
/CMakeFiles/CMakeRuleHashes.txt
/CMakeFiles/Makefile2
/CMakeFiles/TargetDirectories.txt
/CMakeFiles/cmake.check_cache
/out/isenseconfig/WSL-Clang-Debug
/out/isenseconfig/WSL-GCC-Debug
/test/cfg
{
"configurations": [
{
"name": "WSL-GCC-Debug",
"generator": "Unix Makefiles",
"configurationType": "Debug",
"buildRoot": "${projectDir}\\build\\",
"installRoot": "${projectDir}\\out\\install\\${name}",
"cmakeExecutable": "/usr/bin/cmake",
"cmakeCommandArgs": "",
"buildCommandArgs": "",
"ctestCommandArgs": "",
"inheritEnvironments": [ "linux_x64" ],
"wslPath": "${defaultWSLPath}",
"addressSanitizerRuntimeFlags": "detect_leaks=0",
"variables": [
{
"name": "CMAKE_INSTALL_PREFIX",
"value": "/mnt/d/TDengine/TDengine/build",
"type": "PATH"
}
]
}
]
}
\ No newline at end of file
...@@ -103,6 +103,7 @@ extern int32_t tsOfflineThreshold; ...@@ -103,6 +103,7 @@ extern int32_t tsOfflineThreshold;
extern int32_t tsMgmtEqualVnodeNum; extern int32_t tsMgmtEqualVnodeNum;
extern int32_t tsEnableHttpModule; extern int32_t tsEnableHttpModule;
extern int32_t tsEnableMqttModule;
extern int32_t tsEnableMonitorModule; extern int32_t tsEnableMonitorModule;
extern int32_t tsRestRowLimit; extern int32_t tsRestRowLimit;
...@@ -147,6 +148,7 @@ extern int32_t jniDebugFlag; ...@@ -147,6 +148,7 @@ extern int32_t jniDebugFlag;
extern int32_t tmrDebugFlag; extern int32_t tmrDebugFlag;
extern int32_t sdbDebugFlag; extern int32_t sdbDebugFlag;
extern int32_t httpDebugFlag; extern int32_t httpDebugFlag;
extern int32_t mqttDebugFlag;
extern int32_t monitorDebugFlag; extern int32_t monitorDebugFlag;
extern int32_t uDebugFlag; extern int32_t uDebugFlag;
extern int32_t rpcDebugFlag; extern int32_t rpcDebugFlag;
......
...@@ -120,6 +120,7 @@ int32_t tsOfflineThreshold = 86400*100; // seconds 10days ...@@ -120,6 +120,7 @@ int32_t tsOfflineThreshold = 86400*100; // seconds 10days
int32_t tsMgmtEqualVnodeNum = 4; int32_t tsMgmtEqualVnodeNum = 4;
int32_t tsEnableHttpModule = 1; int32_t tsEnableHttpModule = 1;
int32_t tsEnableMqttModule = 1;
int32_t tsEnableMonitorModule = 0; int32_t tsEnableMonitorModule = 0;
int32_t tsRestRowLimit = 10240; int32_t tsRestRowLimit = 10240;
...@@ -134,6 +135,7 @@ int32_t cDebugFlag = 135; ...@@ -134,6 +135,7 @@ int32_t cDebugFlag = 135;
int32_t jniDebugFlag = 131; int32_t jniDebugFlag = 131;
int32_t odbcDebugFlag = 131; int32_t odbcDebugFlag = 131;
int32_t httpDebugFlag = 131; int32_t httpDebugFlag = 131;
int32_t mqttDebugFlag = 131;
int32_t monitorDebugFlag = 131; int32_t monitorDebugFlag = 131;
int32_t qDebugFlag = 131; int32_t qDebugFlag = 131;
int32_t rpcDebugFlag = 135; int32_t rpcDebugFlag = 135;
...@@ -212,6 +214,7 @@ void taosSetAllDebugFlag() { ...@@ -212,6 +214,7 @@ void taosSetAllDebugFlag() {
jniDebugFlag = debugFlag; jniDebugFlag = debugFlag;
odbcDebugFlag = debugFlag; odbcDebugFlag = debugFlag;
httpDebugFlag = debugFlag; httpDebugFlag = debugFlag;
mqttDebugFlag = debugFlag;
monitorDebugFlag = debugFlag; monitorDebugFlag = debugFlag;
rpcDebugFlag = debugFlag; rpcDebugFlag = debugFlag;
uDebugFlag = debugFlag; uDebugFlag = debugFlag;
...@@ -890,6 +893,17 @@ static void doInitGlobalConfig() { ...@@ -890,6 +893,17 @@ static void doInitGlobalConfig() {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "mqtt";
cfg.ptr = &tsEnableMqttModule;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 1;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "monitor"; cfg.option = "monitor";
cfg.ptr = &tsEnableMonitorModule; cfg.ptr = &tsEnableMonitorModule;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
...@@ -1112,6 +1126,17 @@ static void doInitGlobalConfig() { ...@@ -1112,6 +1126,17 @@ static void doInitGlobalConfig() {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "mqttDebugFlag";
cfg.ptr = &mqttDebugFlag;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG;
cfg.minValue = 0;
cfg.maxValue = 255;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "monitorDebugFlag"; cfg.option = "monitorDebugFlag";
cfg.ptr = &monitorDebugFlag; cfg.ptr = &monitorDebugFlag;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
......
...@@ -16,7 +16,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ...@@ -16,7 +16,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
ADD_EXECUTABLE(taosd ${SRC}) ADD_EXECUTABLE(taosd ${SRC})
TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http tsdb twal vnode cJson lz4) TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http mqtt tsdb twal vnode cJson lz4)
IF (TD_ACCOUNT) IF (TD_ACCOUNT)
TARGET_LINK_LIBRARIES(taosd account) TARGET_LINK_LIBRARIES(taosd account)
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "trpc.h" #include "trpc.h"
#include "mnode.h" #include "mnode.h"
#include "http.h" #include "http.h"
#include "mqtt.h"
#include "monitor.h" #include "monitor.h"
#include "dnodeInt.h" #include "dnodeInt.h"
#include "dnodeModule.h" #include "dnodeModule.h"
...@@ -62,6 +63,16 @@ static void dnodeAllocModules() { ...@@ -62,6 +63,16 @@ static void dnodeAllocModules() {
dnodeSetModuleStatus(TSDB_MOD_HTTP); dnodeSetModuleStatus(TSDB_MOD_HTTP);
} }
tsModule[TSDB_MOD_MQTT].enable = (tsEnableMqttModule == 1);
tsModule[TSDB_MOD_MQTT].name = "mqtt";
tsModule[TSDB_MOD_MQTT].initFp = mqttInitSystem;
tsModule[TSDB_MOD_MQTT].cleanUpFp = mqttCleanUpSystem;
tsModule[TSDB_MOD_MQTT].startFp = mqttStartSystem;
tsModule[TSDB_MOD_MQTT].stopFp = mqttStopSystem;
if (tsEnableMqttModule) {
dnodeSetModuleStatus(TSDB_MOD_MQTT);
}
tsModule[TSDB_MOD_MONITOR].enable = (tsEnableMonitorModule == 1); tsModule[TSDB_MOD_MONITOR].enable = (tsEnableMonitorModule == 1);
tsModule[TSDB_MOD_MONITOR].name = "monitor"; tsModule[TSDB_MOD_MONITOR].name = "monitor";
tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem; tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_MQTT_H
#define TDENGINE_MQTT_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
int32_t mqttGetReqCount();
int32_t mqttInitSystem();
int32_t mqttStartSystem();
void mqttStopSystem();
void mqttCleanUpSystem();
#ifdef __cplusplus
}
#endif
#endif
...@@ -376,6 +376,7 @@ typedef enum { ...@@ -376,6 +376,7 @@ typedef enum {
TSDB_MOD_MGMT, TSDB_MOD_MGMT,
TSDB_MOD_HTTP, TSDB_MOD_HTTP,
TSDB_MOD_MONITOR, TSDB_MOD_MONITOR,
TSDB_MOD_MQTT,
TSDB_MOD_MAX TSDB_MOD_MAX
} EModuleType; } EModuleType;
......
...@@ -3,3 +3,4 @@ PROJECT(TDengine) ...@@ -3,3 +3,4 @@ PROJECT(TDengine)
ADD_SUBDIRECTORY(monitor) ADD_SUBDIRECTORY(monitor)
ADD_SUBDIRECTORY(http) ADD_SUBDIRECTORY(http)
ADD_SUBDIRECTORY(mqtt)
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/zlib-1.2.11/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(mqtt ${SRC})
TARGET_LINK_LIBRARIES(mqtt taos_static z)
IF (TD_ADMIN)
TARGET_LINK_LIBRARIES(mqtt admin)
ENDIF ()
ENDIF ()
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_MQTT_LOG_H
#define TDENGINE_MQTT_LOG_H
#include "tlog.h"
extern int32_t mqttDebugFlag;
#define mqttError(...) \
if (mqttDebugFlag & DEBUG_ERROR) { \
taosPrintLog("ERROR MQT ", 255, __VA_ARGS__); \
}
#define mqttWarn(...) \
if ( mqttDebugFlag & DEBUG_WARN) { \
taosPrintLog("WARN MQT ", mqttDebugFlag, __VA_ARGS__); \
}
#define mqttTrace(...) \
if ( mqttDebugFlag & DEBUG_TRACE) { \
taosPrintLog("MQT ", mqttDebugFlag, __VA_ARGS__); \
}
#define mqttDump(...) \
if ( mqttDebugFlag & DEBUG_TRACE) { \
taosPrintLongString("MQT ", mqttDebugFlag, __VA_ARGS__); \
}
#define mqttPrint(...) \
{ taosPrintLog("MQT ", 255, __VA_ARGS__); }
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_MQTT_SYSTEM_H
#define TDENGINE_MQTT_SYSTEM_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
int32_t mqttGetReqCount();
int32_t mqttInitSystem();
int32_t mqttStartSystem();
void mqttStopSystem();
void mqttCleanUpSystem();
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mqttSystem.h"
#include "mqtt.h"
#include "mqttLog.h"
#include "os.h"
#include "taos.h"
#include "tglobal.h"
#include "tsocket.h"
#include "ttimer.h"
int32_t mqttGetReqCount() { return 0; }
int mqttInitSystem() {
mqttPrint("mqttInitSystem");
return 0;
}
int mqttStartSystem() {
mqttPrint("mqttStartSystem");
return 0;
}
void mqttStopSystem() {
mqttPrint("mqttStopSystem");
}
void mqttCleanUpSystem() {
mqttPrint("mqttCleanUpSystem");
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册