提交 1d24b96e 编写于 作者: S slzhou

enhance: skip data blocks with the merge limit ts when create initial source

上级 321be1f8
......@@ -51,6 +51,7 @@ struct SSortHandle {
uint32_t tmpRowIdx;
int64_t mergeLimit;
int64_t currMergeLimitTs;
int32_t sourceId;
SSDataBlock* pDataBlock;
......@@ -921,7 +922,8 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
int32_t nMergedRows = 0;
bool mergeLimitReached = false;
size_t blkPgSz = pgHeaderSz;
int64_t lastPageBufTs = (order->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
int64_t currTs = (order->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
while (nRows < totalRows) {
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
......@@ -929,14 +931,21 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
int32_t bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows);
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId);
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
nMergedRows += pHandle->pDataBlock->info.rows;
blockDataCleanup(pHandle->pDataBlock);
blkPgSz = pgHeaderSz;
bufInc = getPageBufIncForRow(minBlk, minRow, 0);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && order->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && order->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
break;
}
}
......@@ -955,8 +964,17 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
}
if (pHandle->pDataBlock->info.rows > 0) {
if (!mergeLimitReached) {
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId);
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
nMergedRows += pHandle->pDataBlock->info.rows;
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && order->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && order->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
}
}
blockDataCleanup(pHandle->pDataBlock);
}
......@@ -982,11 +1000,24 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
int32_t szSort = 0;
if (pOrder->order == TSDB_ORDER_ASC) {
pHandle->currMergeLimitTs = INT64_MAX;
} else {
pHandle->currMergeLimitTs = INT64_MIN;
}
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
while (1) {
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
if (pBlk != NULL) {
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId);
int64_t firstRowTs = *(int64_t*)tsCol->pData;
if ((pOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
(pOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
continue;
}
}
if (pBlk != NULL) {
szSort += blockDataGetSize(pBlk);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册