From 3ffd97591714ac78c75faaa71d934ffef96dfe96 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Mon, 4 Jul 2022 09:08:57 +0800 Subject: [PATCH] enh: refactor scheduler code --- include/libs/qcom/query.h | 6 +--- include/libs/scheduler/scheduler.h | 8 +++-- source/client/inc/clientInt.h | 2 +- source/client/src/clientImpl.c | 6 +++- source/client/src/clientMsgHandler.c | 2 +- source/libs/qcom/src/queryUtil.c | 2 +- source/libs/scheduler/inc/schInt.h | 4 +-- source/libs/scheduler/src/schJob.c | 42 ++++++--------------------- source/libs/scheduler/src/schStatus.c | 30 +++++++++++++++++-- source/libs/scheduler/src/schUtil.c | 14 +++++++++ source/libs/scheduler/src/scheduler.c | 37 +++++++++++------------ 11 files changed, 84 insertions(+), 69 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 64196aa64f..670e21fc4a 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -60,10 +60,6 @@ typedef struct STableComInfo { int32_t rowSize; // row size of the schema } STableComInfo; -typedef struct SQueryExecRes { - int32_t msgType; - void* res; -} SQueryExecRes; typedef struct SIndexMeta { #ifdef WINDOWS @@ -211,7 +207,7 @@ char* jobTaskStatusStr(int32_t status); SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name); -void destroyQueryExecRes(SQueryExecRes* pRes); +void destroyQueryExecRes(SExecResult* pRes); int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t* len); char* parseTagDatatoJson(void* p); int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 66e1f7ed3a..5f9f65d76a 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -53,11 +53,12 @@ typedef struct SQueryProfileSummary { uint64_t resultSize; // generated result size in Kb. } SQueryProfileSummary; -typedef struct SQueryResult { +typedef struct SExecResult { int32_t code; uint64_t numOfRows; - SQueryExecRes res; -} SQueryResult; + int32_t msgType; + void* res; +} SExecResult; typedef struct STaskInfo { SQueryNodeAddr addr; @@ -85,6 +86,7 @@ typedef struct SSchedulerReq { schedulerChkKillFp chkKillFp; void* chkKillParam; SQueryResult* pQueryRes; + char** pFetchRes; } SSchedulerReq; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 737fee5125..9d2886d242 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -156,7 +156,7 @@ typedef struct SResultColumn { } SResultColumn; typedef struct SReqResultInfo { - SQueryExecRes execRes; + SExecResult execRes; const char* pRspMsg; const char* pData; TAOS_FIELD* fields; // todo, column names are not needed. diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 63b153b6fc..0e031bd24f 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -757,7 +757,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { } SEpSet epset = getEpSet_s(&pAppInfo->mgmtEp); - SQueryExecRes* pRes = &pRequest->body.resInfo.execRes; + SExecResult* pRes = &pRequest->body.resInfo.execRes; switch (pRes->msgType) { case TDMT_VND_ALTER_TABLE: @@ -1366,6 +1366,10 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) } SReqResultInfo* pResInfo = &pRequest->body.resInfo; + SSchedulerReq req = { + .syncReq = true, + . + }; pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData); if (pRequest->code != TSDB_CODE_SUCCESS) { pResultInfo->numOfRows = 0; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 761eebee42..dcccbb17c9 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -266,7 +266,7 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) { } if (pRequest->body.queryFp != NULL) { - SQueryExecRes* pRes = &pRequest->body.resInfo.execRes; + SExecResult* pRes = &pRequest->body.resInfo.execRes; if (code == TSDB_CODE_SUCCESS) { SCatalog* pCatalog = NULL; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 1db13dd931..923224688c 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -200,7 +200,7 @@ SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* nam return s; } -void destroyQueryExecRes(SQueryExecRes* pRes) { +void destroyQueryExecRes(SExecResult* pRes) { if (NULL == pRes || NULL == pRes->res) { return; } diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 74b4dcf076..cceea452db 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -260,7 +260,7 @@ typedef struct SSchJob { SSchTask *fetchTask; int32_t errCode; SRWLatch resLock; - SQueryExecRes execRes; + SExecResult execRes; void *resData; //TODO free it or not int32_t resNumOfRows; SSchResInfo userRes; @@ -415,7 +415,7 @@ char* schGetOpStr(SCH_OP_TYPE type); int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync); int32_t schInitJob(SSchJob **pJob, SSchedulerReq *pReq); int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq); -int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes); +int32_t schDumpJobExecRes(SSchJob* pJob, SQueryResult* pRes); int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet); int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 893a836529..9f1679f5b2 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -21,11 +21,6 @@ #include "tref.h" #include "trpc.h" -FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { qDebug("sch acquire jobId:0x%"PRIx64, refId); return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); } - -FORCE_INLINE int32_t schReleaseJob(int64_t refId) { qDebug("sch release jobId:0x%"PRIx64, refId); return taosReleaseRef(schMgmt.jobRef, refId); } - - void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) { if (TSDB_CODE_SUCCESS == errCode) { return; @@ -365,7 +360,7 @@ _return: } -int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes) { +int32_t schDumpJobExecRes(SSchJob* pJob, SQueryResult* pRes) { pRes->code = atomic_load_32(&pJob->errCode); pRes->numOfRows = pJob->resNumOfRows; pRes->res = pJob->execRes; @@ -374,7 +369,7 @@ int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes) { return TSDB_CODE_SUCCESS; } -int32_t schSetJobFetchRes(SSchJob* pJob, void** pData) { +int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) { int32_t code = 0; if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) { SCH_ERR_RET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCC)); @@ -407,14 +402,14 @@ int32_t schSetJobFetchRes(SSchJob* pJob, void** pData) { int32_t schNotifyUserExecRes(SSchJob* pJob) { SQueryResult* pRes = taosMemoryCalloc(1, sizeof(SQueryResult)); if (pRes) { - schSetJobQueryRes(pJob, pRes); + schDumpJobExecRes(pJob, pRes); } schEndOperation(pJob); SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode)); (*pJob->userRes.execFp)(pRes, pJob->userRes.userParam, atomic_load_32(&pJob->errCode)); - SCH_JOB_DLOG("sch end from query cb, code: %s", tstrerror(pJob->errCode)); + SCH_JOB_DLOG("sch end from exec cb, code: %s", tstrerror(pJob->errCode)); return TSDB_CODE_SUCCESS; } @@ -422,7 +417,7 @@ int32_t schNotifyUserExecRes(SSchJob* pJob) { int32_t schNotifyUserFetchRes(SSchJob* pJob) { void* pRes = NULL; - schSetJobFetchRes(pJob, &pRes); + schDumpJobFetchRes(pJob, &pRes); schEndOperation(pJob); @@ -473,7 +468,8 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod // Note: no more task error processing, handled in function internal int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { - SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAIL, errCode)); + schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAIL, errCode); + return TSDB_CODE_SUCCESS; } // Note: no more error processing, handled in function internal @@ -663,7 +659,7 @@ int32_t schJobFetchRows(SSchJob *pJob) { tsem_wait(&pJob->rspSem); } - SCH_ERR_JRET(schSetJobFetchRes(pJob, pJob->userRes.fetchRes)); + SCH_ERR_JRET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes)); _return: @@ -850,27 +846,7 @@ _return: SCH_RET(code); } - -int32_t schJobStatusEnter(SSchJob** job, int32_t status, void* param) { - SCH_ERR_RET(schUpdateJobStatus(*job, status)); - - switch (status) { - case JOB_TASK_STATUS_INIT: - SCH_RET(schInitJob(job, param)); - case JOB_TASK_STATUS_EXEC: - SCH_RET(schExecJob(job, param)); - case JOB_TASK_STATUS_PART_SUCC: - default: { - SSchJob* pJob = *job; - SCH_JOB_ELOG("enter unknown job status %d", status); - SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); - } - } - - return TSDB_CODE_SUCCESS; -} - -int32_t schJobHandleEvent(SSchJob* pJob, SSchEvent* pEvent) { +int32_t schHandleJobEvent(SSchJob* pJob, SSchEvent* pEvent) { switch (pEvent->event) { case SCH_EVENT_BEGIN_OP: schProcessOnOpBegin(pJob, pEvent); diff --git a/source/libs/scheduler/src/schStatus.c b/source/libs/scheduler/src/schStatus.c index 1e5be8c3de..c6f5c23024 100644 --- a/source/libs/scheduler/src/schStatus.c +++ b/source/libs/scheduler/src/schStatus.c @@ -41,8 +41,34 @@ SSchStatusFps gSchTaskFps[JOB_TASK_STATUS_MAX] = { {JOB_TASK_STATUS_DROP, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent}, }; -int32_t schSwitchJobStatus(int32_t status, SSchJob* pJob, void* pParam) { - schJobStatusEnter(pJob, status, pParam); +int32_t schSwitchJobStatus(SSchJob** job, int32_t status, void* param) { + SCH_ERR_RET(schUpdateJobStatus(*job, status)); + + switch (status) { + case JOB_TASK_STATUS_INIT: + SCH_RET(schInitJob(job, param)); + case JOB_TASK_STATUS_EXEC: + SCH_RET(schExecJob(job, param)); + case JOB_TASK_STATUS_PART_SUCC: + default: { + SSchJob* pJob = *job; + SCH_JOB_ELOG("enter unknown job status %d", status); + SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t schHandleOpBeginEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq) { + SSchEvent event = {0}; + event.event = SCH_EVENT_BEGIN_OP; + SSchOpEvent opEvent = {0}; + opEvent.type = type; + opEvent.begin = true; + opEvent.pReq = pReq; + + SCH_ERR_RET(schHandleJobEvent(pJob, &event)); } diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index f0ff12b56b..38a04d1433 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -21,6 +21,20 @@ #include "tref.h" #include "trpc.h" +FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { + qDebug("sch acquire jobId:0x%"PRIx64, refId); + return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); +} + +FORCE_INLINE int32_t schReleaseJob(int64_t refId) { + if (0 == refId) { + return TSDB_CODE_SUCCESS; + } + + qDebug("sch release jobId:0x%"PRIx64, refId); + return taosReleaseRef(schMgmt.jobRef, refId); +} + char* schGetOpStr(SCH_OP_TYPE type) { switch (type) { case SCH_OP_NULL: diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 3f797f6c7e..8629bdf8b9 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -73,36 +73,29 @@ int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJobId) { int32_t code = 0; SSchJob *pJob = NULL; - SCH_ERR_RET(schJobStatusEnter(&pJob, JOB_TASK_STATUS_INIT, pReq)); - - SSchEvent event = {0}; - event.event = SCH_EVENT_BEGIN_OP; - SSchOpEvent opEvent = {0}; - opEvent.type = SCH_OP_EXEC; - opEvent.begin = true; - opEvent.pReq = pReq; - schJobHandleEvent(pJob, &event); + *pJobId = 0; + + SCH_ERR_RET(schSwitchJobStatus(&pJob, JOB_TASK_STATUS_INIT, pReq)); + + SCH_ERR_RET(schHandleOpBeginEvent(pJob, SCH_OP_EXEC, pReq)); - SCH_ERR_RET(schJobStatusEnter(&pJob, JOB_TASK_STATUS_EXEC, pReq)); + SCH_ERR_RET(schSwitchJobStatus(&pJob, JOB_TASK_STATUS_EXEC, pReq)); + + SCH_ERR_RET(schHandleOpEndEvent(pJob, SCH_OP_EXEC, pReq)); *pJobId = pJob->refId; _return: - if (pJob) { - schSetJobQueryRes(pJob, pReq->pQueryRes); - schReleaseJob(pJob->refId); - } + schDumpJobExecRes(pJob, pReq->pQueryRes); + + schReleaseJob(pJob->refId); return code; } -int32_t schedulerFetchRows(int64_t job, void **pData) { - qDebug("scheduler sync fetch rows start"); - - if (NULL == pData) { - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } +int32_t schedulerFetchRows(int64_t job, SSchedulerReq *pReq) { + qDebug("scheduler %s fetch rows start", pReq->syncReq ? "SYNC" : "ASYNC"); int32_t code = 0; SSchJob *pJob = schAcquireJob(job); @@ -111,6 +104,10 @@ int32_t schedulerFetchRows(int64_t job, void **pData) { SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } + SCH_ERR_RET(schHandleOpBeginEvent(pJob, SCH_OP_FETCH, pReq)); + + + SCH_ERR_RET(schBeginOperation(pJob, SCH_OP_FETCH, true)); pJob->userRes.fetchRes = pData; -- GitLab