未验证 提交 66042669 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #22176 from taosdata/szhou/enhance-limit-proc

fix: choose heap sort for table merge scan with long row and limit
......@@ -2837,15 +2837,23 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t tableStartIdx = pInfo->tableStartIndex;
int32_t tableEndIdx = pInfo->tableEndIndex;
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1;
int64_t mergeLimit = -1;
if (pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1) {
mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
}
tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
if (hasLimit) {
mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
}
size_t szRow = blockDataGetRowSize(pInfo->pResBlock);
if (hasLimit) {
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1,
NULL, pTaskInfo->id.str, mergeLimit, szRow+8, tsPQSortMemThreshold * 1024* 1024);
} else {
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
}
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
// one table has one data block
......
......@@ -54,19 +54,19 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
int32_t numOfCols = 0;
pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
pOperator->exprSupp.numOfExprs = numOfCols;
calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys);
pInfo->maxRows = -1;
if (pSortNode->node.pLimit) {
SLimitNode* pLimit = (SLimitNode*)pSortNode->node.pLimit;
if (pLimit->limit > 0) pInfo->maxRows = pLimit->limit;
}
int32_t numOfOutputCols = 0;
int32_t code =
extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys);
pInfo->maxRows = -1;
if (pSortNode->node.pLimit) {
SLimitNode* pLimit = (SLimitNode*)pSortNode->node.pLimit;
if (pLimit->limit > 0) pInfo->maxRows = pLimit->limit + pLimit->offset;
}
pOperator->exprSupp.pCtx =
createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
......
......@@ -1405,6 +1405,9 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
}
static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) {
if (pHandle->pDataBlock == NULL) { // when no input stream datablock
return NULL;
}
blockDataCleanup(pHandle->pDataBlock);
blockDataEnsureCapacity(pHandle->pDataBlock, 1);
// abandon the top tuple if queue size bigger than max size
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册