提交 c7efe171 编写于 作者: D dapan1121

fix: fix task retry and qnode fetch thread num issue

上级 43bdaa6c
...@@ -63,7 +63,7 @@ int32_t tsNumOfVnodeWriteThreads = 2; ...@@ -63,7 +63,7 @@ int32_t tsNumOfVnodeWriteThreads = 2;
int32_t tsNumOfVnodeSyncThreads = 2; int32_t tsNumOfVnodeSyncThreads = 2;
int32_t tsNumOfVnodeRsmaThreads = 2; int32_t tsNumOfVnodeRsmaThreads = 2;
int32_t tsNumOfQnodeQueryThreads = 4; int32_t tsNumOfQnodeQueryThreads = 4;
int32_t tsNumOfQnodeFetchThreads = 4; int32_t tsNumOfQnodeFetchThreads = 1;
int32_t tsNumOfSnodeSharedThreads = 2; int32_t tsNumOfSnodeSharedThreads = 2;
int32_t tsNumOfSnodeUniqueThreads = 2; int32_t tsNumOfSnodeUniqueThreads = 2;
...@@ -385,9 +385,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -385,9 +385,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4); tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1;
tsNumOfQnodeFetchThreads = tsNumOfCores / 2; // tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); // tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; // if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1;
tsNumOfSnodeSharedThreads = tsNumOfCores / 4; tsNumOfSnodeSharedThreads = tsNumOfCores / 4;
tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4); tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4);
...@@ -527,6 +527,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { ...@@ -527,6 +527,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype; pItem->stype = stype;
} }
/*
pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads"); pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfQnodeFetchThreads = numOfCores / 2; tsNumOfQnodeFetchThreads = numOfCores / 2;
...@@ -534,6 +535,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { ...@@ -534,6 +535,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->i32 = tsNumOfQnodeFetchThreads; pItem->i32 = tsNumOfQnodeFetchThreads;
pItem->stype = stype; pItem->stype = stype;
} }
*/
pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads"); pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
...@@ -691,7 +693,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -691,7 +693,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
...@@ -939,8 +941,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -939,8 +941,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
} else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) { } else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) {
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
/*
} else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) { } else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) {
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
*/
} else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) { } else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) {
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
} else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) { } else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) {
......
...@@ -283,8 +283,9 @@ typedef struct SSchJob { ...@@ -283,8 +283,9 @@ typedef struct SSchJob {
} SSchJob; } SSchJob;
typedef struct SSchTaskCtx { typedef struct SSchTaskCtx {
int64_t jobRid; int64_t jobRid;
SSchTask *pTask; SSchTask *pTask;
bool asyncLaunch;
} SSchTaskCtx; } SSchTaskCtx;
extern SSchedulerMgmt schMgmt; extern SSchedulerMgmt schMgmt;
......
...@@ -396,7 +396,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { ...@@ -396,7 +396,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
tstrerror(rspCode)); tstrerror(rspCode));
SCH_ERR_JRET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId)); SCH_ERR_JRET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId));
code = schHandleResponseMsg(pJob, pTask, pParam->execId, pMsg, rspCode); code = schHandleResponseMsg(pJob, pTask, pParam->execId, pMsg, rspCode);
pMsg->pData = NULL; pMsg->pData = NULL;
......
...@@ -138,11 +138,6 @@ int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int3 ...@@ -138,11 +138,6 @@ int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int3
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
return TSDB_CODE_SUCCESS;
}
SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId)); SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
if (NULL == nodeInfo) { // ignore it if (NULL == nodeInfo) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry); SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
...@@ -160,11 +155,16 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v ...@@ -160,11 +155,16 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v
if (dropExecNode) { if (dropExecNode) {
SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId)); SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
} }
schUpdateTaskExecNode(pJob, pTask, handle, execId);
if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
SCH_SET_TASK_HANDLE(pTask, handle); SCH_SET_TASK_HANDLE(pTask, handle);
schUpdateTaskExecNode(pJob, pTask, handle, execId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -349,7 +349,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 ...@@ -349,7 +349,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
pTask->waitRetry = true; pTask->waitRetry = true;
schDropTaskOnExecNode(pJob, pTask); schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes); taosHashClear(pTask->execNodes);
SCH_ERR_JRET(schRemoveTaskFromExecList(pJob, pTask)); schRemoveTaskFromExecList(pJob, pTask);
schDeregisterTaskHb(pJob, pTask); schDeregisterTaskHb(pJob, pTask);
atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1); atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
taosMemoryFreeClear(pTask->msg); taosMemoryFreeClear(pTask->msg);
...@@ -593,7 +593,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo ...@@ -593,7 +593,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1); atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
SCH_ERR_RET(schRemoveTaskFromExecList(pJob, pTask)); schRemoveTaskFromExecList(pJob, pTask);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
...@@ -739,8 +739,7 @@ _return: ...@@ -739,8 +739,7 @@ _return:
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) { int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId)); int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
if (code) { if (code) {
SCH_TASK_ELOG("task failed to rm from execTask list, code:%x", code); SCH_TASK_WLOG("task already not in execTask list, code:%x", code);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -829,6 +828,11 @@ int32_t schLaunchTaskImpl(void *param) { ...@@ -829,6 +828,11 @@ int32_t schLaunchTaskImpl(void *param) {
} }
SSchTask *pTask = pCtx->pTask; SSchTask *pTask = pCtx->pTask;
if (pCtx->asyncLaunch) {
SCH_LOCK_TASK(pTask);
}
int8_t status = 0; int8_t status = 0;
int32_t code = 0; int32_t code = 0;
...@@ -875,8 +879,6 @@ int32_t schLaunchTaskImpl(void *param) { ...@@ -875,8 +879,6 @@ int32_t schLaunchTaskImpl(void *param) {
_return: _return:
taosMemoryFree(param);
if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) { if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
if (code) { if (code) {
code = schProcessOnTaskFailure(pJob, pTask, code); code = schProcessOnTaskFailure(pJob, pTask, code);
...@@ -886,8 +888,14 @@ _return: ...@@ -886,8 +888,14 @@ _return:
} }
} }
if (pCtx->asyncLaunch) {
SCH_UNLOCK_TASK(pTask);
}
schReleaseJob(pJob->refId); schReleaseJob(pJob->refId);
taosMemoryFree(param);
SCH_RET(code); SCH_RET(code);
} }
...@@ -902,6 +910,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { ...@@ -902,6 +910,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
param->pTask = pTask; param->pTask = pTask;
if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) { if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
param->asyncLaunch = true;
taosAsyncExec(schLaunchTaskImpl, param, NULL); taosAsyncExec(schLaunchTaskImpl, param, NULL);
} else { } else {
SCH_ERR_RET(schLaunchTaskImpl(param)); SCH_ERR_RET(schLaunchTaskImpl(param));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册