提交 d1a1976a 编写于 作者: S slzhou

enhance: add limit to merge sort

上级 b9aeda26
......@@ -26,7 +26,7 @@ extern "C" {
enum {
SORT_MULTISOURCE_MERGE = 0x1,
SORT_SINGLESOURCE_SORT = 0x2,
SORT_TABLE_MERGE_SCAN = 0x3
SORT_BLOCK_TS_MERGE = 0x3
};
typedef struct SMultiMergeSource {
......@@ -56,7 +56,7 @@ typedef struct SMsortComparParam {
bool cmpGroupId;
int32_t sortType;
// the following field to speed up when sortType == SORT_TABLE_MERGE_SCAN
// the following field to speed up when sortType == SORT_BLOCK_TS_MERGE
int32_t tsSlotId;
int32_t order;
__compar_fn_t cmpFn;
......@@ -77,8 +77,8 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
uint32_t sortBufSize);
SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength,
uint32_t pqSortBufSize);
void tsortSetForceUsePQSort(SSortHandle* pHandle);
......@@ -117,6 +117,10 @@ int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetc
*/
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp);
/**
*
*/
void tsortSetMergeLimit(SSortHandle* pHandle, int64_t mergeLimit);
/**
*
*/
......
......@@ -2842,9 +2842,13 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->sortBufSize = 1024 * pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_TABLE_MERGE_SCAN, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
int64_t mergeLimit = -1;
if (pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1) {
mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
}
tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
// one table has one data block
......
......@@ -42,13 +42,15 @@ struct SSortHandle {
int64_t startTs;
uint64_t totalElapsed;
uint64_t maxRows;
uint32_t maxTupleLength;
uint32_t sortBufSize;
uint64_t pqMaxRows;
uint32_t pqMaxTupleLength;
uint32_t pqSortBufSize;
bool forceUsePQSort;
BoundedQueue* pBoundedQueue;
uint32_t tmpRowIdx;
int64_t mergeLimit;
int32_t sourceId;
SSDataBlock* pDataBlock;
SMsortComparParam cmpParam;
......@@ -173,8 +175,8 @@ void destroyTuple(void* t) {
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
uint32_t sortBufSize) {
SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength,
uint32_t pqSortBufSize) {
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
pSortHandle->type = type;
......@@ -183,10 +185,10 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
pSortHandle->pSortInfo = pSortInfo;
pSortHandle->loops = 0;
pSortHandle->maxTupleLength = maxTupleLength;
if (maxRows != 0) {
pSortHandle->sortBufSize = sortBufSize;
pSortHandle->maxRows = maxRows;
pSortHandle->pqMaxTupleLength = pqMaxTupleLength;
if (pqMaxRows != 0) {
pSortHandle->pqSortBufSize = pqSortBufSize;
pSortHandle->pqMaxRows = pqMaxRows;
}
pSortHandle->forceUsePQSort = false;
......@@ -194,11 +196,13 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
}
pSortHandle->mergeLimit = -1;
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
pSortHandle->cmpParam.orderInfo = pSortInfo;
pSortHandle->cmpParam.cmpGroupId = false;
pSortHandle->cmpParam.sortType = type;
if (type == SORT_TABLE_MERGE_SCAN) {
if (type == SORT_BLOCK_TS_MERGE) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pSortInfo, 0);
pSortHandle->cmpParam.tsSlotId = pOrder->slotId;
pSortHandle->cmpParam.order = pOrder->order;
......@@ -586,7 +590,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
}
}
if (pParam->sortType == SORT_TABLE_MERGE_SCAN) {
if (pParam->sortType == SORT_BLOCK_TS_MERGE) {
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pParam->tsSlotId);
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pParam->tsSlotId);
int64_t* left1 = (int64_t*)(pLeftColInfoData->pData) + pLeftSource->src.rowIndex;
......@@ -709,18 +713,23 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
taosArrayDestroy(pResList);
return code;
}
int nRows = 0;
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
while (1) {
if (tsortIsClosed(pHandle)) {
code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
return code;
}
if (pHandle->mergeLimit != -1 && nRows >= pHandle->mergeLimit) {
break;
}
SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
if (pDataBlock == NULL) {
break;
}
nRows += pDataBlock->info.rows;
int32_t pageId = -1;
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
......@@ -740,7 +749,6 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
blockDataCleanup(pDataBlock);
}
......@@ -846,7 +854,6 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk,
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
blockDataCleanup(blk);
return 0;
}
......@@ -913,6 +920,9 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
size_t blkPgSz = pgHeaderSz;
while (nRows < totalRows) {
if (pHandle->mergeLimit != -1 && nRows >= pHandle->mergeLimit) {
break;
}
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
int32_t minRow = sup.aRowIdx[minIdx];
......@@ -920,6 +930,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
blockDataCleanup(pHandle->pDataBlock);
blkPgSz = pgHeaderSz;
bufInc = getPageBufIncForRow(minBlk, minRow, 0);
}
......@@ -939,6 +950,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
}
if (pHandle->pDataBlock->info.rows > 0) {
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
blockDataCleanup(pHandle->pDataBlock);
}
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
......@@ -1061,7 +1073,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
code = doAddToBuf(pHandle->pDataBlock, pHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -1086,7 +1098,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
return code;
}
if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
......@@ -1111,7 +1123,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
code = createBlocksQuickSortInitialSources(pHandle);
} else if (pHandle->type == SORT_TABLE_MERGE_SCAN) {
} else if (pHandle->type == SORT_BLOCK_TS_MERGE) {
code = createBlocksMergeSortInitialSources(pHandle);
}
uInfo("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource));
......@@ -1165,6 +1177,10 @@ void tsortSetClosed(SSortHandle* pHandle) {
atomic_store_8(&pHandle->closed, 2);
}
void tsortSetMergeLimit(SSortHandle* pHandle, int64_t mergeLimit) {
pHandle->mergeLimit = mergeLimit;
}
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*),
void* param) {
pHandle->fetchfp = fetchFp;
......@@ -1244,8 +1260,8 @@ void tsortSetForceUsePQSort(SSortHandle* pHandle) {
static bool tsortIsPQSortApplicable(SSortHandle* pHandle) {
if (pHandle->type != SORT_SINGLESOURCE_SORT) return false;
if (tsortIsForceUsePQSort(pHandle)) return true;
uint64_t maxRowsFitInMemory = pHandle->sortBufSize / (pHandle->maxTupleLength + sizeof(char*));
return maxRowsFitInMemory > pHandle->maxRows;
uint64_t maxRowsFitInMemory = pHandle->pqSortBufSize / (pHandle->pqMaxTupleLength + sizeof(char*));
return maxRowsFitInMemory > pHandle->pqMaxRows;
}
static bool tsortPQCompFn(void* a, void* b, void* param) {
......@@ -1291,7 +1307,7 @@ static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param)
}
static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destroyTuple, pHandle);
pHandle->pBoundedQueue = createBoundedQueue(pHandle->pqMaxRows, tsortPQCompFn, destroyTuple, pHandle);
if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY;
tsortSetComparFp(pHandle, tupleComparFn);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册