From 927478bff65666e5672b36f57a93252d9f0e9bdb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 26 Sep 2022 15:55:26 +0800 Subject: [PATCH] refactor(query): do some internal refactor. --- include/libs/executor/executor.h | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 3 ++- source/libs/executor/src/executor.c | 3 ++- source/libs/qworker/src/qworker.c | 36 +++++++++++++++----------- 4 files changed, 26 insertions(+), 18 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 8c1d957381..81882ca1ac 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -136,7 +136,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table * @param handle * @return */ -int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch *pLocal); +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch *pLocal); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); /** diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 8d1525e081..aad141a404 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -690,7 +690,8 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma while (1) { uint64_t ts; - int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, NULL); + bool hasMore = false; + int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL); if (code < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { break; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 373cb451f4..0fbbc25873 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -479,7 +479,7 @@ static void freeBlock(void* param) { blockDataDestroy(pBlock); } -int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch* pLocal) { +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); @@ -536,6 +536,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SL } } + *hasMore = (pRes != NULL); uint64_t el = (taosGetTimestampUs() - st); pTaskInfo->cost.elapsedTime += el; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index f06dbd5f0d..c3fa9d8905 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -107,9 +107,12 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { QW_TASK_DLOG("start to execTask, loopIdx:%d", i++); // if *taskHandle is NULL, it's killed right now + bool hasMore = false; + if (taskHandle) { qwDbgSimulateSleep(); - code = qExecTaskOpt(taskHandle, pResList, &useconds, &localFetch); + + code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch); if (code) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) { QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -122,20 +125,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ++execNum; - if (taosArrayGetSize(pResList) == 0) { - QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds); - dsEndPut(sinkHandle, useconds); - - QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); - - if (queryStop) { - *queryStop = true; - } - - break; - } - - for (int32_t j = 0; j < taosArrayGetSize(pResList); ++j) { + size_t numOfResBlock = taosArrayGetSize(pResList); + for (int32_t j = 0; j < numOfResBlock; ++j) { SSDataBlock *pRes = taosArrayGetP(pResList, j); ASSERT(pRes->info.rows > 0); @@ -149,6 +140,21 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue); } + if (numOfResBlock == 0 || (hasMore == false)) { + if (numOfResBlock == 0) { + QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds); + } + + dsEndPut(sinkHandle, useconds); + QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); + + if (queryStop) { + *queryStop = true; + } + + break; + } + if (!qcontinue) { if (queryStop) { *queryStop = true; -- GitLab