From 835ddbacf414e20faaba36a55a4b9e1add89c8b4 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 11 Jul 2022 21:14:37 +0800 Subject: [PATCH] fix: fix retry issue --- source/libs/scheduler/inc/schInt.h | 2 ++ source/libs/scheduler/src/schJob.c | 38 ++++++++++++++++++++++++--- source/libs/scheduler/src/schRemote.c | 2 +- source/libs/scheduler/src/schTask.c | 3 ++- 4 files changed, 39 insertions(+), 6 deletions(-) diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index e5c7e37479..4cae547077 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -223,6 +223,7 @@ typedef struct SSchJobAttr { typedef struct { int32_t op; + SRWLatch lock; bool syncReq; } SSchOpStatus; @@ -473,6 +474,7 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum); int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask); void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode); +bool schChkCurrentOp(SSchJob *pJob, int32_t op, bool sync); extern SSchDebug gSCHDebug; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index e482814ee7..19bb93249f 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -443,25 +443,37 @@ int32_t schNotifyUserFetchRes(SSchJob* pJob) { } void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) { + SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock); + if (SCH_OP_NULL == pJob->opStatus.op) { SCH_JOB_DLOG("job not in any operation, no need to post job res, status:%s", jobTaskStatusStr(pJob->status)); - return; + goto _return; } if (op && pJob->opStatus.op != op) { SCH_JOB_ELOG("job in operation %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op)); - return; + goto _return; } if (SCH_JOB_IN_SYNC_OP(pJob)) { + SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); tsem_post(&pJob->rspSem); } else if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) { + SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); schNotifyUserExecRes(pJob); } else if (SCH_JOB_IN_ASYNC_FETCH_OP(pJob)) { + SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); schNotifyUserFetchRes(pJob); } else { + SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status)); } + + return; + +_return: + + SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); } int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { @@ -658,13 +670,13 @@ int32_t schJobFetchRows(SSchJob *pJob) { if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) { SCH_ERR_RET(schLaunchFetchTask(pJob)); - if (pJob->opStatus.syncReq) { + if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) { SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); tsem_wait(&pJob->rspSem); SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes)); } } else { - if (pJob->opStatus.syncReq) { + if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) { SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes)); } else { schPostJobRes(pJob, SCH_OP_FETCH); @@ -775,25 +787,37 @@ void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode) { } } +bool schChkCurrentOp(SSchJob *pJob, int32_t op, bool sync) { + SCH_LOCK(SCH_READ, &pJob->opStatus.lock); + bool r = (pJob->opStatus.op == op) && (pJob->opStatus.syncReq == sync); + SCH_UNLOCK(SCH_READ, &pJob->opStatus.lock); + + return r; +} + void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) { int32_t op = 0; switch (type) { case SCH_OP_EXEC: if (pReq && pReq->syncReq) { + SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock); op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); if (SCH_OP_NULL == op || op != type) { SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status)); } + SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); schDumpJobExecRes(pJob, pReq->pExecRes); } break; case SCH_OP_FETCH: if (pReq && pReq->syncReq) { + SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock); op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); if (SCH_OP_NULL == op || op != type) { SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status)); } + SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); } break; case SCH_OP_GET_STATUS: @@ -816,8 +840,10 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq switch (type) { case SCH_OP_EXEC: + SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock); if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) { SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op)); + SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR); SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR); } @@ -825,10 +851,13 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op)); pJob->opStatus.syncReq = pReq->syncReq; + SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); break; case SCH_OP_FETCH: + SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock); if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) { SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op)); + SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR); SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR); } @@ -840,6 +869,7 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq pJob->userRes.cbParam = pReq->cbParam; pJob->opStatus.syncReq = pReq->syncReq; + SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); if (!SCH_JOB_NEED_FETCH(pJob)) { SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob)); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 2257ba8328..3db1ba7be8 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -940,7 +940,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, if (NULL == addr) { addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); isCandidateAddr = true; - SCH_TASK_DLOG("target candidateIdx %d", pTask->candidateIdx); + SCH_TASK_DLOG("target candidateIdx %d, epInUse %d/%d", pTask->candidateIdx, addr->epSet.inUse, addr->epSet.numOfEps); } switch (msgType) { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index a6621d279d..d77fbc33fd 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -545,7 +545,8 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { schDeregisterTaskHb(pJob, pTask); if (SCH_IS_DATA_BIND_TASK(pTask)) { - SCH_SWITCH_EPSET(&pTask->plan->execNode); + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + SCH_SWITCH_EPSET(addr); } else { SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask)); } -- GitLab