diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d7c283c70dde2b3264270b39466482d709054dca..9142af4a6d188d4366728e35c4e44cbaaad0a669 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -744,8 +744,8 @@ typedef struct SSortOperatorInfo { int64_t startTs; // sort start time uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. - - SNode* pCondition; + SLimitInfo limitInfo; + SNode* pCondition; } SSortOperatorInfo; typedef struct STagFilterOperatorInfo { @@ -785,7 +785,7 @@ int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); void cleanupExprSupp(SExprSupp* pSup); int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, const char* pkey); -void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); +void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows); void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); int32_t handleLimitOffset(SOperatorInfo *pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf); bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index c46485a332e0b4614d2ab0d5aa82bd21852d978d..56a5e253d87942115487ee7b51b00f3700822f20 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -50,7 +50,7 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead STableListInfo* pTableList = &pTaskInfo->tableqinfoList; - initResultSizeInfo(pOperator, 1024); + initResultSizeInfo(&pOperator->resultInfo, 1024); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0036a3f0804fa5cf95dff960e1f8a59dc625d496..df039bc0e2c2ff1987232a7ab6d86a63e7b82064 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3601,13 +3601,13 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf return TSDB_CODE_SUCCESS; } -void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) { +void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows) { ASSERT(numOfRows != 0); - pOperator->resultInfo.capacity = numOfRows; - pOperator->resultInfo.threshold = numOfRows * 0.75; + pResultInfo->capacity = numOfRows; + pResultInfo->threshold = numOfRows * 0.75; - if (pOperator->resultInfo.threshold == 0) { - pOperator->resultInfo.threshold = numOfRows; + if (pResultInfo->threshold == 0) { + pResultInfo->threshold = numOfRows; } } @@ -3670,7 +3670,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* int32_t numOfRows = 1024; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, numOfRows); + initResultSizeInfo(&pOperator->resultInfo, numOfRows); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -3825,7 +3825,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys if (numOfRows * pResBlock->info.rowSize > TWOMB) { numOfRows = TWOMB / pResBlock->info.rowSize; } - initResultSizeInfo(pOperator, numOfRows); + initResultSizeInfo(&pOperator->resultInfo, numOfRows); initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock); @@ -4003,7 +4003,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy numOfRows = TWOMB / pResBlock->info.rowSize; } - initResultSizeInfo(pOperator, numOfRows); + initResultSizeInfo(&pOperator->resultInfo, numOfRows); initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock); @@ -4080,7 +4080,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* int32_t type = convertFillType(pPhyFillNode->mode); SResultInfo* pResultInfo = &pOperator->resultInfo; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId; int32_t numOfOutputCols = 0; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 2964948e70af588ec6284e31c37278ec802f3981..20630fd6ff2f246bc72ec82f6b659134fe394863 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -406,7 +406,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx goto _error; } - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResultBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index b864fae47f236955fd89a2ce60fd4b6d3d3f65e3..497de8347cb21de975a3d767e4b42cf236795761 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -41,7 +41,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pRes = pResBlock; pOperator->name = "MergeJoinOperator"; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index dc44de9cf323258f180dde61c3119c9f37e0b9b7..1f77dfb0937444bd11eea5f858ae1cddd695af5b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2299,7 +2299,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan pInfo->pCondition = pScanNode->node.pConditions; pInfo->scanCols = colList; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); tNameAssign(&pInfo->name, &pScanNode->tableName); const char* name = tNameGetTableName(&pInfo->name); @@ -2529,7 +2529,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); pOperator->fpSet = @@ -3073,7 +3073,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pOperator->info = pInfo; pOperator->exprSupp.numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; - initResultSizeInfo(pOperator, 1024); + initResultSizeInfo(&pOperator->resultInfo, 1024); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, NULL, diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index a3b79d959790c68ce8865496d5998d1e518d0b53..f019ee94b854acce88fa824e28118dcd73dc3399 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -26,7 +26,7 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) { SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) { + if (pInfo == NULL || pOperator == NULL) { goto _error; } @@ -41,13 +41,15 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); - pInfo->binfo.pRes = pResBlock; - initResultSizeInfo(pOperator, 1024); + initResultSizeInfo(&pOperator->resultInfo, 1024); - pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); + pInfo->binfo.pRes = pResBlock; + pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); pInfo->pCondition = pSortNode->node.pConditions; pInfo->pColMatchInfo = pColMatchColInfo; + initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); + pOperator->name = "SortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->blocking = true; @@ -208,26 +210,44 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { SSDataBlock* pBlock = NULL; while (1) { pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, - pInfo->pColMatchInfo, pInfo); - if (pBlock != NULL) { - doFilter(pInfo->pCondition, pBlock); - } - + pInfo->pColMatchInfo, pInfo); if (pBlock == NULL) { doSetOperatorCompleted(pOperator); - break; + return NULL; } - if (blockDataGetNumOfRows(pBlock) > 0) { - break; + doFilter(pInfo->pCondition, pBlock); + if (blockDataGetNumOfRows(pBlock) == 0) { + continue; } - } - if (pBlock != NULL) { - pOperator->resultInfo.totalRows += pBlock->info.rows; + // todo add the limit/offset info + if (pInfo->limitInfo.remainOffset > 0) { + if (pInfo->limitInfo.remainOffset >= blockDataGetNumOfRows(pBlock)) { + pInfo->limitInfo.remainOffset -= pBlock->info.rows; + continue; + } + + blockDataTrimFirstNRows(pBlock, pInfo->limitInfo.remainOffset); + pInfo->limitInfo.remainOffset = 0; + } + + if (pInfo->limitInfo.limit.limit > 0 && + pInfo->limitInfo.limit.limit <= pInfo->limitInfo.numOfOutputRows + blockDataGetNumOfRows(pBlock)) { + int32_t remain = pInfo->limitInfo.limit.limit - pInfo->limitInfo.numOfOutputRows; + blockDataKeepFirstNRows(pBlock, remain); + } + + size_t numOfRows = blockDataGetNumOfRows(pBlock); + pInfo->limitInfo.numOfOutputRows += numOfRows; + pOperator->resultInfo.totalRows += numOfRows; + + if (numOfRows > 0) { + break; + } } - return pBlock; + return blockDataGetNumOfRows(pBlock) > 0? pBlock:NULL; } void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { @@ -479,7 +499,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); pInfo->binfo.pRes = pResBlock; - initResultSizeInfo(pOperator, 1024); + initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); ; @@ -711,7 +731,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); - initResultSizeInfo(pOperator, 1024); + initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->groupSort = pMergePhyNode->groupSort; pInfo->binfo.pRes = pResBlock; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 3933ded7a9a8b67a3359ffd9b1205113414dbd7c..042f807271a7898062918d1aa9526f967923630a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1683,7 +1683,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* SExprSupp* pSup = &pOperator->exprSupp; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock); @@ -1764,7 +1764,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, numOfRows); + initResultSizeInfo(&pOperator->resultInfo, numOfRows); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); @@ -2224,7 +2224,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries); pInfo->fillType = convertFillType(pInterpPhyNode->fillMode); - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues); pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); @@ -2272,7 +2272,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExpr, numOfCols, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock); @@ -2320,7 +2320,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo } size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { @@ -2896,7 +2896,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY); pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); if (pIntervalPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); @@ -3072,7 +3072,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh goto _error; } - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); if (pSessionNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar); @@ -4336,7 +4336,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols); pInfo->stateCol = extractColumnFromColumnNode(pColNode); - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); if (pStateNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar); @@ -4586,7 +4586,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, iaInfo->primaryTsIndex = primaryTsSlotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); @@ -4892,7 +4892,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI SExprSupp* pExprSupp = &pOperator->exprSupp; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(pExprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); initBasicInfo(&iaInfo->binfo, pResBlock);