提交 e345023d 编写于 作者: L Liu Jicong

enh(stream): keep thread from blocking

上级 08a4a7ab
...@@ -18,8 +18,8 @@ ...@@ -18,8 +18,8 @@
#include "tcompare.h" #include "tcompare.h"
#include "tconfig.h" #include "tconfig.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tlog.h"
#include "tgrant.h" #include "tgrant.h"
#include "tlog.h"
GRANT_CFG_DECLARE; GRANT_CFG_DECLARE;
...@@ -389,7 +389,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -389,7 +389,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4); tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeStreamThreads = tsNumOfCores / 4; tsNumOfVnodeStreamThreads = tsNumOfCores;
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4); tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1;
...@@ -598,7 +598,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -598,7 +598,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
return 0; return 0;
} }
void taosLocalCfgForbiddenToChange(char* name, bool* forbidden) { void taosLocalCfgForbiddenToChange(char *name, bool *forbidden) {
int32_t len = strlen(name); int32_t len = strlen(name);
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0}; char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len)); strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len));
...@@ -612,7 +612,6 @@ void taosLocalCfgForbiddenToChange(char* name, bool* forbidden) { ...@@ -612,7 +612,6 @@ void taosLocalCfgForbiddenToChange(char* name, bool* forbidden) {
*forbidden = false; *forbidden = false;
} }
int32_t taosSetCfg(SConfig *pCfg, char *name) { int32_t taosSetCfg(SConfig *pCfg, char *name) {
int32_t len = strlen(name); int32_t len = strlen(name);
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0}; char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
...@@ -1114,12 +1113,12 @@ void taosCfgDynamicOptions(const char *option, const char *value) { ...@@ -1114,12 +1113,12 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
const char *options[] = { const char *options[] = {
"dDebugFlag", "vDebugFlag", "mDebugFlag", "wDebugFlag", "sDebugFlag", "tsdbDebugFlag", "dDebugFlag", "vDebugFlag", "mDebugFlag", "wDebugFlag", "sDebugFlag", "tsdbDebugFlag",
"tqDebugFlag", "fsDebugFlag", "udfDebugFlag", "smaDebugFlag", "idxDebugFlag", "tdbDebugFlag", "tqDebugFlag", "fsDebugFlag", "udfDebugFlag", "smaDebugFlag", "idxDebugFlag", "tdbDebugFlag",
"tmrDebugFlag", "uDebugFlag", "smaDebugFlag", "rpcDebugFlag", "qDebugFlag", "metaDebugFlag", "tmrDebugFlag", "uDebugFlag", "smaDebugFlag", "rpcDebugFlag", "qDebugFlag", "metaDebugFlag",
}; };
int32_t *optionVars[] = { int32_t *optionVars[] = {
&dDebugFlag, &vDebugFlag, &mDebugFlag, &wDebugFlag, &sDebugFlag, &tsdbDebugFlag, &dDebugFlag, &vDebugFlag, &mDebugFlag, &wDebugFlag, &sDebugFlag, &tsdbDebugFlag,
&tqDebugFlag, &fsDebugFlag, &udfDebugFlag, &smaDebugFlag, &idxDebugFlag, &tdbDebugFlag, &tqDebugFlag, &fsDebugFlag, &udfDebugFlag, &smaDebugFlag, &idxDebugFlag, &tdbDebugFlag,
&tmrDebugFlag, &uDebugFlag, &smaDebugFlag, &rpcDebugFlag, &qDebugFlag, &metaDebugFlag, &tmrDebugFlag, &uDebugFlag, &smaDebugFlag, &rpcDebugFlag, &qDebugFlag, &metaDebugFlag,
}; };
int32_t optionSize = tListLen(options); int32_t optionSize = tListLen(options);
......
...@@ -146,8 +146,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp ...@@ -146,8 +146,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pHead->vgId, pMsg, dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pHead->vgId, pMsg, terrstr(),
terrstr(), TMSG_INFO(pMsg->msgType), qtype); TMSG_INFO(pMsg->msgType), qtype);
return terrno != 0 ? terrno : -1; return terrno != 0 ? terrno : -1;
} }
......
...@@ -330,6 +330,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -330,6 +330,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_STREAM_TASK_RUN: case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg); return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH: case TDMT_STREAM_TASK_DISPATCH:
// return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, pInfo->workerId != 0);
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true); return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
case TDMT_STREAM_TASK_RECOVER: case TDMT_STREAM_TASK_RECOVER:
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg); return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
......
...@@ -167,12 +167,13 @@ void streamFreeQitem(SStreamQueueItem* data) { ...@@ -167,12 +167,13 @@ void streamFreeQitem(SStreamQueueItem* data) {
SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
int32_t sz = taosArrayGetSize(pMerge->reqs); int32_t sz = taosArrayGetSize(pMerge->reqs);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int32_t* ref = taosArrayGetP(pMerge->dataRefs, i); int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);
(*ref)--; int32_t ref = atomic_sub_fetch_32(pRef, 1);
if (*ref == 0) { ASSERT(ref >= 0);
if (ref == 0) {
void* data = taosArrayGetP(pMerge->reqs, i); void* data = taosArrayGetP(pMerge->reqs, i);
taosMemoryFree(data); taosMemoryFree(data);
taosMemoryFree(ref); taosMemoryFree(pRef);
} }
} }
taosArrayDestroy(pMerge->reqs); taosArrayDestroy(pMerge->reqs);
......
...@@ -72,7 +72,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ...@@ -72,7 +72,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
continue; continue;
} }
qDebug("task %d(child %d) executed and get block"); qDebug("task %d(child %d) executed and get block", pTask->taskId, pTask->selfChildId);
SSDataBlock block = {0}; SSDataBlock block = {0};
assignOneDataBlock(&block, output); assignOneDataBlock(&block, output);
...@@ -241,6 +241,8 @@ int32_t streamExec(SStreamTask* pTask) { ...@@ -241,6 +241,8 @@ int32_t streamExec(SStreamTask* pTask) {
pRes = streamExecForQall(pTask, pRes); pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL; if (pRes == NULL) goto FAIL;
// temporarily disable status closing, since it runs out of threads
#if 0
// set status closing // set status closing
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING); atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);
...@@ -248,6 +250,7 @@ int32_t streamExec(SStreamTask* pTask) { ...@@ -248,6 +250,7 @@ int32_t streamExec(SStreamTask* pTask) {
qDebug("stream exec, enter closing status"); qDebug("stream exec, enter closing status");
pRes = streamExecForQall(pTask, pRes); pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL; if (pRes == NULL) goto FAIL;
#endif
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE); atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册