diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 2331f0b23c52d6b826796549a1964d4eb970a9fd..5a0c0e0777290856c101cd3b88c75792da606232 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -159,6 +159,8 @@ extern int32_t tsUptimeInterval; extern int32_t tsRpcRetryLimit; extern int32_t tsRpcRetryInterval; +extern bool tsDisableStream; + // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d4849650e6c80fcab7b3831f7fa01a10d29ec50b..94f1a8f730350325234421a70c21f9d84e04c26f 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -189,6 +189,7 @@ int32_t tsGrantHBInterval = 60; int32_t tsUptimeInterval = 300; // seconds char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits char tsUdfdLdLibPath[512] = ""; +bool tsDisableStream = false; #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { @@ -469,6 +470,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1; + if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1; + GRANT_CFG_ADD; return 0; } @@ -770,6 +773,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } + + tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; + GRANT_CFG_GET; return 0; } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index f89bc2036252961ff10d81774a9c6f756fdebf43..b9df3e58266528810f894750fe12be43b9e44003 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -307,7 +307,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) taosWUnLockLatch(&pTq->pushLock); } - if (vnodeIsRoleLeader(pTq->pVnode)) { + if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) { if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; if (msgType == TDMT_VND_SUBMIT) { void* data = taosMemoryMalloc(msgLen);