From 3d5073e7854a396a0210fd1f834b966e63f6c712 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 20 May 2022 19:25:46 +0800 Subject: [PATCH] fix res issue --- source/libs/scheduler/inc/schedulerInt.h | 8 +----- source/libs/scheduler/src/scheduler.c | 32 +++++++++++------------- 2 files changed, 16 insertions(+), 24 deletions(-) diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 5a6fcee759..be92de774b 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -39,12 +39,6 @@ enum { SCH_WRITE, }; -typedef enum { - SCH_RES_TYPE_QUERY, - SCH_RES_TYPE_FETCH, -} SCH_RES_TYPE; - - typedef struct SSchTrans { void *transInst; void *transHandle; @@ -197,7 +191,7 @@ typedef struct SSchJob { int32_t errCode; SArray *errList; // SArray SRWLatch resLock; - SCH_RES_TYPE resType; + void *queryRes; void *resData; //TODO free it or not int32_t resNumOfRows; const char *sql; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 899748ec45..dcd87557aa 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1058,8 +1058,6 @@ _return: int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) { SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); - pJob->resType = SCH_RES_TYPE_FETCH; - atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows)); atomic_store_ptr(&pJob->resData, pRsp); @@ -1072,9 +1070,9 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs int32_t schSaveJobQueryRes(SSchJob *pJob, SResReadyRsp *rsp) { if (rsp->tbFName[0]) { - if (NULL == pJob->resData) { - pJob->resData = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo)); - if (NULL == pJob->resData) { + if (NULL == pJob->queryRes) { + pJob->queryRes = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo)); + if (NULL == pJob->queryRes) { SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } } @@ -1084,7 +1082,7 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SResReadyRsp *rsp) { tbInfo.sversion = rsp->sversion; tbInfo.tversion = rsp->tversion; - taosArrayPush((SArray *)pJob->resData, &tbInfo); + taosArrayPush((SArray *)pJob->queryRes, &tbInfo); } return TSDB_CODE_SUCCESS; @@ -1201,10 +1199,9 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows); - pJob->resType = SCH_RES_TYPE_QUERY; SCH_LOCK(SCH_WRITE, &pJob->resLock); - if (pJob->resData) { - SSubmitRsp *sum = pJob->resData; + if (pJob->queryRes) { + SSubmitRsp *sum = pJob->queryRes; sum->affectedRows += rsp->affectedRows; sum->nBlocks += rsp->nBlocks; sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks)); @@ -1212,7 +1209,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch taosMemoryFree(rsp->pBlocks); taosMemoryFree(rsp); } else { - pJob->resData = rsp; + pJob->queryRes = rsp; } SCH_UNLOCK(SCH_WRITE, &pJob->resLock); } @@ -1246,7 +1243,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } SCH_ERR_JRET(rsp->code); - pJob->resType = SCH_RES_TYPE_QUERY; SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp)); @@ -2424,6 +2420,12 @@ void schFreeJobImpl(void *job) { qExplainFreeCtx(pJob->explainCtx); + if (SCH_IS_QUERY_JOB(pJob)) { + taosArrayDestroy((SArray *)pJob->queryRes); + } else { + tFreeSSubmitRsp((SSubmitRsp*)pJob->queryRes); + } + taosMemoryFreeClear(pJob->resData); taosMemoryFreeClear(pJob); @@ -2486,8 +2488,6 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData)); - pJob->resType = SCH_RES_TYPE_FETCH; - int64_t refId = taosAddRef(schMgmt.jobRef, pJob); if (refId < 0) { SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno)); @@ -2582,10 +2582,8 @@ _return: pRes->code = atomic_load_32(&job->errCode); pRes->numOfRows = job->resNumOfRows; - if (SCH_RES_TYPE_QUERY == job->resType) { - pRes->res = job->resData; - job->resData = NULL; - } + pRes->res = job->queryRes; + job->queryRes = NULL; schReleaseJob(*pJob); } -- GitLab