diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0c90c654f430d1c74097f577e37018b8bee94c8e..ab8991bd05477e56751fc5aac6787fa5875dd3d5 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2840,7 +2840,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableEndIdx = pInfo->tableEndIndex; - pInfo->sortBufSize = 2048 * pInfo->bufPageSize; + pInfo->sortBufSize = 1024 * pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_TABLE_MERGE_SCAN, pInfo->bufPageSize, numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index f274e9717b860696cad30e138c06e7a1b201c2c8..34f9dfc23373dc707484e22e84edec37b60e7027 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -475,11 +475,11 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT if (pHandle->type == SORT_SINGLESOURCE_SORT) { pSource->pageIndex++; if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) { + qInfo("adjust merge tree. %d source completed %d", *numOfCompleted, pSource->pageIndex); (*numOfCompleted) += 1; pSource->src.rowIndex = -1; pSource->pageIndex = -1; pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock); - uInfo("adjust merge tree. %d source completed", *numOfCompleted); } else { int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); @@ -495,8 +495,6 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT } releaseBufPage(pHandle->pBuf, pPage); - if (pSource->pageIndex % 256 == 0) - uInfo("got block from page %d from ext mem source %p", pSource->pageIndex, pSource); } } else { int64_t st = taosGetTimestampUs(); @@ -506,7 +504,7 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT if (pSource->src.pBlock == NULL) { (*numOfCompleted) += 1; pSource->src.rowIndex = -1; - uInfo("adjust merge tree. %d source completed", *numOfCompleted); + qInfo("adjust merge tree. %d source completed", *numOfCompleted); } } } @@ -688,7 +686,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { // Only *numOfInputSources* can be loaded into buffer to perform the external sort. for (int32_t i = 0; i < sortGroup; ++i) { - uInfo("internal merge sort pass %d group %d. num input sources %d ", t, i, numOfInputSources); + qInfo("internal merge sort pass %d group %d. num input sources %d ", t, i, numOfInputSources); pHandle->sourceId += 1; int32_t end = (i + 1) * numOfInputSources - 1; @@ -884,11 +882,13 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO int32_t minIdx = tMergeTreeGetChosenIndex(pTree); SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); int32_t minRow = sup.aRowIdx[minIdx]; + appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); ++nRows; if (pHandle->pDataBlock->info.rows >= rowCap) { appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); - } + } + if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) { sup.aRowIdx[minIdx] = -1; ++numEnded; @@ -911,39 +911,101 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO return 0; } -static int32_t createInitialSources(SSortHandle* pHandle) { - size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; - int32_t code = 0; +static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { + SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); + size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); + SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); - if (pHandle->type == SORT_SINGLESOURCE_SORT) { - SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); - SSortSource* source = *pSource; - *pSource = NULL; + size_t maxBufSize = pHandle->numOfPages * pHandle->pageSize; + createPageBuf(pHandle); - tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); + SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0); + int32_t szSort = 0; - while (1) { - SSDataBlock* pBlock = pHandle->fetchfp(source->param); - if (pBlock == NULL) { - break; - } + SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); + while (1) { + SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); - if (pHandle->pDataBlock == NULL) { - uint32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock), numOfCols); + if (pBlk != NULL) { + szSort += blockDataGetSize(pBlk); + SSDataBlock* blk = createOneDataBlock(pBlk, true); + taosArrayPush(aBlkSort, &blk); + } + if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) { + int64_t p = taosGetTimestampUs(); + sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); + int64_t el = taosGetTimestampUs() - p; + pHandle->sortElapsed += el; - // todo, number of pages are set according to the total available sort buffer - pHandle->numOfPages = 1024; - sortBufSize = pHandle->numOfPages * pHandle->pageSize; - pHandle->pDataBlock = createOneDataBlock(pBlock, false); + for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + blockDataDestroy(taosArrayGetP(aBlkSort, i)); } + taosArrayClear(aBlkSort); + szSort = 0; + } + if (pBlk == NULL) { + break; + }; + } + + taosArrayDestroy(aBlkSort); + tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); + taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); + taosArrayDestroy(aExtSrc); + + pHandle->type = SORT_SINGLESOURCE_SORT; + return 0; +} - if (pHandle->beforeFp != NULL) { - pHandle->beforeFp(pBlock, pHandle->param); +static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { + int32_t code = 0; + size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; + + SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); + SSortSource* source = *pSource; + *pSource = NULL; + + tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); + + while (1) { + SSDataBlock* pBlock = pHandle->fetchfp(source->param); + if (pBlock == NULL) { + break; + } + + if (pHandle->pDataBlock == NULL) { + uint32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock), numOfCols); + + // todo, number of pages are set according to the total available sort buffer + pHandle->numOfPages = 1024; + sortBufSize = pHandle->numOfPages * pHandle->pageSize; + pHandle->pDataBlock = createOneDataBlock(pBlock, false); + } + + if (pHandle->beforeFp != NULL) { + pHandle->beforeFp(pBlock, pHandle->param); + } + + code = blockDataMerge(pHandle->pDataBlock, pBlock); + if (code != TSDB_CODE_SUCCESS) { + if (source->param && !source->onlyRef) { + taosMemoryFree(source->param); } + if (!source->onlyRef && source->src.pBlock) { + blockDataDestroy(source->src.pBlock); + source->src.pBlock = NULL; + } + taosMemoryFree(source); + return code; + } - code = blockDataMerge(pHandle->pDataBlock, pBlock); - if (code != TSDB_CODE_SUCCESS) { + size_t size = blockDataGetSize(pHandle->pDataBlock); + if (size > sortBufSize) { + // Perform the in-memory sort and then flush data in the buffer into disk. + int64_t p = taosGetTimestampUs(); + code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); + if (code != 0) { if (source->param && !source->onlyRef) { taosMemoryFree(source->param); } @@ -951,122 +1013,71 @@ static int32_t createInitialSources(SSortHandle* pHandle) { blockDataDestroy(source->src.pBlock); source->src.pBlock = NULL; } + taosMemoryFree(source); return code; } - size_t size = blockDataGetSize(pHandle->pDataBlock); - if (size > sortBufSize) { - // Perform the in-memory sort and then flush data in the buffer into disk. - int64_t p = taosGetTimestampUs(); - code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); - if (code != 0) { - if (source->param && !source->onlyRef) { - taosMemoryFree(source->param); - } - if (!source->onlyRef && source->src.pBlock) { - blockDataDestroy(source->src.pBlock); - source->src.pBlock = NULL; - } - - taosMemoryFree(source); - return code; - } - - int64_t el = taosGetTimestampUs() - p; - pHandle->sortElapsed += el; - if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows); - code = doAddToBuf(pHandle->pDataBlock, pHandle); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + int64_t el = taosGetTimestampUs() - p; + pHandle->sortElapsed += el; + if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows); + code = doAddToBuf(pHandle->pDataBlock, pHandle); + if (code != TSDB_CODE_SUCCESS) { + return code; } } + } - if (source->param && !source->onlyRef) { - taosMemoryFree(source->param); - } + if (source->param && !source->onlyRef) { + taosMemoryFree(source->param); + } - taosMemoryFree(source); + taosMemoryFree(source); - if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) { - size_t size = blockDataGetSize(pHandle->pDataBlock); + if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) { + size_t size = blockDataGetSize(pHandle->pDataBlock); - // Perform the in-memory sort and then flush data in the buffer into disk. - int64_t p = taosGetTimestampUs(); + // Perform the in-memory sort and then flush data in the buffer into disk. + int64_t p = taosGetTimestampUs(); - code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); - if (code != 0) { - return code; - } + code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); + if (code != 0) { + return code; + } - if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows); - int64_t el = taosGetTimestampUs() - p; - pHandle->sortElapsed += el; + if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows); + int64_t el = taosGetTimestampUs() - p; + pHandle->sortElapsed += el; - // All sorted data can fit in memory, external memory sort is not needed. Return to directly - if (size <= sortBufSize && pHandle->pBuf == NULL) { - pHandle->cmpParam.numOfSources = 1; - pHandle->inMemSort = true; + // All sorted data can fit in memory, external memory sort is not needed. Return to directly + if (size <= sortBufSize && pHandle->pBuf == NULL) { + pHandle->cmpParam.numOfSources = 1; + pHandle->inMemSort = true; - pHandle->loops = 1; - pHandle->tupleHandle.rowIndex = -1; - pHandle->tupleHandle.pBlock = pHandle->pDataBlock; - return 0; - } else { - code = doAddToBuf(pHandle->pDataBlock, pHandle); - } + pHandle->loops = 1; + pHandle->tupleHandle.rowIndex = -1; + pHandle->tupleHandle.pBlock = pHandle->pDataBlock; + return 0; + } else { + code = doAddToBuf(pHandle->pDataBlock, pHandle); } - } else if (pHandle->type == SORT_TABLE_MERGE_SCAN) { - SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); - size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); - SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); - - size_t maxBufSize = pHandle->numOfPages * pHandle->pageSize; - createPageBuf(pHandle); - - SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0); - int32_t szSort = 0; - - SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); - while (1) { - SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); - if (pBlk == NULL) { - break; - }; - - szSort += blockDataGetSize(pBlk); - SSDataBlock* blk = createOneDataBlock(pBlk, true); - taosArrayPush(aBlkSort, &blk); - - if (szSort > maxBufSize) { - sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); - uInfo("initial source %zu created for %zu blocks", taosArrayGetSize(aExtSrc), taosArrayGetSize(aBlkSort)); + } + return code; +} - for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { - blockDataDestroy(taosArrayGetP(aBlkSort, i)); - } - taosArrayClear(aBlkSort); - szSort = 0; - } - } - if (szSort > 0) { - sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); - for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { - blockDataDestroy(taosArrayGetP(aBlkSort, i)); - } - taosArrayClear(aBlkSort); - szSort = 0; - } - taosArrayDestroy(aBlkSort); - tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); - taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); - taosArrayDestroy(aExtSrc); +static int32_t createInitialSources(SSortHandle* pHandle) { + int32_t code = 0; - pHandle->type = SORT_SINGLESOURCE_SORT; - uInfo("create initial sources for table merge scan ended"); + if (pHandle->type == SORT_SINGLESOURCE_SORT) { + code = createBlocksQuickSortInitialSources(pHandle); + } else if (pHandle->type == SORT_TABLE_MERGE_SCAN) { + code = createBlocksMergeSortInitialSources(pHandle); + } + qInfo("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource)); + for (int i = 0; i < taosArrayGetSize(pHandle->pOrderedSource); ++i) { + SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, i); + qInfo("source %d, num of pages %zu", i,taosArrayGetSize(pSrc->pageIdList)); } - return code; }