diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4f02c559b1cc9adee4f54deb352517cd686bac4a..ce1ae318dc1be1c6f27071e1139d3f5e48e5707d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -700,6 +700,10 @@ typedef struct SSortOperatorInfo { int64_t startTs; // sort start time uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. + + STupleHandle *prefetchedTuple; + bool hasGroupId; + uint64_t groupId; } SSortOperatorInfo; typedef struct STagFilterOperatorInfo { @@ -759,7 +763,7 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode); SColumn extractColumnFromColumnNode(SColumnNode* pColNode); -SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo); +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, SSortOperatorInfo* pInfo); SSDataBlock* loadNextDataBlock(void* param); void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 1e57de84381a043ea6669bb74e29194ef3e55d9e..dcaa95e28a3d438a6ca7638bc6d925b59dd0aeb8 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -42,6 +42,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR pInfo->pSortInfo = pSortInfo; pInfo->pColMatchInfo = pColMatchColInfo; + pInfo->hasGroupId = false; + pInfo->prefetchedTuple = NULL; pOperator->name = "SortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->blocking = true; @@ -81,8 +83,8 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { pBlock->info.rows += 1; } -SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, - SArray* pColMatchInfo) { +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, + SSortOperatorInfo* pInfo) { blockDataCleanup(pDataBlock); SSDataBlock* p = tsortGetSortedDataBlock(pHandle); @@ -93,14 +95,33 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i blockDataEnsureCapacity(p, capacity); while (1) { - STupleHandle* pTupleHandle = tsortNextTuple(pHandle); + STupleHandle* pTupleHandle = NULL; + if (pInfo->prefetchedTuple == NULL) { + pTupleHandle = tsortNextTuple(pHandle); + } else { + pTupleHandle = pInfo->prefetchedTuple; + pInfo->groupId = tsortGetGroupId(pTupleHandle); + pInfo->prefetchedTuple = NULL; + } + if (pTupleHandle == NULL) { break; } - appendOneRowToDataBlock(p, pTupleHandle); + uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); + if (!pInfo->hasGroupId) { + pInfo->groupId = tupleGroupId; + pInfo->hasGroupId = true; + appendOneRowToDataBlock(p, pTupleHandle); + } else if (pInfo->groupId == tupleGroupId) { + appendOneRowToDataBlock(p, pTupleHandle); + } else { + pInfo->prefetchedTuple = pTupleHandle; + break; + } + if (p->info.rows >= capacity) { - return pDataBlock; + break; } } @@ -117,6 +138,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i pDataBlock->info.rows = p->info.rows; pDataBlock->info.capacity = p->info.rows; + pDataBlock->info.groupId = pInfo->groupId; } blockDataDestroy(p); @@ -188,8 +210,8 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, code); } - SSDataBlock* pBlock = - getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo); + SSDataBlock* pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, + pInfo->pColMatchInfo, pInfo); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -230,11 +252,11 @@ typedef struct SMultiwaySortMergeOperatorInfo { SArray* pColMatchInfo; // for index map from table scan output SSDataBlock* pInputBlock; - int64_t startTs; // sort start time + int64_t startTs; // sort start time - bool hasGroupId; - uint64_t groupId; - STupleHandle *prefetchedTuple; + bool hasGroupId; + uint64_t groupId; + STupleHandle* prefetchedTuple; } SMultiwaySortMergeOperatorInfo; int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { @@ -274,7 +296,7 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { } SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, - SArray* pColMatchInfo, SMultiwaySortMergeOperatorInfo* pInfo) { + SArray* pColMatchInfo, SMultiwaySortMergeOperatorInfo* pInfo) { blockDataCleanup(pDataBlock); SSDataBlock* p = tsortGetSortedDataBlock(pHandle); @@ -285,7 +307,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData blockDataEnsureCapacity(p, capacity); while (1) { - STupleHandle* pTupleHandle = NULL; if (pInfo->prefetchedTuple == NULL) { pTupleHandle = tsortNextTuple(pHandle); @@ -314,7 +335,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData if (p->info.rows >= capacity) { break; } - } if (p->info.rows > 0) { @@ -337,7 +357,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; } - SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -351,12 +370,8 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, code); } - SSDataBlock* pBlock = - getMultiwaySortedBlockData(pInfo->pSortHandle, - pInfo->binfo.pRes, - pOperator->resultInfo.capacity, - pInfo->pColMatchInfo, - pInfo); + SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, + pOperator->resultInfo.capacity, pInfo->pColMatchInfo, pInfo); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -367,7 +382,7 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { } void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) { - SMultiwaySortMergeOperatorInfo * pInfo = (SMultiwaySortMergeOperatorInfo*)param; + SMultiwaySortMergeOperatorInfo* pInfo = (SMultiwaySortMergeOperatorInfo*)param; pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock); @@ -387,9 +402,9 @@ int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrEx return TSDB_CODE_SUCCESS; } -SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock, - SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo, - SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, + SSDataBlock* pInputBlock, SSDataBlock* pResBlock, SArray* pSortInfo, + SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo) { SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); int32_t rowSize = pResBlock->info.rowSize; @@ -413,7 +428,8 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2; pInfo->sortBufSize = pInfo->bufPageSize * 16; - + pInfo->hasGroupId = false; + pInfo->prefetchedTuple = NULL; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,