diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index ffb982af8e63e7c4ed8b5c4d5bbb917a82c19589..91ec5f52e5f948f45410977d5db3111d4b2183f9 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -262,24 +262,23 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \ NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) -#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || (_code) == TSDB_CODE_VND_STOPPED) +#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || (_code) == TSDB_CODE_VND_STOPPED || (_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING) #define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) -#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) (false) // used later +#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_MNODE_NOT_FOUND) + +#define NO_RET_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) #define NEED_REDIRECT_ERROR(_code) \ - ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ - (_code) == TSDB_CODE_MNODE_NOT_FOUND || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \ - SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || \ - (_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING) + (NO_RET_REDIRECT_ERROR(_code) || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \ + SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code)) + #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ (_type) == TDMT_MND_DROP_STB) #define NEED_SCHEDULER_REDIRECT_ERROR(_code) \ - ((_code) == TSDB_CODE_MNODE_NOT_FOUND || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \ - SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || \ - (_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING) + (SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code)) #define REQUEST_TOTAL_EXEC_TIMES 2 diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 77aafa9a2758c895357cb0cc67d794676b615261..39118910f1f975db2c43378e01e61be2c6e2e3aa 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -293,6 +293,7 @@ typedef struct SSchJob { void *chkKillParam; SSchTask *fetchTask; int32_t errCode; + int32_t redirectCode; SRWLatch resLock; SExecResult execRes; void *fetchRes; // TODO free it or not @@ -331,6 +332,9 @@ extern SSchedulerMgmt schMgmt; ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && \ (!SCH_IS_DATA_BIND_QRY_TASK(_task))) +#define SCH_UPDATE_REDICT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code) +#define SCH_GET_REDICT_CODE(job, _code) (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode) + #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) #define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task)) diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 2d41cc287c16b55885c45d0c692867413645e9a7..d422f0e88f104749dc92804b46d38aca6f89b40b 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -481,6 +481,10 @@ _return: } int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { + if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) { + return TSDB_CODE_SCH_IGNORE_ERROR; + } + schUpdateJobErrCode(pJob, errCode); int32_t code = atomic_load_32(&pJob->errCode); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 0f15002d9baa2bff1dda420e0fce39698ae53932..cd69834924d69b43e29ae71aa1ab0ec6f9245f77 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -340,7 +340,7 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) { +int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode) { SSchRedirectCtx *pCtx = &pTask->redirectCtx; if (!pCtx->inRedirect) { pCtx->inRedirect = true; @@ -380,7 +380,7 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) if (lastTime > tsMaxRetryWaitTime) { SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d", nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes); - SCH_ERR_RET(TSDB_CODE_TIMEOUT_ERROR); + SCH_ERR_RET(SCH_GET_REDICT_CODE(pJob, rspCode)); } pCtx->periodMs *= tsRedirectFactor; @@ -415,7 +415,11 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 pTask->retryTimes = 0; } - SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL)); + if (!NO_RET_REDIRECT_ERROR(rspCode)) { + SCH_UPDATE_REDICT_CODE(pJob, rspCode); + } + + SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode)); pTask->waitRetry = true;