From 24cdf0e7585ab94cf30ea37cf831d15de1f37874 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 6 Feb 2023 13:40:51 +0800 Subject: [PATCH] enh(query): add check for limit reach status. --- source/libs/executor/src/projectoperator.c | 1 + source/libs/executor/src/scanoperator.c | 16 +++++++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index d641810cee..4a3a3f4de4 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -190,6 +190,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S return PROJECT_RETRIEVE_DONE; } +// todo refactor static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock, SOperatorInfo* pOperator) { // set current group id diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 169693b8c4..b939ccdcab 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -617,6 +617,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } + if (pOperator->status == OP_EXEC_DONE) { + break; + } + // process this data block based on the probabilities bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); if (!processThisBlock) { @@ -628,9 +632,8 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { uint32_t status = 0; int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); - // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pOperator->pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } // current block is filter out according to filter condition, continue load the next block @@ -2540,7 +2543,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { } uint32_t status = 0; - loadDataBlock(pOperator, &pInfo->base, pBlock, &status); + code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status); // code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); @@ -2714,10 +2717,13 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* } } - applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo); + bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo); qDebug("%s get sorted row block, rows:%d, limit:%"PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, - pInfo->limitInfo.numOfOutputRows); + pInfo->limitInfo.numOfOutputRows); + if (limitReached) { + resetLimitInfoForNextGroup(&pInfo->limitInfo); + } return (pResBlock->info.rows > 0) ? pResBlock : NULL; } -- GitLab