From 9afcca0355ade10397e9f72694faca70a3a60d70 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 9 Jun 2022 09:36:14 +0800 Subject: [PATCH] fix: add group id to create initial source --- source/common/src/tdatablock.c | 1 + source/libs/executor/src/tsort.c | 59 ++++++++++++++++++++------------ 2 files changed, 38 insertions(+), 22 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 704a319512..65b0e25275 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1217,6 +1217,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pBlock->info.numOfCols = numOfCols; pBlock->info.hasVarCol = pDataBlock->info.hasVarCol; pBlock->info.rowSize = pDataBlock->info.rowSize; + pBlock->info.groupId = pDataBlock->info.groupId; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {0}; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 8fb1d1f302..9448ef15b3 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -526,16 +526,24 @@ static int32_t createInitialSources(SSortHandle* pHandle) { if (pHandle->type == SORT_SINGLESOURCE_SORT) { SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0); taosArrayClear(pHandle->pOrderedSource); - + + bool hasGroupId = false; + SSDataBlock* prefetchedDataBlock = NULL; + while (1) { - SSDataBlock* pBlock = pHandle->fetchfp(source->param); + SSDataBlock* pBlock = NULL; + if (prefetchedDataBlock == NULL) { + pBlock = pHandle->fetchfp(source->param); + } else { + pBlock = prefetchedDataBlock; + prefetchedDataBlock = NULL; + } + if (pBlock == NULL) { break; } - if (pHandle->pDataBlock == NULL) { - pHandle->pDataBlock = createOneDataBlock(pBlock, false); - + if (!hasGroupId) { // calculate the buffer pages according to the total available buffers. int32_t rowSize = blockDataGetRowSize(pBlock); if (rowSize * 4 > 4096) { @@ -547,29 +555,36 @@ static int32_t createInitialSources(SSortHandle* pHandle) { // todo!! pHandle->numOfPages = 1024; sortBufSize = pHandle->numOfPages * pHandle->pageSize; - } - // perform the scalar function calculation before apply the sort - if (pHandle->beforeFp != NULL) { - pHandle->beforeFp(pBlock, pHandle->param); + hasGroupId = true; + pHandle->pDataBlock = createOneDataBlock(pBlock, false); } - // todo relocate the columns - int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock); - if (code != 0) { - return code; - } + if (pHandle->pDataBlock->info.groupId == pBlock->info.groupId) { + // perform the scalar function calculation before apply the sort + if (pHandle->beforeFp != NULL) { + pHandle->beforeFp(pBlock, pHandle->param); + } + // todo relocate the columns + int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock); + if (code != 0) { + return code; + } - size_t size = blockDataGetSize(pHandle->pDataBlock); - if (size > sortBufSize) { - // Perform the in-memory sort and then flush data in the buffer into disk. - int64_t p = taosGetTimestampUs(); - blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); + size_t size = blockDataGetSize(pHandle->pDataBlock); + if (size > sortBufSize) { + // Perform the in-memory sort and then flush data in the buffer into disk. + int64_t p = taosGetTimestampUs(); + blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); - int64_t el = taosGetTimestampUs() - p; - pHandle->sortElapsed += el; + int64_t el = taosGetTimestampUs() - p; + pHandle->sortElapsed += el; - doAddToBuf(pHandle->pDataBlock, pHandle); + doAddToBuf(pHandle->pDataBlock, pHandle); + } + } else { + prefetchedDataBlock = pBlock; + pHandle->pDataBlock = createOneDataBlock(pBlock, false); } } -- GitLab