提交 c438b699 编写于 作者: S shenglian zhou

feat: support multiway sort merge

上级 a8be2042
......@@ -774,7 +774,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams,
SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo);
SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
......
......@@ -4524,8 +4524,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SArray* sortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
int32_t numOfOutputCols = 0;
SArray* pColList = NULL;
//extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pResBlock, sortInfo, pTaskInfo);
pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pResBlock, sortInfo, pColList, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
......
......@@ -224,30 +224,11 @@ typedef struct SMultiwaySortMergeOperatorInfo {
SArray* pSortInfo;
SSortHandle* pSortHandle;
SArray* pColMatchInfo; // for index map from table scan output
int64_t startTs; // sort start time
} SMultiwaySortMergeOperatorInfo;
SSDataBlock* getMultiwaySortMergedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) {
blockDataCleanup(pDataBlock);
blockDataEnsureCapacity(pDataBlock, capacity);
while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pTupleHandle == NULL) {
break;
}
appendOneRowToDataBlock(pDataBlock, pTupleHandle);
if (pDataBlock->info.rows >= capacity) {
return pDataBlock;
}
}
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
}
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
SMultiwaySortMergeOperatorInfo * pInfo = pOperator->info;
......@@ -261,7 +242,7 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, NULL, SORT_MULTISOURCE_MERGE,
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_MULTISOURCE_MERGE,
pInfo->bufPageSize, numOfBufPage, NULL, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
......@@ -298,7 +279,10 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
longjmp(pTaskInfo->env, code);
}
SSDataBlock* pBlock = getMultiwaySortMergedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity);
SSDataBlock* pBlock = getSortedBlockData(pInfo->pSortHandle,
pInfo->binfo.pRes,
pOperator->resultInfo.capacity,
pInfo->pColMatchInfo);
if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
......@@ -328,7 +312,8 @@ int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrEx
}
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams,
SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo) {
SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo,
SExecTaskInfo* pTaskInfo) {
SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
int32_t rowSize = pResBlock->info.rowSize;
......@@ -342,6 +327,7 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
initResultSizeInfo(pOperator, 1024);
pInfo->pSortInfo = pSortInfo;
pInfo->pColMatchInfo= pColMatchColInfo;
pOperator->name = "MultiwaySortMerge";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
pOperator->blocking = true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册