From 97da2ee95840ec74f676f7d90ade97d4a2543ae6 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 20 Dec 2021 10:00:19 +0800 Subject: [PATCH] modify api and compile warning --- include/libs/scheduler/scheduler.h | 4 +- source/libs/scheduler/src/scheduler.c | 129 +++++++++++++++----------- 2 files changed, 79 insertions(+), 54 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index c5f3cd8f0f..c01b79d7ff 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -58,7 +58,7 @@ typedef struct SQueryProfileSummary { */ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob); -int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data); +int32_t scheduleFetchRows(void *pJob, void **data); /** @@ -66,7 +66,7 @@ int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data); * @param pJob * @return */ -int32_t scheduleCancelJob(void *pRpc, void *pJob); +int32_t scheduleCancelJob(void *pJob); void scheduleFreeJob(void *pJob); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index e68be25bce..2327fc5b04 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -127,6 +127,8 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { int32_t code = 0; + job->queryId = dag->queryId; + if (dag->numOfSubplans <= 0) { qError("invalid subplan num:%d", dag->numOfSubplans); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -152,7 +154,6 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { job->levelNum = levelNum; job->levelIdx = levelNum - 1; - job->status = SCH_STATUS_NOT_START; job->subPlans = dag->pSubplans; @@ -341,50 +342,6 @@ int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCod return TSDB_CODE_SUCCESS; } -int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) { - int32_t code = 0; - - switch (msgType) { - case TSDB_MSG_TYPE_QUERY: - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); - } else { - code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RSP_READY); - if (code) { - goto _task_error; - } - } - break; - case TSDB_MSG_TYPE_RSP_READY: - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); - } else { - code = schProcessOnTaskSuccess(job, task); - if (code) { - goto _task_error; - } - } - break; - case TSDB_MSG_TYPE_FETCH: - SCH_ERR_JRET(rspCode); - SCH_ERR_JRET(schProcessOnDataFetched(job)); - break; - default: - qError("unknown msg type:%d received", msgType); - return TSDB_CODE_QRY_INVALID_INPUT; - } - - return TSDB_CODE_SUCCESS; - -_task_error: - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code)); - return TSDB_CODE_SUCCESS; - -_return: - code = schProcessOnJobFailure(job); - return code; -} - int32_t schFetchFromRemote(SQueryJob *job) { int32_t code = 0; @@ -500,6 +457,50 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod return TSDB_CODE_SUCCESS; } +int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) { + int32_t code = 0; + + switch (msgType) { + case TSDB_MSG_TYPE_QUERY: + if (rspCode != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); + } else { + code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RSP_READY); + if (code) { + goto _task_error; + } + } + break; + case TSDB_MSG_TYPE_RSP_READY: + if (rspCode != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); + } else { + code = schProcessOnTaskSuccess(job, task); + if (code) { + goto _task_error; + } + } + break; + case TSDB_MSG_TYPE_FETCH: + SCH_ERR_JRET(rspCode); + SCH_ERR_JRET(schProcessOnDataFetched(job)); + break; + default: + qError("unknown msg type:%d received", msgType); + return TSDB_CODE_QRY_INVALID_INPUT; + } + + return TSDB_CODE_SUCCESS; + +_task_error: + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code)); + return TSDB_CODE_SUCCESS; + +_return: + code = schProcessOnJobFailure(job); + return code; +} + @@ -578,10 +579,17 @@ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pM tsem_init(&job->rspSem, 0, 0); + if (0 != taosHashPut(schMgmt.Jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) { + qError("taosHashPut queryId:%"PRIx64" failed", job->queryId); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + job->status = SCH_STATUS_NOT_START; + SCH_ERR_JRET(schLaunchJob(job)); *(SQueryJob **)pJob = job; - + return TSDB_CODE_SUCCESS; _return: @@ -592,8 +600,8 @@ _return: SCH_RET(code); } -int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data) { - if (NULL == pRpc || NULL == pJob || NULL == data) { +int32_t scheduleFetchRows(void *pJob, void **data) { + if (NULL == pJob || NULL == data) { return TSDB_CODE_QRY_INVALID_INPUT; } @@ -620,14 +628,31 @@ _return: return code; } -int32_t scheduleCancelJob(void *pRpc, void *pJob); +int32_t scheduleCancelJob(void *pJob) { + //TODO -void scheduleFreeJob(void *job) { - if (NULL == job) { + return TSDB_CODE_SUCCESS; +} + +void scheduleFreeJob(void *pJob) { + if (NULL == pJob) { return; } - //TODO + SQueryJob *job = pJob; + + if (job->status > 0) { + if (0 != taosHashRemove(schMgmt.Jobs, &job->queryId, sizeof(job->queryId))) { + qError("remove job:%"PRIx64"from mgmt failed", job->queryId); // maybe already freed + return; + } + + if (job->status == SCH_STATUS_EXECUTING) { + scheduleCancelJob(pJob); + } + } + + //TODO free job } void schedulerDestroy(void) { -- GitLab