From fae4f2c4ed96427cfefa25c6609455ed804d1e46 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 20 Jan 2023 18:08:34 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/executor/inc/executorimpl.h | 3 ++- source/libs/executor/src/exchangeoperator.c | 30 ++++++--------------- source/libs/executor/src/executil.c | 4 +++ source/libs/executor/src/projectoperator.c | 29 +++----------------- source/libs/executor/src/scanoperator.c | 16 +++++------ source/libs/executor/src/sortoperator.c | 28 +++++-------------- 6 files changed, 32 insertions(+), 78 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c68f7c4697..4ae178d508 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -704,9 +704,10 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG SDiskbasedBuf* pBuf); bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); +bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo); void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo); -bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator); +bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 037b33dc9f..08b7d371e2 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -707,6 +707,8 @@ int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { } int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + if (pLimitInfo->remainGroupOffset > 0) { if (pLimitInfo->currentGroupId == 0) { // it is the first group pLimitInfo->currentGroupId = pBlock->info.id.groupId; @@ -750,36 +752,20 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa // set current group id pLimitInfo->currentGroupId = pBlock->info.id.groupId; - if (pLimitInfo->remainOffset >= pBlock->info.rows) { - pLimitInfo->remainOffset -= pBlock->info.rows; - blockDataCleanup(pBlock); + bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo); + if (pBlock->info.rows == 0) { return PROJECT_RETRIEVE_CONTINUE; - } else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) { - blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset); - pLimitInfo->remainOffset = 0; - } - - // check for the limitation in each group - if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) { - int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); - blockDataKeepFirstNRows(pBlock, keepRows); - if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) { + } else { + if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { setOperatorCompleted(pOperator); - } else { - // current group limitation is reached, and future blocks of this group need to be discarded. - if (pBlock->info.rows == 0) { - return PROJECT_RETRIEVE_CONTINUE; - } + return PROJECT_RETRIEVE_DONE; } - - return PROJECT_RETRIEVE_DONE; } // todo optimize performance // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the // they may not belong to the same group the limit/offset value is not valid in this case. - if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || pLimitInfo->slimit.offset != -1 || - pLimitInfo->slimit.limit != -1) { + if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) { return PROJECT_RETRIEVE_DONE; } else { // not full enough, continue to accumulate the output data in the buffer. return PROJECT_RETRIEVE_CONTINUE; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 757324a773..92d52fbb0a 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1749,6 +1749,10 @@ bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) { pLimitInfo->slimit.offset != -1); } +bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo) { + return (pLimitInfo->slimit.limit != -1 || pLimitInfo->slimit.offset != -1); +} + void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) { SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)}; SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)}; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 3e3610827b..b1dc217bf5 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -185,36 +185,15 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS SOperatorInfo* pOperator) { // set current group id pLimitInfo->currentGroupId = groupId; - - if (pLimitInfo->remainOffset >= pBlock->info.rows) { - pLimitInfo->remainOffset -= pBlock->info.rows; - blockDataCleanup(pBlock); + bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pOperator->pTaskInfo); + if (pBlock->info.rows == 0) { return PROJECT_RETRIEVE_CONTINUE; - } else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) { - blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset); - pLimitInfo->remainOffset = 0; - } - - // check for the limitation in each group - if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) { - int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); - blockDataKeepFirstNRows(pBlock, keepRows); - - // TODO: optimize it later when partition by + limit - // all retrieved requirement has been fulfilled, let's finish this - if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) || - (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { + } else { + if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { setOperatorCompleted(pOperator); - } else { - // Even current group is done, there may be many vgroups remain existed, and we need to continue to retrieve data - // from next group. So let's continue this retrieve process - if (keepRows == 0) { - return PROJECT_RETRIEVE_CONTINUE; - } } } - pLimitInfo->numOfOutputRows += pBlock->info.rows; return PROJECT_RETRIEVE_DONE; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 813763fffa..2813ef3505 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -256,12 +256,11 @@ static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlo } } -// todo handle the slimit info -bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) { +bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SLimit* pLimit = &pLimitInfo->limit; const char* id = GET_TASKID(pTaskInfo); - if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) { + if (pLimitInfo->remainOffset > 0) { if (pLimitInfo->remainOffset >= pBlock->info.rows) { pLimitInfo->remainOffset -= pBlock->info.rows; blockDataEmpty(pBlock); @@ -276,12 +275,14 @@ bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) { // limit the output rows int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows); - blockDataKeepFirstNRows(pBlock, keep); + + pLimitInfo->numOfOutputRows += pBlock->info.rows; qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id); return true; } + pLimitInfo->numOfOutputRows += pBlock->info.rows; return false; } @@ -393,13 +394,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca } } - bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator); + bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo); if (limitReached) { // set operator flag is done setOperatorCompleted(pOperator); } pCost->totalRows += pBlock->info.rows; - pTableScanInfo->limitInfo.numOfOutputRows = pCost->totalRows; return TSDB_CODE_SUCCESS; } @@ -2714,9 +2714,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* } } - applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo, pOperator); - pInfo->limitInfo.numOfOutputRows += pResBlock->info.rows; - + applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo); qDebug("%s get sorted row block, rows:%d, limit:%"PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, pInfo->limitInfo.numOfOutputRows); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 97b4fd9dc4..e91d41897d 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -222,6 +222,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } + // multi-group case not handle here SSDataBlock* pBlock = NULL; while (1) { pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, @@ -236,28 +237,13 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { continue; } - // todo add the limit/offset info - if (pInfo->limitInfo.remainOffset > 0) { - if (pInfo->limitInfo.remainOffset >= blockDataGetNumOfRows(pBlock)) { - pInfo->limitInfo.remainOffset -= pBlock->info.rows; - continue; - } - - blockDataTrimFirstNRows(pBlock, pInfo->limitInfo.remainOffset); - pInfo->limitInfo.remainOffset = 0; - } - - if (pInfo->limitInfo.limit.limit > 0 && - pInfo->limitInfo.limit.limit <= pInfo->limitInfo.numOfOutputRows + blockDataGetNumOfRows(pBlock)) { - int32_t remain = pInfo->limitInfo.limit.limit - pInfo->limitInfo.numOfOutputRows; - blockDataKeepFirstNRows(pBlock, remain); + bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo); + if (limitReached) { + resetLimitInfoForNextGroup(&pInfo->limitInfo); } - size_t numOfRows = blockDataGetNumOfRows(pBlock); - pInfo->limitInfo.numOfOutputRows += numOfRows; - pOperator->resultInfo.totalRows += numOfRows; - - if (numOfRows > 0) { + pOperator->resultInfo.totalRows += pBlock->info.rows; + if (pBlock->info.rows > 0) { break; } } @@ -680,7 +666,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData break; } - bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator); + bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo); if (limitReached) { resetLimitInfoForNextGroup(&pInfo->limitInfo); } -- GitLab