未验证 提交 13331134 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #20725 from taosdata/fix/TS-2876

fix: task context not exists issue
......@@ -33,6 +33,7 @@ typedef enum {
JOB_TASK_STATUS_INIT,
JOB_TASK_STATUS_EXEC,
JOB_TASK_STATUS_PART_SUCC,
JOB_TASK_STATUS_FETCH,
JOB_TASK_STATUS_SUCC,
JOB_TASK_STATUS_FAIL,
JOB_TASK_STATUS_DROP,
......
......@@ -194,6 +194,8 @@ char* jobTaskStatusStr(int32_t status) {
return "EXECUTING";
case JOB_TASK_STATUS_PART_SUCC:
return "PARTIAL_SUCCEED";
case JOB_TASK_STATUS_FETCH:
return "FETCHING";
case JOB_TASK_STATUS_SUCC:
return "SUCCEED";
case JOB_TASK_STATUS_FAIL:
......
......@@ -259,15 +259,26 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) {
static int32_t ignoreTime = 0;
if (++ignoreTime > 10 && 0 == taosRand() % 9) {
if (ctx->msgType == TDMT_SCH_FETCH) {
qwBuildAndSendErrorRsp(TDMT_SCH_LINK_BROKEN, &ctx->ctrlConnInfo, TSDB_CODE_RPC_BROKEN_LINK);
qwBuildAndSendErrorRsp(ctx->msgType + 1, &ctx->dataConnInfo, TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
*rsped = true;
taosSsleep(3);
return;
}
#if 0
SRpcHandleInfo *pConn =
((ctx->msgType == TDMT_SCH_FETCH || ctx->msgType == TDMT_SCH_MERGE_FETCH) ? &ctx->dataConnInfo
: &ctx->ctrlConnInfo);
: &ctx->ctrlConnInfo);
qwBuildAndSendErrorRsp(ctx->msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK);
qwBuildAndSendDropMsg(QW_FPARAMS(), pConn);
*rsped = true;
return;
#endif
}
}
......
......@@ -193,7 +193,7 @@ typedef struct SSchLevel {
int32_t taskSucceed;
int32_t taskNum;
int32_t taskLaunchedNum;
int32_t taskDoneNum;
int32_t taskExecDoneNum;
SArray *subTasks; // Element is SSchTask
} SSchLevel;
......@@ -299,6 +299,7 @@ typedef struct SSchJob {
SExecResult execRes;
void *fetchRes; // TODO free it or not
bool fetched;
bool noMoreRetry;
int64_t resNumOfRows; // from int32_t to int64_t
SSchResInfo userRes;
char *sql;
......@@ -333,13 +334,16 @@ extern SSchedulerMgmt schMgmt;
((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && \
(!SCH_IS_DATA_BIND_QRY_TASK(_task)))
#define SCH_UPDATE_REDICT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code)
#define SCH_GET_REDICT_CODE(job, _code) (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode)
#define SCH_UPDATE_REDIRECT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code)
#define SCH_GET_REDIRECT_CODE(job, _code) (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode)
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
#define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task))
#define SCH_TASK_ALREADY_LAUNCHED(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_EXEC)
#define SCH_TASK_EXEC_DONE(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_PART_SUCC)
#define SCH_GET_TASK_HANDLE(_task) ((_task) ? (_task)->handle : NULL)
#define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle))
......@@ -361,6 +365,7 @@ extern SSchedulerMgmt schMgmt;
(SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
#define SCH_MULTI_LEVEL_LAUNCHED(_job) ((_job)->levelIdx != ((_job)->levelNum - 1))
#define SCH_SET_JOB_TYPE(_job, type) \
do { \
......@@ -377,16 +382,24 @@ extern SSchedulerMgmt schMgmt;
#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED)
#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \
(SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)) || (_task)->redirectCtx.inRedirect))
#define SCH_REDIRECT_MSGTYPE(_msgType) \
((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \
(_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) \
(SCH_REDIRECT_MSGTYPE(_msgType) && \
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen))))
#define SCH_NEED_RETRY(_msgType, _code) \
((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define SCH_LOW_LEVEL_NETWORK_ERR(_job, _task, _code) \
(SCH_NETWORK_ERR(_code) && ((_task)->level->level == (_job)->levelIdx))
#define SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code) \
(SCH_NETWORK_ERR(_code) && ((_task)->level->level > (_job)->levelIdx))
#define SCH_TASK_RETRY_NETWORK_ERR(_task, _code) \
(SCH_NETWORK_ERR(_code) && (_task)->redirectCtx.inRedirect)
#define SCH_JOB_NEED_RETRY(_job, _task, _msgType, _code) \
(SCH_REDIRECT_MSGTYPE(_msgType) && SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code))
#define SCH_TASKSET_NEED_RETRY(_job, _task, _msgType, _code) \
(SCH_REDIRECT_MSGTYPE(_msgType) && \
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_LOW_LEVEL_NETWORK_ERR((_job), (_task), (_code)) || SCH_TASK_RETRY_NETWORK_ERR((_task), (_code))))
#define SCH_TASK_NEED_RETRY(_msgType, _code) \
((SCH_REDIRECT_MSGTYPE(_msgType) && SCH_NETWORK_ERR(_code)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
......@@ -510,6 +523,11 @@ extern SSchedulerMgmt schMgmt;
} \
} while (0)
#define SCH_RESET_JOB_LEVEL_IDX(_job) do { \
(_job)->levelIdx = (_job)->levelNum - 1; \
SCH_JOB_DLOG("set job levelIdx to %d", (_job)->levelIdx); \
} while (0)
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
void schCleanClusterHb(void *pTrans);
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
......@@ -562,7 +580,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq);
int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes);
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet);
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode);
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode);
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode);
int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq);
void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
......@@ -591,6 +609,10 @@ int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode);
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync);
int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode);
int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp);
int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode);
int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode);
void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask);
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode);
extern SSchDebug gSCHDebug;
......
......@@ -282,7 +282,6 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
}
int32_t code = schLaunchTasksInFlowCtrlListImpl(pJob, ctrl);
;
SCH_ERR_RET(code);
return code; // to avoid compiler error
......
......@@ -83,6 +83,10 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
oriStatus = SCH_GET_JOB_STATUS(pJob);
if (oriStatus == newStatus) {
if (JOB_TASK_STATUS_FETCH == newStatus) {
return code;
}
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
}
......@@ -108,10 +112,19 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break;
case JOB_TASK_STATUS_PART_SUCC:
if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_SUCC &&
newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC) {
newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC &&
newStatus != JOB_TASK_STATUS_FETCH) {
SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
}
break;
case JOB_TASK_STATUS_FETCH:
if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_SUCC &&
newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC &&
newStatus != JOB_TASK_STATUS_FETCH) {
SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
}
break;
case JOB_TASK_STATUS_SUCC:
case JOB_TASK_STATUS_FAIL:
......@@ -288,7 +301,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
}
pJob->levelNum = levelNum;
pJob->levelIdx = levelNum - 1;
SCH_RESET_JOB_LEVEL_IDX(pJob);
SSchLevel level = {0};
SNodeListNode *plans = NULL;
......@@ -550,9 +563,9 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
}
SSchLevel *pLevel = pTask->level;
int32_t doneNum = atomic_add_fetch_32(&pLevel->taskDoneNum, 1);
int32_t doneNum = atomic_add_fetch_32(&pLevel->taskExecDoneNum, 1);
if (doneNum == pLevel->taskNum) {
pJob->levelIdx--;
atomic_sub_fetch_32(&pJob->levelIdx, 1);
pLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
for (int32_t i = 0; i < pLevel->taskNum; ++i) {
......@@ -562,6 +575,10 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
continue;
}
if (SCH_TASK_ALREADY_LAUNCHED(pTask)) {
continue;
}
SCH_ERR_RET(schLaunchTask(pJob, pTask));
}
}
......@@ -811,6 +828,75 @@ void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode) {
}
}
int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) {
if (pJob->status >= JOB_TASK_STATUS_PART_SUCC) {
SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (pJob->fetched) {
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
pJob->noMoreRetry = true;
SCH_JOB_ELOG("already fetched while got error %s", tstrerror(rspCode));
SCH_ERR_RET(rspCode);
}
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC);
}
return TSDB_CODE_SUCCESS;
}
int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode) {
SCH_ERR_RET(schChkResetJobRetry(pJob, rspCode));
int32_t numOfLevels = taosArrayGetSize(pJob->levels);
for (int32_t i = 0; i < numOfLevels; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
pLevel->taskExecDoneNum = 0;
pLevel->taskLaunchedNum = 0;
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
for (int32_t j = 0; j < numOfTasks; ++j) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
SCH_LOCK_TASK(pTask);
SCH_ERR_RET(schChkUpdateRedirectCtx(pJob, pTask, NULL, rspCode));
qClearSubplanExecutionNode(pTask->plan);
schResetTaskForRetry(pJob, pTask);
SCH_UNLOCK_TASK(pTask);
}
}
SCH_RESET_JOB_LEVEL_IDX(pJob);
return TSDB_CODE_SUCCESS;
}
int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(pMsg->pEpSet);
SCH_UNLOCK_TASK(pTask);
SCH_TASK_DLOG("start to redirect all job tasks cause of error: %s", tstrerror(rspCode));
SCH_ERR_JRET(schResetJobForRetry(pJob, rspCode));
SCH_ERR_JRET(schLaunchJob(pJob));
SCH_LOCK_TASK(pTask);
SCH_RET(code);
_return:
SCH_LOCK_TASK(pTask);
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {
bool r = false;
SCH_LOCK(SCH_READ, &pJob->opStatus.lock);
......@@ -907,7 +993,7 @@ int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq
SCH_ERR_RET(TSDB_CODE_APP_ERROR);
}
if (status != JOB_TASK_STATUS_PART_SUCC) {
if (status != JOB_TASK_STATUS_PART_SUCC && status != JOB_TASK_STATUS_FETCH) {
SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
......
......@@ -36,7 +36,7 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
}
if (taskStatus != JOB_TASK_STATUS_PART_SUCC) {
if (taskStatus != JOB_TASK_STATUS_FETCH) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
......@@ -137,25 +137,12 @@ int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) {
return TSDB_CODE_SUCCESS;
}
// Note: no more task error processing, handled in function internal
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
char *msg = pMsg->pData;
int32_t msgSize = pMsg->len;
int32_t msgType = pMsg->msgType;
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, execId));
}
SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType));
int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
if (SCH_TASK_NEED_REDIRECT(pTask, reqType, rspCode, pMsg->len)) {
SCH_RET(schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode));
}
pTask->redirectCtx.inRedirect = false;
switch (msgType) {
......@@ -423,6 +410,38 @@ _return:
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
// Note: no more task error processing, handled in function internal
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
int32_t msgType = pMsg->msgType;
char *msg = pMsg->pData;
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, execId));
}
SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType));
int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) {
SCH_RET(schHandleJobRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode));
} else if (SCH_TASKSET_NEED_RETRY(pJob, pTask, reqType, rspCode)) {
SCH_RET(schHandleTaskSetRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode));
}
pTask->redirectCtx.inRedirect = false;
SCH_RET(schProcessResponseMsg(pJob, pTask, execId, pMsg, rspCode));
_return:
taosMemoryFreeClear(msg);
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
......
......@@ -34,6 +34,9 @@ int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) {
case JOB_TASK_STATUS_PART_SUCC:
SCH_ERR_JRET(schProcessOnJobPartialSuccess(pJob));
break;
case JOB_TASK_STATUS_FETCH:
SCH_ERR_JRET(schJobFetchRows(pJob));
break;
case JOB_TASK_STATUS_SUCC:
break;
case JOB_TASK_STATUS_FAIL:
......
......@@ -378,7 +378,8 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet,
if (lastTime > tsMaxRetryWaitTime) {
SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
SCH_ERR_RET(SCH_GET_REDICT_CODE(pJob, rspCode));
pJob->noMoreRetry = true;
SCH_ERR_RET(SCH_GET_REDIRECT_CODE(pJob, rspCode));
}
pCtx->periodMs *= tsRedirectFactor;
......@@ -404,32 +405,35 @@ _return:
return TSDB_CODE_SUCCESS;
}
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
int32_t code = 0;
SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));
if (NULL == pData) {
pTask->retryTimes = 0;
}
if (!NO_RET_REDIRECT_ERROR(rspCode)) {
SCH_UPDATE_REDICT_CODE(pJob, rspCode);
}
SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode));
void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
pTask->waitRetry = true;
schDropTaskOnExecNode(pJob, pTask);
if (pTask->delayTimer) {
taosTmrStopA(&pTask->delayTimer);
}
taosHashClear(pTask->execNodes);
schRemoveTaskFromExecList(pJob, pTask);
schDeregisterTaskHb(pJob, pTask);
atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
taosMemoryFreeClear(pTask->msg);
pTask->msgLen = 0;
pTask->lastMsgType = 0;
pTask->childReady = 0;
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
}
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
int32_t code = 0;
SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));
if (!NO_RET_REDIRECT_ERROR(rspCode)) {
SCH_UPDATE_REDIRECT_CODE(pJob, rspCode);
}
SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode));
schResetTaskForRetry(pJob, pTask);
if (SCH_IS_DATA_BIND_TASK(pTask)) {
if (pData && pData->pEpSet) {
......@@ -445,12 +449,6 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
}
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
if (JOB_TASK_STATUS_EXEC == SCH_GET_TASK_STATUS(pTask)) {
SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
}
}
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));
......@@ -486,20 +484,10 @@ _return:
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
int32_t code = 0;
if (JOB_TASK_STATUS_PART_SUCC == pJob->status) {
SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (pJob->fetched) {
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode));
SCH_ERR_JRET(rspCode);
}
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC);
}
SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode));
if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) {
if (NULL == pData->pEpSet) {
......@@ -509,7 +497,19 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
}
}
SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode));
for (int32_t i = 0; i < pJob->levelNum; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
pLevel->taskExecDoneNum = 0;
pLevel->taskLaunchedNum = 0;
}
SCH_RESET_JOB_LEVEL_IDX(pJob);
code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
taosMemoryFreeClear(pData->pData);
taosMemoryFreeClear(pData->pEpSet);
......@@ -621,6 +621,13 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
*/
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
if (pJob->noMoreRetry) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since job no more retry, retryTimes:%d/%d", pTask->retryTimes,
pTask->maxRetryTimes);
return TSDB_CODE_SUCCESS;
}
if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
pTask->maxExecTimes++;
pTask->maxRetryTimes++;
......@@ -645,7 +652,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return TSDB_CODE_SUCCESS;
}
if (!SCH_NEED_RETRY(pTask->lastMsgType, errCode)) {
if (!SCH_TASK_NEED_RETRY(pTask->lastMsgType, errCode)) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
return TSDB_CODE_SUCCESS;
......@@ -1067,7 +1074,6 @@ int32_t schLaunchTaskImpl(void *param) {
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
}
// NOTE: race condition: the task should be put into the hash table before send msg to server
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
......@@ -1272,6 +1278,8 @@ int32_t schLaunchFetchTask(SSchJob *pJob) {
return TSDB_CODE_SUCCESS;
}
SCH_SET_TASK_STATUS(pJob->fetchTask, JOB_TASK_STATUS_FETCH);
if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) {
SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask));
} else {
......
......@@ -91,7 +91,7 @@ int32_t schedulerFetchRows(int64_t jobId, SSchedulerReq *pReq) {
SCH_ERR_JRET(schHandleOpBeginEvent(jobId, &pJob, SCH_OP_FETCH, pReq));
SCH_ERR_JRET(schJobFetchRows(pJob));
SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_FETCH, pReq));
_return:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册