From b5207239b5cb2b9ff8729c2363936613414ece2d Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 5 Jul 2022 16:10:38 +0800 Subject: [PATCH] enh: refactor scheduler code --- include/libs/scheduler/scheduler.h | 5 +- include/util/taoserror.h | 2 +- source/client/src/clientImpl.c | 4 +- source/client/src/clientMain.c | 7 +- source/libs/scheduler/inc/schInt.h | 17 +- source/libs/scheduler/src/schJob.c | 263 ++++++++++-------- source/libs/scheduler/src/schRemote.c | 134 ++++----- source/libs/scheduler/src/schStatus.c | 78 +++--- source/libs/scheduler/src/schTask.c | 109 +++----- source/libs/scheduler/src/schUtil.c | 2 + source/libs/scheduler/src/scheduler.c | 124 ++------- source/libs/scheduler/test/schedulerTests.cpp | 10 +- source/util/src/terror.c | 1 + 13 files changed, 338 insertions(+), 418 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 5f9f65d76a..ae4cbb498c 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -82,10 +82,11 @@ typedef struct SSchedulerReq { const char *sql; int64_t startTs; schedulerExecFp execFp; - void* execParam; + schedulerFetchFp fetchFp; + void* cbParam; schedulerChkKillFp chkKillFp; void* chkKillParam; - SQueryResult* pQueryRes; + SExecResult* pExecRes; char** pFetchRes; } SSchedulerReq; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b871452828..d93fb92ee5 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -388,10 +388,10 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719) #define TSDB_CODE_QRY_JOB_FREED TAOS_DEF_ERROR_CODE(0, 0x071A) #define TSDB_CODE_QRY_TASK_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x071B) -//json #define TSDB_CODE_QRY_JSON_IN_ERROR TAOS_DEF_ERROR_CODE(0, 0x071C) #define TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR TAOS_DEF_ERROR_CODE(0, 0x071D) #define TSDB_CODE_QRY_JSON_IN_GROUP_ERROR TAOS_DEF_ERROR_CODE(0, 0x071E) +#define TSDB_CODE_QRY_JOB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x071F) // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0e031bd24f..423e7982ab 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1368,9 +1368,9 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) SReqResultInfo* pResInfo = &pRequest->body.resInfo; SSchedulerReq req = { .syncReq = true, - . + .pFetchRes = &pResInfo->pData, }; - pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData); + pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req); if (pRequest->code != TSDB_CODE_SUCCESS) { pResultInfo->numOfRows = 0; return NULL; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index f660c46d3c..4e24fb4f48 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -863,7 +863,12 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { } } - schedulerFetchRowsA(pRequest->body.queryJob, fetchCallback, pRequest); + SSchedulerReq req = { + .syncReq = false, + .fetchFp = fetchCallback, + .execParam = pRequest, + }; + schedulerFetchRows(pRequest->body.queryJob, &req); } void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index cceea452db..2ad2fc9029 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -52,15 +52,9 @@ typedef enum { SCH_OP_NULL = 0, SCH_OP_EXEC, SCH_OP_FETCH, + SCH_OP_GET_STATUS, } SCH_OP_TYPE; -typedef enum { - SCH_EVENT_BEGIN_OP = 1, - SCH_EVENT_END_OP, - SCH_EVENT_MSG, - SCH_EVENT_DROP, -} SCH_EVENT_TYPE; - typedef struct SSchTrans { void *pTrans; void *pHandle; @@ -108,7 +102,7 @@ typedef struct SSchResInfo { void** fetchRes; schedulerExecFp execFp; schedulerFetchFp fetchFp; - void* userParam; + void* cbParam; } SSchResInfo; typedef struct SSchOpEvent { @@ -358,9 +352,10 @@ extern SSchedulerMgmt schMgmt; #define SCH_TASK_WLOG(param, ...) \ qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) -#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) -#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) -#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) +#define SCH_SET_ERRNO(_err) do { if (TSDB_CODE_SCH_IGNORE_ERROR != (_err)) { terrno = (_err); } } while (0) +#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); return _code; } } while (0) +#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); } return _code; } while (0) +#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); goto _return; } } while (0) #define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock)) #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 9f1679f5b2..d514ed2a9f 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -51,7 +51,12 @@ _return: SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode)); } - +bool schJobDone(SSchJob *pJob) { + int8_t status = SCH_GET_JOB_STATUS(pJob); + + return (status == JOB_TASK_STATUS_FAIL || status == JOB_TASK_STATUS_DROP || + status == JOB_TASK_STATUS_SUCC); +} FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { int8_t status = SCH_GET_JOB_STATUS(pJob); @@ -59,13 +64,14 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { *pStatus = status; } + if (schJobDone(pJob)) { + return true; + } + if ((*pJob->chkKillFp)(pJob->chkKillParam)) { schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED); return true; - } - - return (status == JOB_TASK_STATUS_FAIL || status == JOB_TASK_STATUS_DROP || - status == JOB_TASK_STATUS_SUCC); + } } int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { @@ -77,10 +83,6 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { oriStatus = SCH_GET_JOB_STATUS(pJob); if (oriStatus == newStatus) { - if (newStatus == JOB_TASK_STATUS_DROP) { - SCH_ERR_JRET(TSDB_CODE_SCH_JOB_IS_DROPPING); - } - SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR); } @@ -140,7 +142,11 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { _return: - SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus)); + if (TSDB_CODE_SCH_IGNORE_ERROR == code) { + SCH_JOB_DLOG("ignore job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus)); + } else { + SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus)); + } SCH_RET(code); } @@ -360,7 +366,7 @@ _return: } -int32_t schDumpJobExecRes(SSchJob* pJob, SQueryResult* pRes) { +int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes) { pRes->code = atomic_load_32(&pJob->errCode); pRes->numOfRows = pJob->resNumOfRows; pRes->res = pJob->execRes; @@ -372,7 +378,7 @@ int32_t schDumpJobExecRes(SSchJob* pJob, SQueryResult* pRes) { int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) { int32_t code = 0; if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) { - SCH_ERR_RET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCC)); + SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL)); } while (true) { @@ -451,9 +457,6 @@ void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) { } int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { - // if already FAILED, no more processing - SCH_ERR_RET(schUpdateJobStatus(pJob, status)); - schUpdateJobErrCode(pJob, errCode); int32_t code = atomic_load_32(&pJob->errCode); @@ -463,13 +466,17 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod schPostJobRes(pJob, 0); - SCH_RET(code); + SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR); } // Note: no more task error processing, handled in function internal int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { + if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) { + return TSDB_CODE_SCH_IGNORE_ERROR; + } + schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAIL, errCode); - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SCH_IGNORE_ERROR; } // Note: no more error processing, handled in function internal @@ -477,19 +484,10 @@ int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) { SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_DROP, errCode)); } -// Note: no more task error processing, handled in function internal int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { - int32_t code = 0; - - SCH_ERR_RET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC)); - schPostJobRes(pJob, SCH_OP_EXEC); return TSDB_CODE_SUCCESS; - -_return: - - SCH_RET(schProcessOnJobFailure(pJob, code)); } void schProcessOnDataFetched(SSchJob *pJob) { @@ -570,7 +568,7 @@ int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) { int32_t schLaunchJob(SSchJob *pJob) { if (EXPLAIN_MODE_STATIC == pJob->attr.explainMode) { SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->resData)); - SCH_ERR_RET(schJobStatusEnter(&pJob, JOB_TASK_STATUS_PART_SUCC, NULL)); + SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL)); } else { SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx); SCH_ERR_RET(schLaunchLevelTasks(pJob, level)); @@ -586,12 +584,6 @@ void schDropJobAllTasks(SSchJob *pJob) { // schDropTaskInHashList(pJob, pJob->failTasks); } -int32_t schCancelJob(SSchJob *pJob) { - // TODO - return TSDB_CODE_SUCCESS; - // TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST -} - void schFreeJobImpl(void *job) { if (NULL == job) { return; @@ -603,10 +595,6 @@ void schFreeJobImpl(void *job) { qDebug("QID:0x%" PRIx64 " begin to free sch job, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob); - if (pJob->status == JOB_TASK_STATUS_EXEC) { - schCancelJob(pJob); - } - schDropJobAllTasks(pJob); int32_t numOfLevels = taosArrayGetSize(pJob->levels); @@ -655,34 +643,21 @@ int32_t schJobFetchRows(SSchJob *pJob) { int32_t code = 0; if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) { - SCH_ERR_JRET(schLaunchFetchTask(pJob)); - tsem_wait(&pJob->rspSem); - } - - SCH_ERR_JRET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes)); - -_return: - - schEndOperation(pJob); - - SCH_RET(code); -} - -int32_t schJobFetchRowsA(SSchJob *pJob) { - int32_t code = 0; - - if (pJob->attr.explainMode == EXPLAIN_MODE_STATIC) { + SCH_ERR_RET(schLaunchFetchTask(pJob)); + + if (pJob->opStatus.syncReq) { + SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); + tsem_wait(&pJob->rspSem); + schPostJobRes(pJob, SCH_OP_FETCH); + } + } else { schPostJobRes(pJob, SCH_OP_FETCH); - return TSDB_CODE_SUCCESS; } - - SCH_ERR_RET(schLaunchFetchTask(pJob)); - return TSDB_CODE_SUCCESS; + SCH_RET(code); } - -int32_t schInitJob(SSchJob **pSchJob, SSchedulerReq *pReq) { +int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { int32_t code = 0; int64_t refId = -1; SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); @@ -698,7 +673,7 @@ int32_t schInitJob(SSchJob **pSchJob, SSchedulerReq *pReq) { pJob->chkKillFp = pReq->chkKillFp; pJob->chkKillParam = pReq->chkKillParam; pJob->userRes.execFp = pReq->execFp; - pJob->userRes.userParam = pReq->execParam; + pJob->userRes.cbParam = pReq->cbParam; pJob->opStatus.op = SCH_OP_EXEC; pJob->opStatus.syncReq = pReq->syncReq; @@ -730,35 +705,28 @@ int32_t schInitJob(SSchJob **pSchJob, SSchedulerReq *pReq) { tsem_init(&pJob->rspSem, 0, 0); - refId = taosAddRef(schMgmt.jobRef, pJob); - if (refId < 0) { + pJob->refId = taosAddRef(schMgmt.jobRef, pJob); + if (pJob->refId < 0) { SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno)); SCH_ERR_JRET(terrno); } atomic_add_fetch_32(&schMgmt.jobNum, 1); - if (NULL == schAcquireJob(refId)) { - SCH_JOB_ELOG("schAcquireJob job failed, refId:0x%" PRIx64, refId); - SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); - } - - pJob->refId = refId; + *pJobId = pJob->refId; SCH_JOB_DLOG("job refId:0x%" PRIx64" created", pJob->refId); - *pSchJob = pJob; - return TSDB_CODE_SUCCESS; _return: if (NULL == pJob) { qDestroyQueryPlan(pReq->pDag); - } else if (refId < 0) { + } else if (pJob->refId < 0) { schFreeJobImpl(pJob); } else { - taosRemoveRef(schMgmt.jobRef, refId); + taosRemoveRef(schMgmt.jobRef, pJob->refId); } SCH_RET(code); @@ -768,7 +736,7 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) { int32_t code = 0; qDebug("QID:0x%" PRIx64 " sch job refId 0x%"PRIx64 " started", pReq->pDag->queryId, pJob->refId); - SCH_ERR_JRET(schLaunchJob(pJob)); + SCH_ERR_RET(schLaunchJob(pJob)); if (pReq->syncReq) { SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); @@ -778,83 +746,148 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) { SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId); return TSDB_CODE_SUCCESS; - -_return: - - SCH_RET(schProcessOnJobFailure(pJob, code)); } -void schProcessOnOpEnd(SSchJob *pJob) { - int32_t op = atomic_load_32(&pJob->opStatus.op); - if (SCH_OP_NULL == op) { - SCH_JOB_DLOG("job already not in any operation, status:%s", jobTaskStatusStr(pJob->status)); - return; +void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) { + switch (type) { + case SCH_OP_EXEC: + int32_t op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); + if (SCH_OP_NULL == op || op != type) { + SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status)); + } + + if (pReq) { + schDumpJobExecRes(pJob, pReq->pExecRes); + } + break; + case SCH_OP_FETCH: + int32_t op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); + if (SCH_OP_NULL == op || op != type) { + SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status)); + } + break; + case SCH_OP_GET_STATUS: + errCode = TSDB_CODE_SUCCESS; + break; + default: + break; } - atomic_store_32(&pJob->opStatus.op, SCH_OP_NULL); - - SCH_JOB_DLOG("job end %s operation", schGetOpStr(op)); + if (errCode) { + schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, errCode); + } + + SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode)); } -int32_t schProcessOnOpBegin(SSchJob* pJob, SSchEvent* pEvent) { +int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq) { int32_t code = 0; int8_t status = 0; - SSchOpEvent* pInfo = (SSchOpEvent*)pEvent->info; - SCH_OP_TYPE type, bool sync; if (schJobNeedToStop(pJob, &status)) { - SCH_JOB_ELOG("abort op %s cause of job need to stop", schGetOpStr(type)); - SCH_ERR_JRET(pJob->errCode); + SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status)); + SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); } - if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) { - SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op)); - SCH_ERR_JRET(TSDB_CODE_TSC_APP_ERROR); - } - - SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op)); - - pJob->opStatus.syncReq = sync; - switch (type) { case SCH_OP_EXEC: - SCH_ERR_JRET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC)); + if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) { + SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op)); + SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR); + } + + SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op)); + + pJob->opStatus.syncReq = pReq->syncReq; break; case SCH_OP_FETCH: + if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) { + SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op)); + SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR); + } + + SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op)); + + pJob->opStatus.syncReq = pReq->syncReq; + if (!SCH_JOB_NEED_FETCH(pJob)) { SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob)); - SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } if (status != JOB_TASK_STATUS_PART_SUCC) { SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status)); - SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + pJob->userRes.fetchRes = pReq->pFetchRes; + pJob->userRes.fetchFp = pReq->fetchFp; + pJob->userRes.cbParam = pReq->cbParam; + + break; + case SCH_OP_GET_STATUS: + if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) { + qDebug("job not initialized or not executable job, refId:0x%" PRIx64, pJob->refId); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } break; default: SCH_JOB_ELOG("unknown operation type %d", type); - SCH_ERR_JRET(TSDB_CODE_TSC_APP_ERROR); + SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR); } return TSDB_CODE_SUCCESS; +} -_return: - - schEndOperation(pJob); +void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { + if (pTask) { + SCH_UNLOCK_TASK(pTask); + } - SCH_RET(code); + if (errCode) { + schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, errCode); + } + + if (pJob) { + schReleaseJob(pJob->refId); + } } -int32_t schHandleJobEvent(SSchJob* pJob, SSchEvent* pEvent) { - switch (pEvent->event) { - case SCH_EVENT_BEGIN_OP: - schProcessOnOpBegin(pJob, pEvent); - case SCH_EVENT_END_OP: - schProcessOnOpEnd(pJob); +int32_t schProcessOnCbBegin(SSchJob** job, SSchTask** task, uint64_t qId, int64_t rId, uint64_t tId) { + int32_t code = 0; + int8_t status = 0; + + SSchTask *pTask = NULL; + SSchJob *pJob = schAcquireJob(rId); + if (NULL == pJob) { + qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId); + SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST); } -} + + int8_t status = 0; + if (schJobNeedToStop(pJob, &status)) { + SCH_TASK_ELOG("will not do further processing cause of job status %s", jobTaskStatusStr(status)); + SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR); + } + + SCH_ERR_JRET(schGetTaskInJob(pJob, tId, &pTask)); + + SCH_LOCK_TASK(pTask); + + return TSDB_CODE_SUCCESS; +_return: + + if (pTask) { + SCH_UNLOCK_TASK(pTask); + } + if (pJob) { + schReleaseJob(rId); + } + + SCH_RET(code); +} diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 479d3665a4..64368162e3 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -88,9 +88,21 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy } // Note: no more task error processing, handled in function internal -int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, - int32_t rspCode) { +int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; + char *msg = pMsg->pData; + int32_t msgSize = pMsg->len; + int32_t msgType = pMsg->msgType; + + bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode)); + SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, execId)); + + SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType)); + + int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1); + if (SCH_NEED_REDIRECT(reqType, rspCode, pMsg->len)) { + SCH_RET(schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode)); + } switch (msgType) { case TDMT_VND_CREATE_TABLE_RSP: { @@ -362,65 +374,24 @@ _return: int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; - int32_t msgType = pMsg->msgType; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; SSchTask *pTask = NULL; + SSchJob *pJob = NULL; - SSchJob *pJob = schAcquireJob(pParam->refId); - if (NULL == pJob) { - qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:0x%" PRIx64, - pParam->queryId, pParam->taskId, pParam->refId); - SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); - } - - SCH_ERR_JRET(schGetTaskInJob(pJob, pParam->taskId, &pTask)); - - SCH_LOCK_TASK(pTask); - - SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode)); - - if (pParam->execId != pTask->execId) { - SCH_TASK_DLOG("execId %d mis-match current execId %d", pParam->execId, pTask->execId); - goto _return; - } - - bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode)); - SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execId)); - - int8_t status = 0; - if (schJobNeedToStop(pJob, &status)) { - SCH_TASK_ELOG("rsp will not be processed cause of job status %s, rspCode:0x%x", jobTaskStatusStr(status), rspCode); - code = atomic_load_32(&pJob->errCode); - goto _return; - } - - SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType)); + SCH_TASK_DLOG("begin to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode)); - int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1); - if (SCH_NEED_REDIRECT(reqType, rspCode, pMsg->len)) { - code = schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode); - goto _return; - } + SCH_ERR_RET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId)); - schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode); + code = schHandleResponseMsg(pJob, pTask, pParam->execId, pMsg, rspCode); pMsg->pData = NULL; -_return: - - if (pTask) { - if (code) { - schProcessOnTaskFailure(pJob, pTask, code); - } - - SCH_UNLOCK_TASK(pTask); - } - - if (pJob) { - schReleaseJob(pParam->refId); - } + schProcessOnCbEnd(pJob, pTask, code); taosMemoryFreeClear(pMsg->pData); taosMemoryFreeClear(param); + + SCH_TASK_DLOG("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode)); + SCH_RET(code); } @@ -451,6 +422,37 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) { } +int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) { + SSchedulerHbRsp rsp = {0}; + SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; + + if (code) { + qError("hb rsp error:%s", tstrerror(code)); + SCH_ERR_JRET(code); + } + + if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) { + qError("invalid hb rsp msg, size:%d", pMsg->len); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + + SSchTrans trans = {0}; + trans.pTrans = pParam->pTrans; + trans.pHandle = pMsg->handle; + + SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans)); + + SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus)); + +_return: + + tFreeSSchedulerHbRsp(&rsp); + taosMemoryFree(param); + + SCH_RET(code); +} + + int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bool isHb, SSchTrans *trans, void **pParam) { if (!isHb) { SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam)); @@ -692,36 +694,6 @@ _return: SCH_RET(code); } -int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) { - SSchedulerHbRsp rsp = {0}; - SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; - - if (code) { - qError("hb rsp error:%s", tstrerror(code)); - SCH_ERR_JRET(code); - } - - if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) { - qError("invalid hb rsp msg, size:%d", pMsg->len); - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } - - SSchTrans trans = {0}; - trans.pTrans = pParam->pTrans; - trans.pHandle = pMsg->handle; - - SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans)); - - SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus)); - -_return: - - tFreeSSchedulerHbRsp(&rsp); - taosMemoryFree(param); - - SCH_RET(code); -} - int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb) { int32_t code = 0; int32_t msgType = TDMT_SCH_LINK_BROKEN; diff --git a/source/libs/scheduler/src/schStatus.c b/source/libs/scheduler/src/schStatus.c index c6f5c23024..55bc600eca 100644 --- a/source/libs/scheduler/src/schStatus.c +++ b/source/libs/scheduler/src/schStatus.c @@ -21,56 +21,66 @@ #include "tref.h" #include "trpc.h" -SSchStatusFps gSchJobFps[JOB_TASK_STATUS_MAX] = { - {JOB_TASK_STATUS_NULL, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent}, - {JOB_TASK_STATUS_INIT, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent}, - {JOB_TASK_STATUS_EXEC, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent}, - {JOB_TASK_STATUS_PART_SUCC, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent}, - {JOB_TASK_STATUS_SUCC, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent}, - {JOB_TASK_STATUS_FAIL, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent}, - {JOB_TASK_STATUS_DROP, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent}, -}; - -SSchStatusFps gSchTaskFps[JOB_TASK_STATUS_MAX] = { - {JOB_TASK_STATUS_NULL, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent}, - {JOB_TASK_STATUS_INIT, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent}, - {JOB_TASK_STATUS_EXEC, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent}, - {JOB_TASK_STATUS_PART_SUCC, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent}, - {JOB_TASK_STATUS_SUCC, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent}, - {JOB_TASK_STATUS_FAIL, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent}, - {JOB_TASK_STATUS_DROP, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent}, -}; - -int32_t schSwitchJobStatus(SSchJob** job, int32_t status, void* param) { - SCH_ERR_RET(schUpdateJobStatus(*job, status)); +int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) { + int32_t code = 0; + SCH_ERR_JRET(schUpdateJobStatus(pJob, status)); switch (status) { case JOB_TASK_STATUS_INIT: - SCH_RET(schInitJob(job, param)); + break; case JOB_TASK_STATUS_EXEC: - SCH_RET(schExecJob(job, param)); + SCH_ERR_JRET(schExecJob(pJob, (SSchedulerReq*)param)); + break; case JOB_TASK_STATUS_PART_SUCC: + SCH_ERR_JRET(schProcessOnJobPartialSuccess(pJob)); + break; + case JOB_TASK_STATUS_SUCC: + break; + case JOB_TASK_STATUS_FAIL: + SCH_RET(schProcessOnJobFailure(pJob, (int32_t)param)); + break; + case JOB_TASK_STATUS_DROP: + SCH_ERR_JRET(schProcessOnJobDropped(pJob, (int32_t)param)); + + if (taosRemoveRef(schMgmt.jobRef, pJob->refId)) { + SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, pJob->refId); + } else { + SCH_JOB_DLOG("job removed from jobRef list, refId:0x%" PRIx64, pJob->refId); + } + break; default: { - SSchJob* pJob = *job; - SCH_JOB_ELOG("enter unknown job status %d", status); + SCH_JOB_ELOG("unknown job status %d", status); SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); } } return TSDB_CODE_SUCCESS; + +_return: + + SCH_RET(schProcessOnJobFailure(pJob, code)); } -int32_t schHandleOpBeginEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq) { - SSchEvent event = {0}; - event.event = SCH_EVENT_BEGIN_OP; - SSchOpEvent opEvent = {0}; - opEvent.type = type; - opEvent.begin = true; - opEvent.pReq = pReq; +int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq) { + SSchJob *pJob = schAcquireJob(jobId); + if (NULL == pJob) { + qError("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, jobId); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + *job = pJob; - SCH_ERR_RET(schHandleJobEvent(pJob, &event)); + SCH_RET(schProcessOnOpBegin(pJob, type, pReq)); } +void schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) { + if (NULL == pJob) { + return; + } + + schProcessOnOpEnd(pJob, type, pReq, errCode); + schReleaseJob(pJob->refId); +} diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 0e1d749533..1f89b59137 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -102,7 +102,7 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_ if (execId != pTask->execId) { // ignore it SCH_TASK_DLOG("execId %d is not current execId %d", execId, pTask->execId); - SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR); + SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); } return TSDB_CODE_SUCCESS; @@ -135,18 +135,26 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v // Note: no more task error processing, handled in function internal int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { + if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) { + return TSDB_CODE_SCH_IGNORE_ERROR; + } + int8_t status = 0; + if (schJobNeedToStop(pJob, &status)) { + SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(status)); + SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); + } + + if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) { + SCH_TASK_ELOG("task already not in EXEC status, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) { SCH_LOG_TASK_WAIT_TS(pTask); } else { SCH_LOG_TASK_END_TS(pTask); } - - if (schJobNeedToStop(pJob, &status)) { - SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(status)); - SCH_RET(atomic_load_32(&pJob->errCode)); - } bool needRetry = false; bool moved = false; @@ -155,16 +163,11 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode)); - SCH_ERR_JRET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry)); + SCH_ERR_RET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry)); if (!needRetry) { SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode)); - if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) { - SCH_TASK_ELOG("task not in executing list, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); - SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); - } - SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL); if (SCH_IS_WAIT_ALL_JOB(pJob)) { @@ -181,14 +184,12 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) } } } else { - SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask)); + SCH_ERR_RET(schHandleTaskRetry(pJob, pTask)); return TSDB_CODE_SUCCESS; } -_return: - - SCH_RET(schProcessOnJobFailure(pJob, errCode)); + SCH_RET(code); } @@ -204,9 +205,9 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC); - SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask)); + SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask)); - SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask)); + SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask)); int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0; if (parentNum == 0) { @@ -225,9 +226,9 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { } if (pTask->level->taskFailed > 0) { - SCH_RET(schProcessOnJobFailure(pJob, 0)); + SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, 0)); } else { - SCH_RET(schProcessOnJobPartialSuccess(pJob)); + SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL)); } } else { pJob->resNode = pTask->succeedAddr; @@ -235,7 +236,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { pJob->fetchTask = pTask; - SCH_RET(schProcessOnJobPartialSuccess(pJob)); + SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL)); } /* @@ -269,10 +270,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask)); return TSDB_CODE_SUCCESS; - -_return: - - SCH_RET(schProcessOnJobFailure(pJob, code)); } int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { @@ -280,15 +277,14 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } - SCH_LOCK_TASK(pTask); if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status && pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) { SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId); schDropTaskOnExecNode(pJob, pTask); taosHashClear(pTask->execNodes); - schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR); + + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR)); } - SCH_UNLOCK_TASK(pTask); return TSDB_CODE_SUCCESS; } @@ -298,7 +294,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 if ((pTask->execId + 1) >= pTask->maxExecTimes) { SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId); - schProcessOnJobFailure(pJob, rspCode); + schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)rspCode); return TSDB_CODE_SUCCESS; } @@ -353,9 +349,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 _return: - code = schProcessOnTaskFailure(pJob, pTask, code); - - SCH_RET(code); + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) { @@ -372,9 +366,7 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 _return: - schProcessOnTaskFailure(pJob, pTask, code); - - SCH_RET(code); + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { @@ -679,49 +671,39 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) { int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList); SSchTask *pTask = NULL; + SSchJob *pJob = NULL; qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn, pEpId->ep.port); for (int32_t i = 0; i < taskNum; ++i) { - STaskStatus *taskStatus = taosArrayGet(pStatusList, i); - + STaskStatus *pStatus = taosArrayGet(pStatusList, i); + int32_t code = 0; + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", - taskStatus->queryId, taskStatus->taskId, taskStatus->execId, jobTaskStatusStr(taskStatus->status)); + pStatus->queryId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status)); - SSchJob *pJob = schAcquireJob(taskStatus->refId); - if (NULL == pJob) { - qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId, - taskStatus->queryId, taskStatus->taskId); - // TODO DROP TASK FROM SERVER!!!! + if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) { continue; } - pTask = NULL; - schGetTaskInJob(pJob, taskStatus->taskId, &pTask); - if (NULL == pTask) { - // TODO DROP TASK FROM SERVER!!!! - schReleaseJob(taskStatus->refId); + if (pStatus->execId != pTask->execId) { + //TODO + SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId); + schProcessOnCbEnd(pJob, pTask, 0); continue; } - - if (taskStatus->execId != pTask->execId) { - // TODO DROP TASK FROM SERVER!!!! - SCH_TASK_DLOG("EID %d in hb rsp mis-match", taskStatus->execId); - schReleaseJob(taskStatus->refId); - continue; - } - - if (taskStatus->status == JOB_TASK_STATUS_FAIL) { + + if (pStatus->status == JOB_TASK_STATUS_FAIL) { // RECORD AND HANDLE ERROR!!!! - schReleaseJob(taskStatus->refId); + schProcessOnCbEnd(pJob, pTask, 0); continue; } - if (taskStatus->status == JOB_TASK_STATUS_INIT) { - schRescheduleTask(pJob, pTask); + if (pStatus->status == JOB_TASK_STATUS_INIT) { + code = schRescheduleTask(pJob, pTask); } - schReleaseJob(taskStatus->refId); + schProcessOnCbEnd(pJob, pTask, code); } return TSDB_CODE_SUCCESS; @@ -739,9 +721,8 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { SCH_LOG_TASK_START_TS(pTask); if (schJobNeedToStop(pJob, &status)) { - SCH_TASK_DLOG("no need to launch task cause of job status, job status:%s", jobTaskStatusStr(status)); - - SCH_RET(atomic_load_32(&pJob->errCode)); + SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status)); + SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); } // NOTE: race condition: the task should be put into the hash table before send msg to server diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index 38a04d1433..36a8475a34 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -43,6 +43,8 @@ char* schGetOpStr(SCH_OP_TYPE type) { return "EXEC"; case SCH_OP_FETCH: return "FETCH"; + case SCH_OP_GET_STATUS: + return "GET STATUS"; default: return "UNKNOWN"; } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 8629bdf8b9..65ab9c7659 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -73,93 +73,39 @@ int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJobId) { int32_t code = 0; SSchJob *pJob = NULL; - *pJobId = 0; + SCH_ERR_JRET(schInitJob(pJobId, pReq)); - SCH_ERR_RET(schSwitchJobStatus(&pJob, JOB_TASK_STATUS_INIT, pReq)); + SCH_ERR_JRET(schHandleOpBeginEvent(*pJobId, &pJob, SCH_OP_EXEC, pReq)); - SCH_ERR_RET(schHandleOpBeginEvent(pJob, SCH_OP_EXEC, pReq)); - - SCH_ERR_RET(schSwitchJobStatus(&pJob, JOB_TASK_STATUS_EXEC, pReq)); - - SCH_ERR_RET(schHandleOpEndEvent(pJob, SCH_OP_EXEC, pReq)); + SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_INIT, pReq)); - *pJobId = pJob->refId; + SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_EXEC, pReq)); _return: - schDumpJobExecRes(pJob, pReq->pQueryRes); - - schReleaseJob(pJob->refId); - - return code; + SCH_RET(schHandleOpEndEvent(pJob, SCH_OP_EXEC, pReq, code)); } -int32_t schedulerFetchRows(int64_t job, SSchedulerReq *pReq) { +int32_t schedulerFetchRows(int64_t jobId, SSchedulerReq *pReq) { qDebug("scheduler %s fetch rows start", pReq->syncReq ? "SYNC" : "ASYNC"); int32_t code = 0; - SSchJob *pJob = schAcquireJob(job); - if (NULL == pJob) { - qError("acquire job from jobRef list failed, may be dropped, jobId:0x%" PRIx64, job); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); - } - - SCH_ERR_RET(schHandleOpBeginEvent(pJob, SCH_OP_FETCH, pReq)); - - - - SCH_ERR_RET(schBeginOperation(pJob, SCH_OP_FETCH, true)); - - pJob->userRes.fetchRes = pData; - code = schJobFetchRows(pJob); - - schReleaseJob(job); - - SCH_RET(code); -} - -void schedulerFetchRowsA(int64_t job, schedulerFetchFp fp, void* param) { - qDebug("scheduler async fetch rows start"); + SSchJob *pJob = NULL; - int32_t code = 0; - if (NULL == fp || NULL == param) { - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } + SCH_ERR_JRET(schHandleOpBeginEvent(jobId, &pJob, SCH_OP_FETCH, pReq)); - SSchJob *pJob = schAcquireJob(job); - if (NULL == pJob) { - qError("acquire sch job from job list failed, may be dropped, jobId:0x%" PRIx64, job); - SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); - } - - SCH_ERR_JRET(schBeginOperation(pJob, SCH_OP_FETCH, false)); - - pJob->userRes.fetchFp = fp; - pJob->userRes.userParam = param; - - SCH_ERR_JRET(schJobFetchRowsA(pJob)); + SCH_ERR_JRET(schJobFetchRows(pJob)); _return: - if (code) { - fp(NULL, param, code); - } - - schReleaseJob(job); + SCH_RET(schHandleOpEndEvent(pJob, SCH_OP_FETCH, pReq, code)); } -int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) { +int32_t schedulerGetTasksStatus(int64_t jobId, SArray *pSub) { int32_t code = 0; - SSchJob *pJob = schAcquireJob(job); - if (NULL == pJob) { - qDebug("acquire job from jobRef list failed, may not started or dropped, refId:0x%" PRIx64, job); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); - } + SSchJob *pJob = NULL; - if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) { - qDebug("job not initialized or not executable job, refId:0x%" PRIx64, job); - SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); - } + SCH_ERR_JRET(schHandleOpBeginEvent(jobId, &pJob, SCH_OP_GET_STATUS, NULL)); for (int32_t i = pJob->levelNum - 1; i >= 0; --i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); @@ -176,23 +122,7 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) { _return: - schReleaseJob(job); - - SCH_RET(code); -} - -int32_t scheduleCancelJob(int64_t job) { - SSchJob *pJob = schAcquireJob(job); - if (NULL == pJob) { - qError("acquire job from jobRef list failed, may be dropped, jobId:0x%" PRIx64, job); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); - } - - int32_t code = schCancelJob(pJob); - - schReleaseJob(job); - - SCH_RET(code); + SCH_RET(schHandleOpEndEvent(pJob, SCH_OP_GET_STATUS, NULL, code)); } void schedulerStopQueryHb(void *pTrans) { @@ -203,33 +133,23 @@ void schedulerStopQueryHb(void *pTrans) { schCleanClusterHb(pTrans); } -void schedulerFreeJob(int64_t* job, int32_t errCode) { - if (0 == *job) { +void schedulerFreeJob(int64_t* jobId, int32_t errCode) { + if (0 == *jobId) { return; } - - SSchJob *pJob = schAcquireJob(*job); + + SSchJob *pJob = schAcquireJob(*jobId); if (NULL == pJob) { - qError("acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *job); - *job = 0; + qError("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId); return; } - int32_t code = schProcessOnJobDropped(pJob, errCode); - if (TSDB_CODE_SCH_JOB_IS_DROPPING == code) { - SCH_JOB_DLOG("sch job is already dropping, refId:0x%" PRIx64, *job); - *job = 0; + if (schJobDone(pJob)) { return; } - SCH_JOB_DLOG("start to remove job from jobRef list, refId:0x%" PRIx64, *job); - - if (taosRemoveRef(schMgmt.jobRef, *job)) { - SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, *job); - } - - schReleaseJob(*job); - *job = 0; + schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, (void*)errCode); + *jobId = 0; } void schedulerDestroy(void) { diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 245d8d362c..1a464b78ab 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -513,7 +513,7 @@ void* schtRunJobThread(void *aa) { req.pDag = &dag; req.sql = "select * from tb"; req.execFp = schtQueryCb; - req.execParam = &queryDone; + req.cbParam = &queryDone; code = schedulerExecJob(&req, &queryJobRefId); assert(code == 0); @@ -665,7 +665,7 @@ TEST(queryTest, normalCase) { req.pDag = &dag; req.sql = "select * from tb"; req.execFp = schtQueryCb; - req.execParam = &queryDone; + req.cbParam = &queryDone; code = schedulerExecJob(&req, &job); ASSERT_EQ(code, 0); @@ -769,7 +769,7 @@ TEST(queryTest, readyFirstCase) { req.pDag = &dag; req.sql = "select * from tb"; req.execFp = schtQueryCb; - req.execParam = &queryDone; + req.cbParam = &queryDone; code = schedulerExecJob(&req, &job); ASSERT_EQ(code, 0); @@ -876,7 +876,7 @@ TEST(queryTest, flowCtrlCase) { req.pDag = &dag; req.sql = "select * from tb"; req.execFp = schtQueryCb; - req.execParam = &queryDone; + req.cbParam = &queryDone; code = schedulerExecJob(&req, &job); ASSERT_EQ(code, 0); @@ -989,7 +989,7 @@ TEST(insertTest, normalCase) { req.pDag = &dag; req.sql = "insert into tb values(now,1)"; req.execFp = schtQueryCb; - req.execParam = NULL; + req.cbParam = NULL; code = schedulerExecJob(&req, &insertJobRefId, &res); ASSERT_EQ(code, 0); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 6e8b8b1595..e867af86af 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -393,6 +393,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPING, "Task dropping") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUPLICATTED_OPERATION, "Duplicatted operation") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_MSG_ERROR, "Task message error") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_FREED, "Job already freed") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_NOT_EXIST, "Job not exist") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_STATUS_ERROR, "Task status error") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_ERROR, "Json not support in in/notin operator") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR, "Json not support in this place") -- GitLab