diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index b3f35025d1a547353a49903a8e02b7b46a5e9aa4..dcd058a293f0a35080335b30b38e32a792c43a74 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -72,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg); * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @return */ -int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, bool needRes, SQueryResult *pRes); +int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SQueryResult *pRes); /** * Process the query job, generated according to the query physical plan. diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c3ccecb0d364b5b713ed9f2f6b7b4787ef274cc1..7693d26d3fd450ed6e120fcf06d14dbc41e2f8a5 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -291,7 +291,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, - pRequest->metric.start, NULL != pRes, &res); + pRequest->metric.start, &res); if (code != TSDB_CODE_SUCCESS) { if (pRequest->body.queryJob != 0) { schedulerFreeJob(pRequest->body.queryJob); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index a90fb7fc2e882b0a65feb87940874269d007564a..5a6fcee759d86ce85c9d67eb28cb5950401b9857 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -39,6 +39,12 @@ enum { SCH_WRITE, }; +typedef enum { + SCH_RES_TYPE_QUERY, + SCH_RES_TYPE_FETCH, +} SCH_RES_TYPE; + + typedef struct SSchTrans { void *transInst; void *transHandle; @@ -159,7 +165,6 @@ typedef struct SSchTask { typedef struct SSchJobAttr { EExplainMode explainMode; - bool needRes; bool syncSchedule; bool queryJob; bool needFlowCtrl; @@ -192,6 +197,7 @@ typedef struct SSchJob { int32_t errCode; SArray *errList; // SArray SRWLatch resLock; + SCH_RES_TYPE resType; void *resData; //TODO free it or not int32_t resNumOfRows; const char *sql; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index bdf674ab9385158487a1cb55a0db1587bb0c9896..9354c1a875d030c1839484db967ad091d7fdf8a9 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -70,7 +70,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * } int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql, - int64_t startTs, bool needRes, bool syncSchedule) { + int64_t startTs, bool syncSchedule) { int32_t code = 0; int64_t refId = -1; SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); @@ -81,7 +81,6 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray pJob->attr.explainMode = pDag->explainInfo.mode; pJob->attr.syncSchedule = syncSchedule; - pJob->attr.needRes = needRes; pJob->transport = transport; pJob->sql = sql; @@ -1059,6 +1058,8 @@ _return: int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) { SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); + pJob->resType = SCH_RES_TYPE_FETCH; + atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows)); atomic_store_ptr(&pJob->resData, pRsp); @@ -1179,23 +1180,20 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows); - if (pJob->attr.needRes) { - SCH_LOCK(SCH_WRITE, &pJob->resLock); - if (pJob->resData) { - SSubmitRsp *sum = pJob->resData; - sum->affectedRows += rsp->affectedRows; - sum->nBlocks += rsp->nBlocks; - sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks)); - memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks)); - taosMemoryFree(rsp->pBlocks); - taosMemoryFree(rsp); - } else { - pJob->resData = rsp; - } - SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + pJob->resType = SCH_RES_TYPE_QUERY; + SCH_LOCK(SCH_WRITE, &pJob->resLock); + if (pJob->resData) { + SSubmitRsp *sum = pJob->resData; + sum->affectedRows += rsp->affectedRows; + sum->nBlocks += rsp->nBlocks; + sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks)); + memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks)); + taosMemoryFree(rsp->pBlocks); + taosMemoryFree(rsp); } else { - tFreeSSubmitRsp(rsp); + pJob->resData = rsp; } + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); } SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); @@ -2412,7 +2410,7 @@ void schFreeJobImpl(void *job) { } static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, - int64_t startTs, bool needRes, bool syncSchedule) { + int64_t startTs, bool syncSchedule) { qDebug("QID:0x%" PRIx64 " job started", pDag->queryId); if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) { @@ -2421,7 +2419,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD int32_t code = 0; SSchJob *pJob = NULL; - SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, needRes, syncSchedule)); + SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, syncSchedule)); SCH_ERR_JRET(schLaunchJob(pJob)); @@ -2463,6 +2461,8 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData)); + pJob->resType = SCH_RES_TYPE_FETCH; + int64_t refId = taosAddRef(schMgmt.jobRef, pJob); if (refId < 0) { SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno)); @@ -2535,7 +2535,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { } int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, - int64_t startTs, bool needRes, SQueryResult *pRes) { + int64_t startTs, SQueryResult *pRes) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -2543,14 +2543,14 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true)); } else { - SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, needRes, true)); + SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true)); } SSchJob *job = schAcquireJob(*pJob); pRes->code = atomic_load_32(&job->errCode); pRes->numOfRows = job->resNumOfRows; - if (needRes) { + if (SCH_RES_TYPE_QUERY == job->resType) { pRes->res = job->resData; job->resData = NULL; } @@ -2568,7 +2568,7 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pD if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false)); } else { - SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false, false)); + SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false)); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 09ecd9fffd013c18762e3de6cf36a51f99ced1ce..fc0e05aaf106fb11d8daa9be9a55e510aac58ff5 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -985,7 +985,7 @@ TEST(insertTest, normalCase) { taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId); SQueryResult res = {0}; - code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, false, &res); + code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, &res); ASSERT_EQ(code, 0); ASSERT_EQ(res.numOfRows, 20);