diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 9cab39c30122072207daa9e9639ab92645fc1633..c5f161b66a8312a8c5919efca8fe8b1b2d61308c 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -52,7 +52,7 @@ void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) { int32_t nodeNum = taosArrayGetSize(pJob->nodeList); pTask->maxRetryTimes = TMAX(nodeNum, SCH_DEFAULT_MAX_RETRY_NUM); } - + pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1); } @@ -139,13 +139,15 @@ int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int3 } if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it - SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry); + SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, + pTask->execId, pTask->waitRetry); return TSDB_CODE_SUCCESS; } SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId)); if (NULL == nodeInfo) { // ignore it - SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry); + SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId, + pTask->execId, pTask->waitRetry); return TSDB_CODE_SUCCESS; } @@ -314,7 +316,7 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { if (!schMgmt.cfg.enableReSchedule) { return TSDB_CODE_SUCCESS; } - + if (SCH_IS_DATA_BIND_TASK(pTask)) { return TSDB_CODE_SUCCESS; } @@ -341,7 +343,8 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 } if (((pTask->execId + 1) >= pTask->maxExecTimes) || ((pTask->retryTimes + 1) > pTask->maxRetryTimes)) { - SCH_TASK_DLOG("task no more retry since reach max times %d:%d, execId %d", pTask->maxRetryTimes, pTask->maxExecTimes, pTask->execId); + SCH_TASK_DLOG("task no more retry since reach max times %d:%d, execId %d", pTask->maxRetryTimes, + pTask->maxExecTimes, pTask->execId); schHandleJobFailure(pJob, rspCode); return TSDB_CODE_SUCCESS; } @@ -548,7 +551,8 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo if ((pTask->retryTimes + 1) > pTask->maxRetryTimes) { *needRetry = false; - SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes, pTask->maxRetryTimes); + SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes, + pTask->maxRetryTimes); return TSDB_CODE_SUCCESS; } @@ -564,25 +568,25 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo return TSDB_CODE_SUCCESS; } -/* - if (SCH_IS_DATA_BIND_TASK(pTask)) { - if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { - *needRetry = false; - SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId, - SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)); - return TSDB_CODE_SUCCESS; - } - } else { - int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); + /* + if (SCH_IS_DATA_BIND_TASK(pTask)) { + if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { + *needRetry = false; + SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId, + SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)); + return TSDB_CODE_SUCCESS; + } + } else { + int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); - if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) { - *needRetry = false; - SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d", - pTask->candidateIdx, candidateNum); - return TSDB_CODE_SUCCESS; + if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) { + *needRetry = false; + SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d", + pTask->candidateIdx, candidateNum); + return TSDB_CODE_SUCCESS; + } } - } -*/ + */ *needRetry = true; SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode)); @@ -630,8 +634,9 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId, naddr->epSet.inUse, naddr->epSet.numOfEps, - SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port); + SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId, + naddr->epSet.inUse, naddr->epSet.numOfEps, SCH_GET_CUR_EP(naddr)->fqdn, + SCH_GET_CUR_EP(naddr)->port); ++addNum; } @@ -711,10 +716,10 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) { if (candidateNum <= 1) { goto _return; } - + switch (schMgmt.cfg.schPolicy) { case SCH_LOAD_SEQ: - case SCH_ALL: + case SCH_ALL: default: if (++pTask->candidateIdx >= candidateNum) { pTask->candidateIdx = 0; @@ -732,7 +737,7 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) { _return: SCH_TASK_DLOG("switch task candiateIdx to %d/%d", pTask->candidateIdx, candidateNum); - + return TSDB_CODE_SUCCESS; } @@ -759,7 +764,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { return; } - int32_t i = 0; + int32_t i = 0; SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL); while (nodeInfo) { if (nodeInfo->handle) { @@ -821,16 +826,16 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { int32_t schLaunchTaskImpl(void *param) { SSchTaskCtx *pCtx = (SSchTaskCtx *)param; - SSchJob *pJob = schAcquireJob(pCtx->jobRid); + SSchJob *pJob = schAcquireJob(pCtx->jobRid); if (NULL == pJob) { - taosMemoryFree(param); qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid); + taosMemoryFree(param); SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING); } - + SSchTask *pTask = pCtx->pTask; - int8_t status = 0; - int32_t code = 0; + int8_t status = 0; + int32_t code = 0; atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); pTask->execId++; @@ -891,13 +896,12 @@ _return: SCH_RET(code); } -int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { - +int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx)); if (NULL == param) { SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - + param->jobRid = pJob->refId; param->pTask = pTask; @@ -906,7 +910,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { } else { SCH_ERR_RET(schLaunchTaskImpl(param)); } - + return TSDB_CODE_SUCCESS; }