提交 97a6e89d 编写于 作者: S slzhou

enhance: refactor create initial sources

上级 ba2b4042
......@@ -2840,7 +2840,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t tableStartIdx = pInfo->tableStartIndex;
int32_t tableEndIdx = pInfo->tableEndIndex;
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
pInfo->sortBufSize = 1024 * pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_TABLE_MERGE_SCAN, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
......
......@@ -475,11 +475,11 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
pSource->pageIndex++;
if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
qInfo("adjust merge tree. %d source completed %d", *numOfCompleted, pSource->pageIndex);
(*numOfCompleted) += 1;
pSource->src.rowIndex = -1;
pSource->pageIndex = -1;
pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
uInfo("adjust merge tree. %d source completed", *numOfCompleted);
} else {
int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
......@@ -495,8 +495,6 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
}
releaseBufPage(pHandle->pBuf, pPage);
if (pSource->pageIndex % 256 == 0)
uInfo("got block from page %d from ext mem source %p", pSource->pageIndex, pSource);
}
} else {
int64_t st = taosGetTimestampUs();
......@@ -506,7 +504,7 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
if (pSource->src.pBlock == NULL) {
(*numOfCompleted) += 1;
pSource->src.rowIndex = -1;
uInfo("adjust merge tree. %d source completed", *numOfCompleted);
qInfo("adjust merge tree. %d source completed", *numOfCompleted);
}
}
}
......@@ -688,7 +686,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
// Only *numOfInputSources* can be loaded into buffer to perform the external sort.
for (int32_t i = 0; i < sortGroup; ++i) {
uInfo("internal merge sort pass %d group %d. num input sources %d ", t, i, numOfInputSources);
qInfo("internal merge sort pass %d group %d. num input sources %d ", t, i, numOfInputSources);
pHandle->sourceId += 1;
int32_t end = (i + 1) * numOfInputSources - 1;
......@@ -884,11 +882,13 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
int32_t minRow = sup.aRowIdx[minIdx];
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
++nRows;
if (pHandle->pDataBlock->info.rows >= rowCap) {
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
}
}
if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) {
sup.aRowIdx[minIdx] = -1;
++numEnded;
......@@ -911,39 +911,101 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
return 0;
}
static int32_t createInitialSources(SSortHandle* pHandle) {
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
int32_t code = 0;
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
SSortSource* source = *pSource;
*pSource = NULL;
size_t maxBufSize = pHandle->numOfPages * pHandle->pageSize;
createPageBuf(pHandle);
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
int32_t szSort = 0;
while (1) {
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
if (pBlock == NULL) {
break;
}
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
while (1) {
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
if (pHandle->pDataBlock == NULL) {
uint32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock), numOfCols);
if (pBlk != NULL) {
szSort += blockDataGetSize(pBlk);
SSDataBlock* blk = createOneDataBlock(pBlk, true);
taosArrayPush(aBlkSort, &blk);
}
if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) {
int64_t p = taosGetTimestampUs();
sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
// todo, number of pages are set according to the total available sort buffer
pHandle->numOfPages = 1024;
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
szSort = 0;
}
if (pBlk == NULL) {
break;
};
}
taosArrayDestroy(aBlkSort);
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
taosArrayDestroy(aExtSrc);
pHandle->type = SORT_SINGLESOURCE_SORT;
return 0;
}
if (pHandle->beforeFp != NULL) {
pHandle->beforeFp(pBlock, pHandle->param);
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
int32_t code = 0;
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
SSortSource* source = *pSource;
*pSource = NULL;
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
while (1) {
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
if (pBlock == NULL) {
break;
}
if (pHandle->pDataBlock == NULL) {
uint32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock), numOfCols);
// todo, number of pages are set according to the total available sort buffer
pHandle->numOfPages = 1024;
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
}
if (pHandle->beforeFp != NULL) {
pHandle->beforeFp(pBlock, pHandle->param);
}
code = blockDataMerge(pHandle->pDataBlock, pBlock);
if (code != TSDB_CODE_SUCCESS) {
if (source->param && !source->onlyRef) {
taosMemoryFree(source->param);
}
if (!source->onlyRef && source->src.pBlock) {
blockDataDestroy(source->src.pBlock);
source->src.pBlock = NULL;
}
taosMemoryFree(source);
return code;
}
code = blockDataMerge(pHandle->pDataBlock, pBlock);
if (code != TSDB_CODE_SUCCESS) {
size_t size = blockDataGetSize(pHandle->pDataBlock);
if (size > sortBufSize) {
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs();
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
if (code != 0) {
if (source->param && !source->onlyRef) {
taosMemoryFree(source->param);
}
......@@ -951,122 +1013,71 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
blockDataDestroy(source->src.pBlock);
source->src.pBlock = NULL;
}
taosMemoryFree(source);
return code;
}
size_t size = blockDataGetSize(pHandle->pDataBlock);
if (size > sortBufSize) {
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs();
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
if (code != 0) {
if (source->param && !source->onlyRef) {
taosMemoryFree(source->param);
}
if (!source->onlyRef && source->src.pBlock) {
blockDataDestroy(source->src.pBlock);
source->src.pBlock = NULL;
}
taosMemoryFree(source);
return code;
}
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
code = doAddToBuf(pHandle->pDataBlock, pHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
code = doAddToBuf(pHandle->pDataBlock, pHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
if (source->param && !source->onlyRef) {
taosMemoryFree(source->param);
}
if (source->param && !source->onlyRef) {
taosMemoryFree(source->param);
}
taosMemoryFree(source);
taosMemoryFree(source);
if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
size_t size = blockDataGetSize(pHandle->pDataBlock);
if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
size_t size = blockDataGetSize(pHandle->pDataBlock);
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs();
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs();
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
if (code != 0) {
return code;
}
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
if (code != 0) {
return code;
}
if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
// All sorted data can fit in memory, external memory sort is not needed. Return to directly
if (size <= sortBufSize && pHandle->pBuf == NULL) {
pHandle->cmpParam.numOfSources = 1;
pHandle->inMemSort = true;
// All sorted data can fit in memory, external memory sort is not needed. Return to directly
if (size <= sortBufSize && pHandle->pBuf == NULL) {
pHandle->cmpParam.numOfSources = 1;
pHandle->inMemSort = true;
pHandle->loops = 1;
pHandle->tupleHandle.rowIndex = -1;
pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
return 0;
} else {
code = doAddToBuf(pHandle->pDataBlock, pHandle);
}
pHandle->loops = 1;
pHandle->tupleHandle.rowIndex = -1;
pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
return 0;
} else {
code = doAddToBuf(pHandle->pDataBlock, pHandle);
}
} else if (pHandle->type == SORT_TABLE_MERGE_SCAN) {
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
size_t maxBufSize = pHandle->numOfPages * pHandle->pageSize;
createPageBuf(pHandle);
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
int32_t szSort = 0;
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
while (1) {
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
if (pBlk == NULL) {
break;
};
szSort += blockDataGetSize(pBlk);
SSDataBlock* blk = createOneDataBlock(pBlk, true);
taosArrayPush(aBlkSort, &blk);
if (szSort > maxBufSize) {
sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
uInfo("initial source %zu created for %zu blocks", taosArrayGetSize(aExtSrc), taosArrayGetSize(aBlkSort));
}
return code;
}
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
szSort = 0;
}
}
if (szSort > 0) {
sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
szSort = 0;
}
taosArrayDestroy(aBlkSort);
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
taosArrayDestroy(aExtSrc);
static int32_t createInitialSources(SSortHandle* pHandle) {
int32_t code = 0;
pHandle->type = SORT_SINGLESOURCE_SORT;
uInfo("create initial sources for table merge scan ended");
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
code = createBlocksQuickSortInitialSources(pHandle);
} else if (pHandle->type == SORT_TABLE_MERGE_SCAN) {
code = createBlocksMergeSortInitialSources(pHandle);
}
qInfo("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource));
for (int i = 0; i < taosArrayGetSize(pHandle->pOrderedSource); ++i) {
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, i);
qInfo("source %d, num of pages %zu", i,taosArrayGetSize(pSrc->pageIdList));
}
return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册