From 957ed637b042f19c407c661d375193f933092bae Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 15 Feb 2023 16:49:21 +0800 Subject: [PATCH] fix: rsp type mismatch after link broken is processed --- source/libs/scheduler/inc/schInt.h | 1 + source/libs/scheduler/src/schRemote.c | 8 ++++---- source/libs/scheduler/src/schTask.c | 5 ++++- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index e8216fcd7c..14eb21565b 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -230,6 +230,7 @@ typedef struct SSchTask { SSchRedirectCtx redirectCtx; // task redirect context bool waitRetry; // wait for retry int32_t execId; // task current execute index + int32_t failedExecId; // last failed task execute index SSchLevel *level; // level SRWLatch planLock; // task update plan lock SSubplan *plan; // subplan diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index b6de9383d7..9c4ed65dd2 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -34,12 +34,12 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { if (lastMsgType != reqMsgType) { SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR); } if (taskStatus != JOB_TASK_STATUS_PART_SUCC) { SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType)); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR); } return TSDB_CODE_SUCCESS; @@ -60,13 +60,13 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { if (lastMsgType != reqMsgType) { SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR); } if (taskStatus != JOB_TASK_STATUS_EXEC) { SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType)); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 8e60222ca6..bdab739327 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -64,6 +64,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * pTask->plan = pPlan; pTask->level = pLevel; pTask->execId = -1; + pTask->failedExecId = -2; pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; pTask->taskId = schGenTaskId(); @@ -166,7 +167,7 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v schUpdateTaskExecNode(pJob, pTask, handle, execId); - if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it + if ((execId != pTask->execId || execId <= pTask->failedExecId) || pTask->waitRetry) { // ignore it SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry); SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); @@ -182,6 +183,8 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) return TSDB_CODE_SCH_IGNORE_ERROR; } + pTask->failedExecId = pTask->execId; + int8_t jobStatus = 0; if (schJobNeedToStop(pJob, &jobStatus)) { SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus)); -- GitLab