diff --git a/documentation20/webdocs/markdowndocs/administrator-ch.md b/documentation20/webdocs/markdowndocs/administrator-ch.md index 36466d2b7ea29307b5c17ad21b0db47a98598fa4..f54c6b91a1f6b171676176615f5377dfdbe8ff95 100644 --- a/documentation20/webdocs/markdowndocs/administrator-ch.md +++ b/documentation20/webdocs/markdowndocs/administrator-ch.md @@ -95,6 +95,7 @@ TDengine系统后台服务由taosd提供,可以在配置文件taos.cfg里修 - logKeepDays:日志文件的最长保存时间。大于0时,日志文件会被重命名为taosdlog.xxx,其中xxx为日志文件最后修改的时间戳,单位为秒。默认值:0天。 - maxSQLLength:单条SQL语句允许最长限制。默认值:65380字节。 - telemetryReporting: 是否允许 TDengine 采集和上报基本使用信息,0表示不允许,1表示允许。 默认值:1。 +- stream: 是否启用连续查询(流计算功能),0表示不允许,1表示允许。 默认值:1。 **注意:**对于端口,TDengine会使用从serverPort起13个连续的TCP和UDP端口号,请务必在防火墙打开。因此如果是缺省配置,需要打开从6030都6042共13个端口,而且必须TCP和UDP都打开。 diff --git a/packaging/cfg/taos.cfg b/packaging/cfg/taos.cfg index ca88bca3c863dcfd4a95497fc572499dc93f31f9..7662d492805ca3a948f01b4a87e44bcaf01b79e5 100644 --- a/packaging/cfg/taos.cfg +++ b/packaging/cfg/taos.cfg @@ -260,4 +260,7 @@ # maxBinaryDisplayWidth 30 # enable/disable telemetry reporting -# telemetryReporting 1 \ No newline at end of file +# telemetryReporting 1 + +# enable/disable stream (continuous query) +# stream 1 diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index efe3d7678a39f140b413192adfe9654b3661251f..6e4274b3587869a7f83032f7600b1ea1c3d6cb22 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -125,6 +125,9 @@ extern char tsMonitorDbName[]; extern char tsInternalPass[]; extern int32_t tsMonitorInterval; +// stream +extern int32_t tsEnableStream; + // internal extern int32_t tsPrintAuth; extern int32_t tscEmbedded; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index d26746a5d10970eae333dc90da9c94c80948ce2a..18aa0ae6e3c570a2414d21dc00924d4fb954c95a 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -161,6 +161,9 @@ char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log"; char tsInternalPass[] = "secretkey"; int32_t tsMonitorInterval = 30; // seconds +// stream +int32_t tsEnableStream = 1; + // internal int32_t tsPrintAuth = 0; int32_t tscEmbedded = 0; @@ -1015,6 +1018,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "stream"; + cfg.ptr = &tsEnableStream; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 1; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + cfg.option = "httpEnableRecordSql"; cfg.ptr = &tsHttpEnableRecordSql; cfg.valType = TAOS_CFG_VTYPE_INT32; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index b1977fd5d95905d2a2d94b53e7555402ca5a01ab..efb8795962e769dd0b11b8d313177855c994f52d 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -69,6 +69,9 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); void *cqOpen(void *ahandle, const SCqCfg *pCfg) { + if (tsEnableStream == 0) { + return NULL; + } SCqContext *pContext = calloc(sizeof(SCqContext), 1); if (pContext == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -99,6 +102,9 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { } void cqClose(void *handle) { + if (tsEnableStream == 0) { + return; + } SCqContext *pContext = handle; if (handle == NULL) return; @@ -129,6 +135,9 @@ void cqClose(void *handle) { } void cqStart(void *handle) { + if (tsEnableStream == 0) { + return; + } SCqContext *pContext = handle; if (pContext->dbConn || pContext->master) return; @@ -147,6 +156,9 @@ void cqStart(void *handle) { } void cqStop(void *handle) { + if (tsEnableStream == 0) { + return; + } SCqContext *pContext = handle; cInfo("vgId:%d, stop all CQs", pContext->vgId); if (pContext->dbConn == NULL || pContext->master == 0) return; @@ -174,6 +186,9 @@ void cqStop(void *handle) { } void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *pSchema) { + if (tsEnableStream == 0) { + return NULL; + } SCqContext *pContext = handle; SCqObj *pObj = calloc(sizeof(SCqObj), 1); @@ -203,6 +218,9 @@ void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema * } void cqDrop(void *handle) { + if (tsEnableStream == 0) { + return; + } SCqObj *pObj = handle; SCqContext *pContext = pObj->pContext; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 204dd2f3dd5e460b4110fcf3279a68ae13c85096..b516c9d90e126d68ca502af576d6f400dedd175e 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -267,16 +267,18 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { return terrno; } - SCqCfg cqCfg = {0}; - sprintf(cqCfg.user, "_root"); - strcpy(cqCfg.pass, tsInternalPass); - strcpy(cqCfg.db, pVnode->db); - cqCfg.vgId = vnode; - cqCfg.cqWrite = vnodeWriteToCache; - pVnode->cq = cqOpen(pVnode, &cqCfg); - if (pVnode->cq == NULL) { - vnodeCleanUp(pVnode); - return terrno; + if (tsEnableStream) { + SCqCfg cqCfg = {0}; + sprintf(cqCfg.user, "_root"); + strcpy(cqCfg.pass, tsInternalPass); + strcpy(cqCfg.db, pVnode->db); + cqCfg.vgId = vnode; + cqCfg.cqWrite = vnodeWriteToCache; + pVnode->cq = cqOpen(pVnode, &cqCfg); + if (pVnode->cq == NULL) { + vnodeCleanUp(pVnode); + return terrno; + } } STsdbAppH appH = {0};