未验证 提交 8fc2d7eb 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #16843 from taosdata/fix/TD-19057

fix: fix task retry and qnode fetch thread number issue
...@@ -63,7 +63,7 @@ int32_t tsNumOfVnodeWriteThreads = 2; ...@@ -63,7 +63,7 @@ int32_t tsNumOfVnodeWriteThreads = 2;
int32_t tsNumOfVnodeSyncThreads = 2; int32_t tsNumOfVnodeSyncThreads = 2;
int32_t tsNumOfVnodeRsmaThreads = 2; int32_t tsNumOfVnodeRsmaThreads = 2;
int32_t tsNumOfQnodeQueryThreads = 4; int32_t tsNumOfQnodeQueryThreads = 4;
int32_t tsNumOfQnodeFetchThreads = 4; int32_t tsNumOfQnodeFetchThreads = 1;
int32_t tsNumOfSnodeSharedThreads = 2; int32_t tsNumOfSnodeSharedThreads = 2;
int32_t tsNumOfSnodeUniqueThreads = 2; int32_t tsNumOfSnodeUniqueThreads = 2;
...@@ -385,9 +385,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -385,9 +385,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4); tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1;
tsNumOfQnodeFetchThreads = tsNumOfCores / 2; // tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); // tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; // if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1;
tsNumOfSnodeSharedThreads = tsNumOfCores / 4; tsNumOfSnodeSharedThreads = tsNumOfCores / 4;
tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4); tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4);
...@@ -527,6 +527,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { ...@@ -527,6 +527,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype; pItem->stype = stype;
} }
/*
pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads"); pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfQnodeFetchThreads = numOfCores / 2; tsNumOfQnodeFetchThreads = numOfCores / 2;
...@@ -534,6 +535,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { ...@@ -534,6 +535,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->i32 = tsNumOfQnodeFetchThreads; pItem->i32 = tsNumOfQnodeFetchThreads;
pItem->stype = stype; pItem->stype = stype;
} }
*/
pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads"); pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
...@@ -691,7 +693,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -691,7 +693,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
...@@ -939,8 +941,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -939,8 +941,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
} else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) { } else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) {
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
/*
} else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) { } else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) {
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
*/
} else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) { } else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) {
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
} else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) { } else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) {
......
...@@ -285,6 +285,7 @@ typedef struct SSchJob { ...@@ -285,6 +285,7 @@ typedef struct SSchJob {
typedef struct SSchTaskCtx { typedef struct SSchTaskCtx {
int64_t jobRid; int64_t jobRid;
SSchTask *pTask; SSchTask *pTask;
bool asyncLaunch;
} SSchTaskCtx; } SSchTaskCtx;
extern SSchedulerMgmt schMgmt; extern SSchedulerMgmt schMgmt;
......
...@@ -138,12 +138,6 @@ int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int3 ...@@ -138,12 +138,6 @@ int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int3
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
pTask->execId, pTask->waitRetry);
return TSDB_CODE_SUCCESS;
}
SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId)); SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
if (NULL == nodeInfo) { // ignore it if (NULL == nodeInfo) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId, SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId,
...@@ -163,10 +157,15 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v ...@@ -163,10 +157,15 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v
SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId)); SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
} }
SCH_SET_TASK_HANDLE(pTask, handle);
schUpdateTaskExecNode(pJob, pTask, handle, execId); schUpdateTaskExecNode(pJob, pTask, handle, execId);
if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
SCH_SET_TASK_HANDLE(pTask, handle);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -352,7 +351,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 ...@@ -352,7 +351,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
pTask->waitRetry = true; pTask->waitRetry = true;
schDropTaskOnExecNode(pJob, pTask); schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes); taosHashClear(pTask->execNodes);
SCH_ERR_JRET(schRemoveTaskFromExecList(pJob, pTask)); schRemoveTaskFromExecList(pJob, pTask);
schDeregisterTaskHb(pJob, pTask); schDeregisterTaskHb(pJob, pTask);
atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1); atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
taosMemoryFreeClear(pTask->msg); taosMemoryFreeClear(pTask->msg);
...@@ -599,7 +598,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo ...@@ -599,7 +598,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1); atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
SCH_ERR_RET(schRemoveTaskFromExecList(pJob, pTask)); schRemoveTaskFromExecList(pJob, pTask);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
...@@ -746,8 +745,7 @@ _return: ...@@ -746,8 +745,7 @@ _return:
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) { int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId)); int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
if (code) { if (code) {
SCH_TASK_ELOG("task failed to rm from execTask list, code:%x", code); SCH_TASK_WLOG("task already not in execTask list, code:%x", code);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -836,6 +834,11 @@ int32_t schLaunchTaskImpl(void *param) { ...@@ -836,6 +834,11 @@ int32_t schLaunchTaskImpl(void *param) {
} }
SSchTask *pTask = pCtx->pTask; SSchTask *pTask = pCtx->pTask;
if (pCtx->asyncLaunch) {
SCH_LOCK_TASK(pTask);
}
int8_t status = 0; int8_t status = 0;
int32_t code = 0; int32_t code = 0;
...@@ -882,8 +885,6 @@ int32_t schLaunchTaskImpl(void *param) { ...@@ -882,8 +885,6 @@ int32_t schLaunchTaskImpl(void *param) {
_return: _return:
taosMemoryFree(param);
if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) { if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
if (code) { if (code) {
code = schProcessOnTaskFailure(pJob, pTask, code); code = schProcessOnTaskFailure(pJob, pTask, code);
...@@ -893,8 +894,14 @@ _return: ...@@ -893,8 +894,14 @@ _return:
} }
} }
if (pCtx->asyncLaunch) {
SCH_UNLOCK_TASK(pTask);
}
schReleaseJob(pJob->refId); schReleaseJob(pJob->refId);
taosMemoryFree(param);
SCH_RET(code); SCH_RET(code);
} }
...@@ -908,6 +915,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { ...@@ -908,6 +915,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
param->pTask = pTask; param->pTask = pTask;
if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) { if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
param->asyncLaunch = true;
taosAsyncExec(schLaunchTaskImpl, param, NULL); taosAsyncExec(schLaunchTaskImpl, param, NULL);
} else { } else {
SCH_ERR_RET(schLaunchTaskImpl(param)); SCH_ERR_RET(schLaunchTaskImpl(param));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册