From eb6250de4f85a0604f98111e40e0c93dbcb69df2 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 7 Jun 2022 17:41:11 +0800 Subject: [PATCH] feat: support multiway sort merge --- source/common/src/tdatablock.c | 2 +- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 5 +++-- source/libs/executor/src/sortoperator.c | 13 ++++++++----- source/libs/executor/src/tsort.c | 18 +++++++++++++++--- source/libs/planner/src/planSpliter.c | 4 ++-- 6 files changed, 30 insertions(+), 14 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index d9293433ea..fbe246434c 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1213,7 +1213,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pBlock->info.numOfCols = numOfCols; pBlock->info.hasVarCol = pDataBlock->info.hasVarCol; - pBlock->info.rowSize = pDataBlock->info.rows; + pBlock->info.rowSize = pDataBlock->info.rowSize; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {0}; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1e0739e066..78ab34b304 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -785,7 +785,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SNode* pCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pIndexMap, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, +SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f08c2f95ba..c4b72be465 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4660,8 +4660,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t numOfOutputCols = 0; SArray* pColList = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID); - - pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pResBlock, sortInfo, pColList, pTaskInfo); + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); + SSDataBlock* pInputDataBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); + pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pInputDataBlock, pResBlock, sortInfo, pColList, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index c84d4491af..b5195e65b8 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -230,6 +230,7 @@ typedef struct SMultiwaySortMergeOperatorInfo { SSortHandle* pSortHandle; SArray* pColMatchInfo; // for index map from table scan output + SSDataBlock* pInputBlock; int64_t startTs; // sort start time } SMultiwaySortMergeOperatorInfo; @@ -246,14 +247,14 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_MULTISOURCE_MERGE, - pInfo->bufPageSize, numOfBufPage, NULL, pTaskInfo->id.str); + pInfo->bufPageSize, numOfBufPage, pInfo->pInputBlock, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { - SSortSource ps = {0}; - ps.param = pOperator->pDownstream[i]; - tsortAddSource(pInfo->pSortHandle, &ps); + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + ps->param = pOperator->pDownstream[i]; + tsortAddSource(pInfo->pSortHandle, ps); } int32_t code = tsortOpen(pInfo->pSortHandle); @@ -296,6 +297,7 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) { SMultiwaySortMergeOperatorInfo * pInfo = (SMultiwaySortMergeOperatorInfo*)param; pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); + pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock); taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pColMatchInfo); @@ -313,7 +315,7 @@ int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrEx return TSDB_CODE_SUCCESS; } -SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, +SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo) { SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo)); @@ -330,6 +332,7 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, pInfo->pSortInfo = pSortInfo; pInfo->pColMatchInfo = pColMatchColInfo; + pInfo->pInputBlock = pInputBlock; pOperator->name = "MultiwaySortMerge"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; pOperator->blocking = true; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 7581836d59..3705d0a57b 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -225,6 +225,10 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { SSortSource* pSource = cmpParam->pSources[i]; pSource->src.pBlock = pHandle->fetchfp(pSource->param); + if (pSource->src.pBlock == NULL) { + pSource->src.rowIndex = -1; + ++pHandle->numOfCompletedSources; + } } } @@ -361,13 +365,21 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { bool leftNull = false; if (pLeftColInfoData->hasNull) { - leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[pOrder->slotId]); + if (pLeftBlock->pBlockAgg == NULL) { + leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex); + } else { + leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[i]); + } } SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId); bool rightNull = false; if (pRightColInfoData->hasNull) { - rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg[pOrder->slotId]); + if (pLeftBlock->pBlockAgg == NULL) { + rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex); + } else { + rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg[i]); + } } if (leftNull && rightNull) { @@ -408,7 +420,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs; qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort elapsed:%"PRId64", total elapsed:%"PRId64, - pHandle->idStr, (int32_t) (sortPass + 1), getTotalBufSize(pHandle->pBuf), pHandle->sortElapsed, pHandle->totalElapsed); + pHandle->idStr, (int32_t) (sortPass + 1), pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0, pHandle->sortElapsed, pHandle->totalElapsed); int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize); blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows); diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 7be33d54e3..5c8b41f87a 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -166,8 +166,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { } return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); } - // case QUERY_NODE_LOGIC_PLAN_SORT: - // return stbSplHasMultiTbScan(streamQuery, pNode); + case QUERY_NODE_LOGIC_PLAN_SORT: + return stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_SCAN: return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode); default: -- GitLab