未验证 提交 615c5d2b 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #14287 from taosdata/szhou/feature/multiwaymerge

feat: multiway merge handles group+mergekeys, group and only merge keys
......@@ -730,10 +730,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock,
SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
......
......@@ -159,7 +159,12 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
}
SArray* createSortInfo(SNodeList* pNodeList) {
size_t numOfCols = LIST_LENGTH(pNodeList);
size_t numOfCols = 0;
if (pNodeList != NULL) {
numOfCols = LIST_LENGTH(pNodeList);
} else {
numOfCols = 0;
}
SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -4269,17 +4269,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc;
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SArray* sortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
int32_t numOfOutputCols = 0;
SArray* pColList =
extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SSDataBlock* pInputDataBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pInputDataBlock, pResBlock, sortInfo, pColList, pTaskInfo);
pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
......
......@@ -142,7 +142,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
SSDataBlock* loadNextDataBlock(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
SSDataBlock *pBlock = pOperator->fpSet.getNextFn(pOperator);
SSDataBlock* pBlock = pOperator->fpSet.getNextFn(pOperator);
return pBlock;
}
......@@ -443,7 +443,7 @@ void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) {
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo) {
SGroupSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) {
goto _error;
}
......@@ -475,8 +475,8 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo, NULL,
NULL, getGroupSortExplainExecInfo);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo,
NULL, NULL, getGroupSortExplainExecInfo);
int32_t code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......@@ -494,7 +494,7 @@ _error:
//=====================================================================================
// Multiway Sort Merge operator
typedef struct SMultiwaySortMergeOperatorInfo {
typedef struct SMultiwayMergeOperatorInfo {
SOptrBasicInfo binfo;
int32_t bufPageSize;
......@@ -504,16 +504,17 @@ typedef struct SMultiwaySortMergeOperatorInfo {
SSortHandle* pSortHandle;
SArray* pColMatchInfo; // for index map from table scan output
SSDataBlock* pInputBlock;
int64_t startTs; // sort start time
bool hasGroupId;
uint64_t groupId;
STupleHandle *prefetchedTuple;
} SMultiwaySortMergeOperatorInfo;
SSDataBlock* pInputBlock;
int64_t startTs; // sort start time
bool groupSort;
bool hasGroupId;
uint64_t groupId;
STupleHandle* prefetchedTuple;
} SMultiwayMergeOperatorInfo;
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) {
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
......@@ -527,8 +528,9 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
pInfo->pInputBlock, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
tsortSetCompareGroupId(pInfo->pSortHandle, true);
tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupSort);
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = pOperator->pDownstream[i];
......@@ -550,8 +552,8 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
SArray* pColMatchInfo, SOperatorInfo* pOperator) {
SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
blockDataCleanup(pDataBlock);
......@@ -564,35 +566,46 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
while (1) {
STupleHandle* pTupleHandle = NULL;
if (pInfo->prefetchedTuple == NULL) {
if (pInfo->groupSort) {
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
pInfo->groupId = tsortGetGroupId(pTupleHandle);
pInfo->prefetchedTuple = NULL;
}
}
else {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
pInfo->groupId = tsortGetGroupId(pTupleHandle);
pInfo->prefetchedTuple = NULL;
pInfo->groupId = 0;
}
if (pTupleHandle == NULL) {
break;
}
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
if (!pInfo->hasGroupId) {
pInfo->groupId = tupleGroupId;
pInfo->hasGroupId = true;
appendOneRowToDataBlock(p, pTupleHandle);
} else if (pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
if (pInfo->groupSort)
{
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
if (!pInfo->hasGroupId) {
pInfo->groupId = tupleGroupId;
pInfo->hasGroupId = true;
appendOneRowToDataBlock(p, pTupleHandle);
} else if (pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
}
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
appendOneRowToDataBlock(p, pTupleHandle);
}
if (p->info.rows >= capacity) {
break;
}
}
if (p->info.rows > 0) {// todo extract method
if (p->info.rows > 0) { // todo extract method
blockDataEnsureCapacity(pDataBlock, p->info.rows);
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
for (int32_t i = 0; i < numOfCols; ++i) {
......@@ -614,13 +627,13 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
}
SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
......@@ -637,8 +650,8 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
return pBlock;
}
void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) {
SMultiwaySortMergeOperatorInfo* pInfo = (SMultiwaySortMergeOperatorInfo*)param;
void destroyMultiwayMergeOperatorInfo(void* param, int32_t numOfOutput) {
SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param;
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock);
......@@ -646,11 +659,11 @@ void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pColMatchInfo);
}
int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
ASSERT(pOptr != NULL);
SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
SMultiwaySortMergeOperatorInfo* pOperatorInfo = (SMultiwaySortMergeOperatorInfo*)pOptr->info;
SMultiwayMergeOperatorInfo* pOperatorInfo = (SMultiwayMergeOperatorInfo*)pOptr->info;
*pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle);
*pOptrExplain = pInfo;
......@@ -658,24 +671,35 @@ int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrEx
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams,
SSDataBlock* pInputBlock, SSDataBlock* pResBlock, SArray* pSortInfo,
SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo) {
SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
int32_t rowSize = pResBlock->info.rowSize;
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams,
SMergePhysiNode* pMergePhyNode, SExecTaskInfo* pTaskInfo) {
SPhysiNode* pPhyNode = (SPhysiNode*)pMergePhyNode;
SMultiwayMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwayMergeOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc;
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t rowSize = pResBlock->info.rowSize;
if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) {
goto _error;
}
SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
int32_t numOfOutputCols = 0;
SArray* pColMatchColInfo =
extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
initResultSizeInfo(pOperator, 1024);
pInfo->groupSort = pMergePhyNode->groupSort;
pInfo->binfo.pRes = pResBlock;
pInfo->pSortInfo = pSortInfo;
pInfo->pColMatchInfo = pColMatchColInfo;
pInfo->pInputBlock = pInputBlock;
pOperator->name = "MultiwaySortMerge";
pOperator->name = "MultiwayMerge";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
......@@ -687,9 +711,8 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
// one additional is reserved for merged result.
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1);
pOperator->fpSet =
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
destroyMultiwaySortMergeOperatorInfo, NULL, NULL, getMultiwaySortMergeExplainExecInfo);
pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, NULL,
destroyMultiwayMergeOperatorInfo, NULL, NULL, getMultiwayMergeExplainExecInfo);
int32_t code = appendDownstream(pOperator, downStreams, numStreams);
if (code != TSDB_CODE_SUCCESS) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册