未验证 提交 70e806b0 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #15705 from taosdata/feature/stream

enh(stream): keep thread from blocking
......@@ -18,8 +18,8 @@
#include "tcompare.h"
#include "tconfig.h"
#include "tdatablock.h"
#include "tlog.h"
#include "tgrant.h"
#include "tlog.h"
GRANT_CFG_DECLARE;
......@@ -389,7 +389,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeStreamThreads = tsNumOfCores / 4;
tsNumOfVnodeStreamThreads = tsNumOfCores;
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1;
......@@ -598,7 +598,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
return 0;
}
void taosLocalCfgForbiddenToChange(char* name, bool* forbidden) {
void taosLocalCfgForbiddenToChange(char *name, bool *forbidden) {
int32_t len = strlen(name);
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len));
......@@ -612,7 +612,6 @@ void taosLocalCfgForbiddenToChange(char* name, bool* forbidden) {
*forbidden = false;
}
int32_t taosSetCfg(SConfig *pCfg, char *name) {
int32_t len = strlen(name);
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
......@@ -1114,12 +1113,12 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
const char *options[] = {
"dDebugFlag", "vDebugFlag", "mDebugFlag", "wDebugFlag", "sDebugFlag", "tsdbDebugFlag",
"tqDebugFlag", "fsDebugFlag", "udfDebugFlag", "smaDebugFlag", "idxDebugFlag", "tdbDebugFlag",
"tmrDebugFlag", "uDebugFlag", "smaDebugFlag", "rpcDebugFlag", "qDebugFlag", "metaDebugFlag",
"tmrDebugFlag", "uDebugFlag", "smaDebugFlag", "rpcDebugFlag", "qDebugFlag", "metaDebugFlag",
};
int32_t *optionVars[] = {
&dDebugFlag, &vDebugFlag, &mDebugFlag, &wDebugFlag, &sDebugFlag, &tsdbDebugFlag,
&tqDebugFlag, &fsDebugFlag, &udfDebugFlag, &smaDebugFlag, &idxDebugFlag, &tdbDebugFlag,
&tmrDebugFlag, &uDebugFlag, &smaDebugFlag, &rpcDebugFlag, &qDebugFlag, &metaDebugFlag,
&tmrDebugFlag, &uDebugFlag, &smaDebugFlag, &rpcDebugFlag, &qDebugFlag, &metaDebugFlag,
};
int32_t optionSize = tListLen(options);
......
......@@ -146,8 +146,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) {
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pHead->vgId, pMsg,
terrstr(), TMSG_INFO(pMsg->msgType), qtype);
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pHead->vgId, pMsg, terrstr(),
TMSG_INFO(pMsg->msgType), qtype);
return terrno != 0 ? terrno : -1;
}
......
......@@ -330,6 +330,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH:
// return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, pInfo->workerId != 0);
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
case TDMT_STREAM_TASK_RECOVER:
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
......
......@@ -167,12 +167,13 @@ void streamFreeQitem(SStreamQueueItem* data) {
SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
int32_t sz = taosArrayGetSize(pMerge->reqs);
for (int32_t i = 0; i < sz; i++) {
int32_t* ref = taosArrayGetP(pMerge->dataRefs, i);
(*ref)--;
if (*ref == 0) {
int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);
int32_t ref = atomic_sub_fetch_32(pRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
void* data = taosArrayGetP(pMerge->reqs, i);
taosMemoryFree(data);
taosMemoryFree(ref);
taosMemoryFree(pRef);
}
}
taosArrayDestroy(pMerge->reqs);
......
......@@ -72,7 +72,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
continue;
}
qDebug("task %d(child %d) executed and get block");
qDebug("task %d(child %d) executed and get block", pTask->taskId, pTask->selfChildId);
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
......@@ -241,6 +241,8 @@ int32_t streamExec(SStreamTask* pTask) {
pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL;
// temporarily disable status closing, since it runs out of threads
#if 0
// set status closing
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);
......@@ -248,6 +250,7 @@ int32_t streamExec(SStreamTask* pTask) {
qDebug("stream exec, enter closing status");
pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL;
#endif
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册