diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a139b511a11a4551ef74fedb2e05b9eae22ca2f8..1cc3f9b874cafa35fdf887b266f203bf484d352c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -730,10 +730,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock, - SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo, - SExecTaskInfo* pTaskInfo); +SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 374a3a736da1c210ae87ce5efb261ba45743d29f..be6ca5145ae54cd3be036b8de785f95ff7170d9b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -159,7 +159,12 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) { } SArray* createSortInfo(SNodeList* pNodeList) { - size_t numOfCols = LIST_LENGTH(pNodeList); + size_t numOfCols = 0; + if (pNodeList != NULL) { + numOfCols = LIST_LENGTH(pNodeList); + } else { + numOfCols = 0; + } SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo)); if (pList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0978bf3f8b147de98741af26affc01c55ed4e5cd..17e5482b6035ff7b7c9921221735e7b33f45728c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4269,17 +4269,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) { SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode; - - SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc; - SSDataBlock* pResBlock = createResDataBlock(pDescNode); - - SArray* sortInfo = createSortInfo(pMergePhyNode->pMergeKeys); - int32_t numOfOutputCols = 0; - SArray* pColList = - extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SSDataBlock* pInputDataBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); - pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pInputDataBlock, pResBlock, sortInfo, pColList, pTaskInfo); + pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 96549ab6aecacae8338ada0fe5bd137ae648e145..44ff4c1c9099c70f40601b2e2de847ff0becd103 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -142,7 +142,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i SSDataBlock* loadNextDataBlock(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*)param; - SSDataBlock *pBlock = pOperator->fpSet.getNextFn(pOperator); + SSDataBlock* pBlock = pOperator->fpSet.getNextFn(pOperator); return pBlock; } @@ -443,7 +443,7 @@ void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) { SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo) { SGroupSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupSortOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) { goto _error; } @@ -475,8 +475,8 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort pOperator->exprSupp.numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo, NULL, - NULL, getGroupSortExplainExecInfo); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo, + NULL, NULL, getGroupSortExplainExecInfo); int32_t code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -494,7 +494,7 @@ _error: //===================================================================================== // Multiway Sort Merge operator -typedef struct SMultiwaySortMergeOperatorInfo { +typedef struct SMultiwayMergeOperatorInfo { SOptrBasicInfo binfo; int32_t bufPageSize; @@ -504,16 +504,17 @@ typedef struct SMultiwaySortMergeOperatorInfo { SSortHandle* pSortHandle; SArray* pColMatchInfo; // for index map from table scan output - SSDataBlock* pInputBlock; - int64_t startTs; // sort start time - bool hasGroupId; - uint64_t groupId; - STupleHandle *prefetchedTuple; -} SMultiwaySortMergeOperatorInfo; + SSDataBlock* pInputBlock; + int64_t startTs; // sort start time + bool groupSort; + bool hasGroupId; + uint64_t groupId; + STupleHandle* prefetchedTuple; +} SMultiwayMergeOperatorInfo; -int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { - SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; +int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) { + SMultiwayMergeOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (OPTR_IS_OPENED(pOperator)) { return TSDB_CODE_SUCCESS; @@ -527,8 +528,9 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { pInfo->pInputBlock, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); - tsortSetCompareGroupId(pInfo->pSortHandle, true); - + + tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupSort); + for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); ps->param = pOperator->pDownstream[i]; @@ -550,8 +552,8 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, SOperatorInfo* pOperator) { - SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SMultiwayMergeOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; blockDataCleanup(pDataBlock); @@ -564,35 +566,46 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData while (1) { STupleHandle* pTupleHandle = NULL; - if (pInfo->prefetchedTuple == NULL) { + if (pInfo->groupSort) { + if (pInfo->prefetchedTuple == NULL) { + pTupleHandle = tsortNextTuple(pHandle); + } else { + pTupleHandle = pInfo->prefetchedTuple; + pInfo->groupId = tsortGetGroupId(pTupleHandle); + pInfo->prefetchedTuple = NULL; + } + } + else { pTupleHandle = tsortNextTuple(pHandle); - } else { - pTupleHandle = pInfo->prefetchedTuple; - pInfo->groupId = tsortGetGroupId(pTupleHandle); - pInfo->prefetchedTuple = NULL; + pInfo->groupId = 0; } if (pTupleHandle == NULL) { break; } - uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); - if (!pInfo->hasGroupId) { - pInfo->groupId = tupleGroupId; - pInfo->hasGroupId = true; - appendOneRowToDataBlock(p, pTupleHandle); - } else if (pInfo->groupId == tupleGroupId) { - appendOneRowToDataBlock(p, pTupleHandle); + if (pInfo->groupSort) + { + uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); + if (!pInfo->hasGroupId) { + pInfo->groupId = tupleGroupId; + pInfo->hasGroupId = true; + appendOneRowToDataBlock(p, pTupleHandle); + } else if (pInfo->groupId == tupleGroupId) { + appendOneRowToDataBlock(p, pTupleHandle); + } else { + pInfo->prefetchedTuple = pTupleHandle; + break; + } } else { - pInfo->prefetchedTuple = pTupleHandle; - break; + appendOneRowToDataBlock(p, pTupleHandle); } if (p->info.rows >= capacity) { break; } } - if (p->info.rows > 0) {// todo extract method + if (p->info.rows > 0) { // todo extract method blockDataEnsureCapacity(pDataBlock, p->info.rows); int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { @@ -614,13 +627,13 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; } -SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { +SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SMultiwayMergeOperatorInfo* pInfo = pOperator->info; int32_t code = pOperator->fpSet._openFn(pOperator); if (code != TSDB_CODE_SUCCESS) { @@ -637,8 +650,8 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { return pBlock; } -void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) { - SMultiwaySortMergeOperatorInfo* pInfo = (SMultiwaySortMergeOperatorInfo*)param; +void destroyMultiwayMergeOperatorInfo(void* param, int32_t numOfOutput) { + SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param; pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock); @@ -646,11 +659,11 @@ void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pColMatchInfo); } -int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { +int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { ASSERT(pOptr != NULL); SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); - SMultiwaySortMergeOperatorInfo* pOperatorInfo = (SMultiwaySortMergeOperatorInfo*)pOptr->info; + SMultiwayMergeOperatorInfo* pOperatorInfo = (SMultiwayMergeOperatorInfo*)pOptr->info; *pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle); *pOptrExplain = pInfo; @@ -658,24 +671,35 @@ int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrEx return TSDB_CODE_SUCCESS; } -SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, - SSDataBlock* pInputBlock, SSDataBlock* pResBlock, SArray* pSortInfo, - SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo) { - SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - int32_t rowSize = pResBlock->info.rowSize; +SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams, + SMergePhysiNode* pMergePhyNode, SExecTaskInfo* pTaskInfo) { + SPhysiNode* pPhyNode = (SPhysiNode*)pMergePhyNode; + + SMultiwayMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwayMergeOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc; + SSDataBlock* pResBlock = createResDataBlock(pDescNode); + + int32_t rowSize = pResBlock->info.rowSize; if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) { goto _error; } + SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); + int32_t numOfOutputCols = 0; + SArray* pColMatchColInfo = + extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); + SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); initResultSizeInfo(pOperator, 1024); + pInfo->groupSort = pMergePhyNode->groupSort; pInfo->binfo.pRes = pResBlock; pInfo->pSortInfo = pSortInfo; pInfo->pColMatchInfo = pColMatchColInfo; pInfo->pInputBlock = pInputBlock; - pOperator->name = "MultiwaySortMerge"; + pOperator->name = "MultiwayMerge"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; @@ -687,9 +711,8 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, // one additional is reserved for merged result. pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); - pOperator->fpSet = - createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL, - destroyMultiwaySortMergeOperatorInfo, NULL, NULL, getMultiwaySortMergeExplainExecInfo); + pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, NULL, + destroyMultiwayMergeOperatorInfo, NULL, NULL, getMultiwayMergeExplainExecInfo); int32_t code = appendDownstream(pOperator, downStreams, numStreams); if (code != TSDB_CODE_SUCCESS) {