diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 41e3917937895ebcb867d96385f32636b86f76a7..efadcad350f0c869870bb0429b0e531a2d13f0da 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -18,8 +18,8 @@ #include "tcompare.h" #include "tconfig.h" #include "tdatablock.h" -#include "tlog.h" #include "tgrant.h" +#include "tlog.h" GRANT_CFG_DECLARE; @@ -389,7 +389,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1; - tsNumOfVnodeStreamThreads = tsNumOfCores / 4; + tsNumOfVnodeStreamThreads = tsNumOfCores; tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1; @@ -598,7 +598,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { return 0; } -void taosLocalCfgForbiddenToChange(char* name, bool* forbidden) { +void taosLocalCfgForbiddenToChange(char *name, bool *forbidden) { int32_t len = strlen(name); char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0}; strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len)); @@ -612,7 +612,6 @@ void taosLocalCfgForbiddenToChange(char* name, bool* forbidden) { *forbidden = false; } - int32_t taosSetCfg(SConfig *pCfg, char *name) { int32_t len = strlen(name); char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0}; @@ -1114,12 +1113,12 @@ void taosCfgDynamicOptions(const char *option, const char *value) { const char *options[] = { "dDebugFlag", "vDebugFlag", "mDebugFlag", "wDebugFlag", "sDebugFlag", "tsdbDebugFlag", "tqDebugFlag", "fsDebugFlag", "udfDebugFlag", "smaDebugFlag", "idxDebugFlag", "tdbDebugFlag", - "tmrDebugFlag", "uDebugFlag", "smaDebugFlag", "rpcDebugFlag", "qDebugFlag", "metaDebugFlag", + "tmrDebugFlag", "uDebugFlag", "smaDebugFlag", "rpcDebugFlag", "qDebugFlag", "metaDebugFlag", }; int32_t *optionVars[] = { &dDebugFlag, &vDebugFlag, &mDebugFlag, &wDebugFlag, &sDebugFlag, &tsdbDebugFlag, &tqDebugFlag, &fsDebugFlag, &udfDebugFlag, &smaDebugFlag, &idxDebugFlag, &tdbDebugFlag, - &tmrDebugFlag, &uDebugFlag, &smaDebugFlag, &rpcDebugFlag, &qDebugFlag, &metaDebugFlag, + &tmrDebugFlag, &uDebugFlag, &smaDebugFlag, &rpcDebugFlag, &qDebugFlag, &metaDebugFlag, }; int32_t optionSize = tListLen(options); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 617695dbaae3d6d45efb07dc9c6bb395b3c95e4a..e4d6de849cd2e64b24645b00ee9a8d030421864a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -146,8 +146,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) { - dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pHead->vgId, pMsg, - terrstr(), TMSG_INFO(pMsg->msgType), qtype); + dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pHead->vgId, pMsg, terrstr(), + TMSG_INFO(pMsg->msgType), qtype); return terrno != 0 ? terrno : -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ddd28053a8cea4d23dbf301b69aea3dcaf150a68..0738dfa05709f6f9349f079a72ab86f2b1d8f1ec 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -330,6 +330,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH: + // return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, pInfo->workerId != 0); return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true); case TDMT_STREAM_TASK_RECOVER: return tqProcessTaskRecoverReq(pVnode->pTq, pMsg); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index eb14990c0e47748dd6d00ab1d6c2ab40d00b9b88..0bf6d4c921e5ed988b3bd3f090c9bca58e61d37e 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -167,12 +167,13 @@ void streamFreeQitem(SStreamQueueItem* data) { SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; int32_t sz = taosArrayGetSize(pMerge->reqs); for (int32_t i = 0; i < sz; i++) { - int32_t* ref = taosArrayGetP(pMerge->dataRefs, i); - (*ref)--; - if (*ref == 0) { + int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i); + int32_t ref = atomic_sub_fetch_32(pRef, 1); + ASSERT(ref >= 0); + if (ref == 0) { void* data = taosArrayGetP(pMerge->reqs, i); taosMemoryFree(data); - taosMemoryFree(ref); + taosMemoryFree(pRef); } } taosArrayDestroy(pMerge->reqs); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c5b8b20a8c87ad13147b4cf6492618f4fd3966c7..3c008f79344117210a653454496da9be187c7b94 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -72,7 +72,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) continue; } - qDebug("task %d(child %d) executed and get block"); + qDebug("task %d(child %d) executed and get block", pTask->taskId, pTask->selfChildId); SSDataBlock block = {0}; assignOneDataBlock(&block, output); @@ -241,6 +241,8 @@ int32_t streamExec(SStreamTask* pTask) { pRes = streamExecForQall(pTask, pRes); if (pRes == NULL) goto FAIL; +// temporarily disable status closing, since it runs out of threads +#if 0 // set status closing atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING); @@ -248,6 +250,7 @@ int32_t streamExec(SStreamTask* pTask) { qDebug("stream exec, enter closing status"); pRes = streamExecForQall(pTask, pRes); if (pRes == NULL) goto FAIL; +#endif taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);