From 376e7ea5b9831dd5261d7897b3e314d0bb7b61d5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 26 Oct 2022 19:23:05 +0800 Subject: [PATCH] fix(query): support limit/offset in merge sort operator. --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/scanoperator.c | 7 ++++--- source/libs/executor/src/sortoperator.c | 18 ++++++++++++++++-- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 297b8501b2..a22b594726 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -899,6 +899,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf); bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); +void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator); void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3b65c42abc..0e96b00cee 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -355,8 +355,8 @@ static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlo } } -static void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, - SOperatorInfo* pOperator) { +// todo handle the slimit info +void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) { SLimit* pLimit = &pLimitInfo->limit; if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) { @@ -377,7 +377,8 @@ static void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecT blockDataKeepFirstNRows(pBlock, keep); qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo)); - setTaskStatus(pTaskInfo, TASK_COMPLETED); + +// setTaskStatus(pTaskInfo, TASK_COMPLETED); pOperator->status = OP_EXEC_DONE; } } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 04f86d90d5..16d853ac7e 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -531,7 +531,7 @@ typedef struct SMultiwayMergeOperatorInfo { SOptrBasicInfo binfo; int32_t bufPageSize; uint32_t sortBufSize; // max buffer size for in-memory sort - + SLimitInfo limitInfo; SArray* pSortInfo; SSortHandle* pSortHandle; SColMatchInfo matchInfo; @@ -592,6 +592,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData blockDataEnsureCapacity(p, capacity); + _retry: while (1) { STupleHandle* pTupleHandle = NULL; if (pInfo->groupSort) { @@ -626,14 +627,22 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData } else { appendOneRowToDataBlock(p, pTupleHandle); } + if (p->info.rows >= capacity) { break; } } + if (pInfo->groupSort) { pInfo->hasGroupId = false; } + if (p->info.rows > 0) { // todo extract method + applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator); + if (p->info.rows == 0) { + goto _retry; + } + blockDataEnsureCapacity(pDataBlock, p->info.rows); int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { @@ -650,9 +659,9 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData } blockDataDestroy(p); - qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.groupId, pDataBlock->info.rows); + return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; } @@ -717,6 +726,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size goto _error; } + initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo); pInfo->binfo.pRes = createResDataBlock(pDescNode); int32_t rowSize = pInfo->binfo.pRes->info.rowSize; ASSERT(rowSize < 100 * 1024 * 1024); @@ -725,6 +735,10 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size int32_t numOfOutputCols = 0; code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); initResultSizeInfo(&pOperator->resultInfo, 1024); -- GitLab