提交 0c26308a 编写于 作者: D dapan1121

fix: add redirect error code and fix no error msg issue

上级 17c6d303
...@@ -262,24 +262,23 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ...@@ -262,24 +262,23 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \ (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || (_code) == TSDB_CODE_VND_STOPPED) #define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || (_code) == TSDB_CODE_VND_STOPPED || (_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING)
#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) #define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) (false) // used later #define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_MNODE_NOT_FOUND)
#define NO_RET_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define NEED_REDIRECT_ERROR(_code) \ #define NEED_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ (NO_RET_REDIRECT_ERROR(_code) || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \
(_code) == TSDB_CODE_MNODE_NOT_FOUND || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \ SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code))
SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || \
(_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING)
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
(_type) == TDMT_MND_DROP_STB) (_type) == TDMT_MND_DROP_STB)
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \ #define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_MNODE_NOT_FOUND || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \ (SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code))
SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || \
(_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING)
#define REQUEST_TOTAL_EXEC_TIMES 2 #define REQUEST_TOTAL_EXEC_TIMES 2
......
...@@ -293,6 +293,7 @@ typedef struct SSchJob { ...@@ -293,6 +293,7 @@ typedef struct SSchJob {
void *chkKillParam; void *chkKillParam;
SSchTask *fetchTask; SSchTask *fetchTask;
int32_t errCode; int32_t errCode;
int32_t redirectCode;
SRWLatch resLock; SRWLatch resLock;
SExecResult execRes; SExecResult execRes;
void *fetchRes; // TODO free it or not void *fetchRes; // TODO free it or not
...@@ -331,6 +332,9 @@ extern SSchedulerMgmt schMgmt; ...@@ -331,6 +332,9 @@ extern SSchedulerMgmt schMgmt;
((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && \ ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && \
(!SCH_IS_DATA_BIND_QRY_TASK(_task))) (!SCH_IS_DATA_BIND_QRY_TASK(_task)))
#define SCH_UPDATE_REDICT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code)
#define SCH_GET_REDICT_CODE(job, _code) (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode)
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
#define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task)) #define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task))
......
...@@ -481,6 +481,10 @@ _return: ...@@ -481,6 +481,10 @@ _return:
} }
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
return TSDB_CODE_SCH_IGNORE_ERROR;
}
schUpdateJobErrCode(pJob, errCode); schUpdateJobErrCode(pJob, errCode);
int32_t code = atomic_load_32(&pJob->errCode); int32_t code = atomic_load_32(&pJob->errCode);
......
...@@ -340,7 +340,7 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { ...@@ -340,7 +340,7 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) { int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode) {
SSchRedirectCtx *pCtx = &pTask->redirectCtx; SSchRedirectCtx *pCtx = &pTask->redirectCtx;
if (!pCtx->inRedirect) { if (!pCtx->inRedirect) {
pCtx->inRedirect = true; pCtx->inRedirect = true;
...@@ -380,7 +380,7 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) ...@@ -380,7 +380,7 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet)
if (lastTime > tsMaxRetryWaitTime) { if (lastTime > tsMaxRetryWaitTime) {
SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d", SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes); nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
SCH_ERR_RET(TSDB_CODE_TIMEOUT_ERROR); SCH_ERR_RET(SCH_GET_REDICT_CODE(pJob, rspCode));
} }
pCtx->periodMs *= tsRedirectFactor; pCtx->periodMs *= tsRedirectFactor;
...@@ -415,7 +415,11 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 ...@@ -415,7 +415,11 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
pTask->retryTimes = 0; pTask->retryTimes = 0;
} }
SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL)); if (!NO_RET_REDIRECT_ERROR(rspCode)) {
SCH_UPDATE_REDICT_CODE(pJob, rspCode);
}
SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode));
pTask->waitRetry = true; pTask->waitRetry = true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册