From 80798dd934a3f8df7c5e62a965404a6f9e2e06c3 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 28 Mar 2023 09:20:53 +0800 Subject: [PATCH] fix: job retry issue --- include/libs/qcom/query.h | 1 + source/common/src/tdatablock.c | 2 +- source/libs/qcom/src/queryUtil.c | 2 + source/libs/scheduler/inc/schInt.h | 30 +++++++++---- source/libs/scheduler/src/schFlowCtrl.c | 1 - source/libs/scheduler/src/schJob.c | 42 ++++++++++++++++-- source/libs/scheduler/src/schRemote.c | 42 +++++++++++------- source/libs/scheduler/src/schStatus.c | 3 ++ source/libs/scheduler/src/schTask.c | 59 ++++++++++--------------- source/libs/scheduler/src/scheduler.c | 2 +- 10 files changed, 119 insertions(+), 65 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index cb547ee6b3..b6ada5a0c7 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -33,6 +33,7 @@ typedef enum { JOB_TASK_STATUS_INIT, JOB_TASK_STATUS_EXEC, JOB_TASK_STATUS_PART_SUCC, + JOB_TASK_STATUS_FETCH, JOB_TASK_STATUS_SUCC, JOB_TASK_STATUS_FAIL, JOB_TASK_STATUS_DROP, diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 3c8d394b43..a75046d06d 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1180,7 +1180,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) { void blockDataEmpty(SSDataBlock* pDataBlock) { SDataBlockInfo* pInfo = &pDataBlock->info; - if (pInfo->capacity == 0 || pInfo->rows > pDataBlock->info.capacity) { + if (pInfo->capacity == 0) { return; } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index c68a08682c..9d8c170003 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -194,6 +194,8 @@ char* jobTaskStatusStr(int32_t status) { return "EXECUTING"; case JOB_TASK_STATUS_PART_SUCC: return "PARTIAL_SUCCEED"; + case JOB_TASK_STATUS_FETCH: + return "FETCHING"; case JOB_TASK_STATUS_SUCC: return "SUCCEED"; case JOB_TASK_STATUS_FAIL: diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 85b952937f..b7f9272bdc 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -193,7 +193,7 @@ typedef struct SSchLevel { int32_t taskSucceed; int32_t taskNum; int32_t taskLaunchedNum; - int32_t taskDoneNum; + int32_t taskExecDoneNum; SArray *subTasks; // Element is SSchTask } SSchLevel; @@ -340,6 +340,9 @@ extern SSchedulerMgmt schMgmt; #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) #define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task)) +#define SCH_TASK_ALREADY_LAUNCHED(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_EXEC) +#define SCH_TASK_EXEC_DONE(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_PART_SUCC) + #define SCH_GET_TASK_HANDLE(_task) ((_task) ? (_task)->handle : NULL) #define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle)) @@ -361,6 +364,7 @@ extern SSchedulerMgmt schMgmt; (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) #define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) #define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY) +#define SCH_MULTI_LEVEL_LAUNCHED(_job) ((_job)->levelIdx != ((_job)->levelNum - 1)) #define SCH_SET_JOB_TYPE(_job, type) \ do { \ @@ -377,16 +381,24 @@ extern SSchedulerMgmt schMgmt; #define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job)) #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) #define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) -#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \ - (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)) || (_task)->redirectCtx.inRedirect)) #define SCH_REDIRECT_MSGTYPE(_msgType) \ ((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \ (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH) -#define SCH_TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) \ - (SCH_REDIRECT_MSGTYPE(_msgType) && \ - (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen)))) -#define SCH_NEED_RETRY(_msgType, _code) \ - ((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) +#define SCH_LOW_LEVEL_NETWORK_ERR(_job, _task, _code) \ + (SCH_NETWORK_ERR(_code) && ((_task)->level->level == (_job)->levelIdx)) +#define SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code) \ + (SCH_NETWORK_ERR(_code) && ((_task)->level->level > (_job)->levelIdx)) +#define SCH_TASK_RETRY_NETWORK_ERR(_task, _code) \ + (SCH_NETWORK_ERR(_code) && (_task)->redirectCtx.inRedirect) + +#define SCH_JOB_NEED_RETRY(_job, _task, _msgType, _code) \ + (SCH_REDIRECT_MSGTYPE(_msgType) && SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code)) +#define SCH_TASKSET_NEED_RETRY(_job, _task, _msgType, _code) \ + (SCH_REDIRECT_MSGTYPE(_msgType) && \ + (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_LOW_LEVEL_NETWORK_ERR((_job), (_task), (_code)) || SCH_TASK_RETRY_NETWORK_ERR((_task), (_code)))) +#define SCH_TASK_NEED_RETRY(_msgType, _code) \ + ((SCH_REDIRECT_MSGTYPE(_msgType) && SCH_NETWORK_ERR(_code)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) + #define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) #define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse]) @@ -562,7 +574,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq); int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq); int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes); int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet); -int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode); +int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode); void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode); int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq); void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode); diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c index 5e4fe4b8a1..9cb95a6bbe 100644 --- a/source/libs/scheduler/src/schFlowCtrl.c +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -282,7 +282,6 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) { } int32_t code = schLaunchTasksInFlowCtrlListImpl(pJob, ctrl); - ; SCH_ERR_RET(code); return code; // to avoid compiler error diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 980a8ac6a1..b2c90dc67f 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -108,10 +108,18 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { break; case JOB_TASK_STATUS_PART_SUCC: if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_SUCC && - newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC) { + newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC && + newStatus != JOB_TASK_STATUS_FETCH) { SCH_ERR_JRET(TSDB_CODE_APP_ERROR); } + break; + case JOB_TASK_STATUS_FETCH: + if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_SUCC && + newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC) { + SCH_ERR_JRET(TSDB_CODE_APP_ERROR); + } + break; case JOB_TASK_STATUS_SUCC: case JOB_TASK_STATUS_FAIL: @@ -550,9 +558,9 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) { } SSchLevel *pLevel = pTask->level; - int32_t doneNum = atomic_add_fetch_32(&pLevel->taskDoneNum, 1); + int32_t doneNum = atomic_add_fetch_32(&pLevel->taskExecDoneNum, 1); if (doneNum == pLevel->taskNum) { - pJob->levelIdx--; + atomic_sub_fetch_32(&pJob->levelIdx, 1); pLevel = taosArrayGet(pJob->levels, pJob->levelIdx); for (int32_t i = 0; i < pLevel->taskNum; ++i) { @@ -562,6 +570,10 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) { continue; } + if (SCH_TASK_ALREADY_LAUNCHED(pTask)) { + continue; + } + SCH_ERR_RET(schLaunchTask(pJob, pTask)); } } @@ -811,6 +823,30 @@ void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode) { } } +int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) { + if (pJob->status >= JOB_TASK_STATUS_PART_SUCC) { + SCH_LOCK(SCH_WRITE, &pJob->resLock); + if (pJob->fetched) { + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode)); + SCH_ERR_JRET(rspCode); + } + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + + schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) { + int32_t code = 0; + + SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode)); + +} + bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) { bool r = false; SCH_LOCK(SCH_READ, &pJob->opStatus.lock); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 6f4130fd9f..af206aa46e 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -36,7 +36,7 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { TMSG_INFO(msgType)); SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR); } - if (taskStatus != JOB_TASK_STATUS_PART_SUCC) { + if (taskStatus != JOB_TASK_STATUS_FETCH) { SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType)); SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR); @@ -137,25 +137,12 @@ int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) { return TSDB_CODE_SUCCESS; } -// Note: no more task error processing, handled in function internal -int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { +int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; char *msg = pMsg->pData; int32_t msgSize = pMsg->len; int32_t msgType = pMsg->msgType; - bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode)); - if (SCH_IS_QUERY_JOB(pJob)) { - SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, execId)); - } - - SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType)); - - int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1); - if (SCH_TASK_NEED_REDIRECT(pTask, reqType, rspCode, pMsg->len)) { - SCH_RET(schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode)); - } - pTask->redirectCtx.inRedirect = false; switch (msgType) { @@ -423,6 +410,31 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } + + +// Note: no more task error processing, handled in function internal +int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { + int32_t code = 0; + int32_t msgType = pMsg->msgType; + + bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode)); + if (SCH_IS_QUERY_JOB(pJob)) { + SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, execId)); + } + + SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType)); + + int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1); + if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) { + SCH_RET(schHandleJobRetry()); + } else if (SCH_TASKSET_NEED_RETRY(pJob, pTask, reqType, rspCode)) { + SCH_RET(schHandleTaskSetRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode)); + } + + pTask->redirectCtx.inRedirect = false; + + SCH_RET(schProcessResponseMsg(pJob, pTask, execId, pMsg, rspCode)); +} int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; diff --git a/source/libs/scheduler/src/schStatus.c b/source/libs/scheduler/src/schStatus.c index 4c16a81a05..9d0ad30e2a 100644 --- a/source/libs/scheduler/src/schStatus.c +++ b/source/libs/scheduler/src/schStatus.c @@ -34,6 +34,9 @@ int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) { case JOB_TASK_STATUS_PART_SUCC: SCH_ERR_JRET(schProcessOnJobPartialSuccess(pJob)); break; + case JOB_TASK_STATUS_FETCH: + SCH_ERR_JRET(schJobFetchRows(pJob)); + break; case JOB_TASK_STATUS_SUCC: break; case JOB_TASK_STATUS_FAIL: diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index bdab739327..a3194c7ff7 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -404,21 +404,7 @@ _return: return TSDB_CODE_SUCCESS; } -int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { - int32_t code = 0; - - SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode)); - - if (NULL == pData) { - pTask->retryTimes = 0; - } - - if (!NO_RET_REDIRECT_ERROR(rspCode)) { - SCH_UPDATE_REDICT_CODE(pJob, rspCode); - } - - SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode)); - +void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) { pTask->waitRetry = true; schDropTaskOnExecNode(pJob, pTask); @@ -426,10 +412,27 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 schRemoveTaskFromExecList(pJob, pTask); schDeregisterTaskHb(pJob, pTask); atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1); + if (SCH_TASK_EXEC_DONE(pTask)) { + atomic_sub_fetch_32(&pTask->level->taskExecDoneNum, 1); + } taosMemoryFreeClear(pTask->msg); pTask->msgLen = 0; pTask->lastMsgType = 0; memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); +} + +int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { + int32_t code = 0; + + SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode)); + + if (!NO_RET_REDIRECT_ERROR(rspCode)) { + SCH_UPDATE_REDICT_CODE(pJob, rspCode); + } + + SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode)); + + schResetTaskForRetry(pJob, pTask); if (SCH_IS_DATA_BIND_TASK(pTask)) { if (pData && pData->pEpSet) { @@ -445,12 +448,6 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps); } - if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { - if (JOB_TASK_STATUS_EXEC == SCH_GET_TASK_STATUS(pTask)) { - SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask)); - } - } - SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask)); @@ -486,20 +483,10 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } -int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { +int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t code = 0; - if (JOB_TASK_STATUS_PART_SUCC == pJob->status) { - SCH_LOCK(SCH_WRITE, &pJob->resLock); - if (pJob->fetched) { - SCH_UNLOCK(SCH_WRITE, &pJob->resLock); - SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode)); - SCH_ERR_JRET(rspCode); - } - SCH_UNLOCK(SCH_WRITE, &pJob->resLock); - - schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC); - } + SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode)); if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) { if (NULL == pData->pEpSet) { @@ -510,6 +497,7 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 } code = schDoTaskRedirect(pJob, pTask, pData, rspCode); + taosMemoryFreeClear(pData->pData); taosMemoryFreeClear(pData->pEpSet); @@ -645,7 +633,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo return TSDB_CODE_SUCCESS; } - if (!SCH_NEED_RETRY(pTask->lastMsgType, errCode)) { + if (!SCH_TASK_NEED_RETRY(pTask->lastMsgType, errCode)) { *needRetry = false; SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode)); return TSDB_CODE_SUCCESS; @@ -1067,7 +1055,6 @@ int32_t schLaunchTaskImpl(void *param) { SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR); } - // NOTE: race condition: the task should be put into the hash table before send msg to server if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) { SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask)); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC); @@ -1272,6 +1259,8 @@ int32_t schLaunchFetchTask(SSchJob *pJob) { return TSDB_CODE_SUCCESS; } + SCH_SET_TASK_STATUS(pJob->fetchTask, JOB_TASK_STATUS_FETCH); + if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) { SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask)); } else { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 7cd5e957b6..2b46a4710e 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -91,7 +91,7 @@ int32_t schedulerFetchRows(int64_t jobId, SSchedulerReq *pReq) { SCH_ERR_JRET(schHandleOpBeginEvent(jobId, &pJob, SCH_OP_FETCH, pReq)); - SCH_ERR_JRET(schJobFetchRows(pJob)); + SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_FETCH, pReq)); _return: -- GitLab