提交 eb6250de 编写于 作者: S shenglian zhou

feat: support multiway sort merge

上级 37c822d3
...@@ -1213,7 +1213,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { ...@@ -1213,7 +1213,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
pBlock->info.numOfCols = numOfCols; pBlock->info.numOfCols = numOfCols;
pBlock->info.hasVarCol = pDataBlock->info.hasVarCol; pBlock->info.hasVarCol = pDataBlock->info.hasVarCol;
pBlock->info.rowSize = pDataBlock->info.rows; pBlock->info.rowSize = pDataBlock->info.rowSize;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {0}; SColumnInfoData colInfo = {0};
......
...@@ -785,7 +785,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -785,7 +785,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SNode* pCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SNode* pCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
SArray* pIndexMap, SExecTaskInfo* pTaskInfo); SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock,
SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
......
...@@ -4660,8 +4660,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4660,8 +4660,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t numOfOutputCols = 0; int32_t numOfOutputCols = 0;
SArray* pColList = SArray* pColList =
extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID); extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pResBlock, sortInfo, pColList, pTaskInfo); SSDataBlock* pInputDataBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pInputDataBlock, pResBlock, sortInfo, pColList, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
......
...@@ -230,6 +230,7 @@ typedef struct SMultiwaySortMergeOperatorInfo { ...@@ -230,6 +230,7 @@ typedef struct SMultiwaySortMergeOperatorInfo {
SSortHandle* pSortHandle; SSortHandle* pSortHandle;
SArray* pColMatchInfo; // for index map from table scan output SArray* pColMatchInfo; // for index map from table scan output
SSDataBlock* pInputBlock;
int64_t startTs; // sort start time int64_t startTs; // sort start time
} SMultiwaySortMergeOperatorInfo; } SMultiwaySortMergeOperatorInfo;
...@@ -246,14 +247,14 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { ...@@ -246,14 +247,14 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_MULTISOURCE_MERGE, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_MULTISOURCE_MERGE,
pInfo->bufPageSize, numOfBufPage, NULL, pTaskInfo->id.str); pInfo->bufPageSize, numOfBufPage, pInfo->pInputBlock, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
SSortSource ps = {0}; SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps.param = pOperator->pDownstream[i]; ps->param = pOperator->pDownstream[i];
tsortAddSource(pInfo->pSortHandle, &ps); tsortAddSource(pInfo->pSortHandle, ps);
} }
int32_t code = tsortOpen(pInfo->pSortHandle); int32_t code = tsortOpen(pInfo->pSortHandle);
...@@ -296,6 +297,7 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { ...@@ -296,6 +297,7 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) { void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) {
SMultiwaySortMergeOperatorInfo * pInfo = (SMultiwaySortMergeOperatorInfo*)param; SMultiwaySortMergeOperatorInfo * pInfo = (SMultiwaySortMergeOperatorInfo*)param;
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock);
taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo); taosArrayDestroy(pInfo->pColMatchInfo);
...@@ -313,7 +315,7 @@ int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrEx ...@@ -313,7 +315,7 @@ int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrEx
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock,
SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo)); SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo));
...@@ -330,6 +332,7 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, ...@@ -330,6 +332,7 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
pInfo->pSortInfo = pSortInfo; pInfo->pSortInfo = pSortInfo;
pInfo->pColMatchInfo = pColMatchColInfo; pInfo->pColMatchInfo = pColMatchColInfo;
pInfo->pInputBlock = pInputBlock;
pOperator->name = "MultiwaySortMerge"; pOperator->name = "MultiwaySortMerge";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
pOperator->blocking = true; pOperator->blocking = true;
......
...@@ -225,6 +225,10 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int ...@@ -225,6 +225,10 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i]; SSortSource* pSource = cmpParam->pSources[i];
pSource->src.pBlock = pHandle->fetchfp(pSource->param); pSource->src.pBlock = pHandle->fetchfp(pSource->param);
if (pSource->src.pBlock == NULL) {
pSource->src.rowIndex = -1;
++pHandle->numOfCompletedSources;
}
} }
} }
...@@ -361,13 +365,21 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { ...@@ -361,13 +365,21 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
bool leftNull = false; bool leftNull = false;
if (pLeftColInfoData->hasNull) { if (pLeftColInfoData->hasNull) {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[pOrder->slotId]); if (pLeftBlock->pBlockAgg == NULL) {
leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex);
} else {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[i]);
}
} }
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId); SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
bool rightNull = false; bool rightNull = false;
if (pRightColInfoData->hasNull) { if (pRightColInfoData->hasNull) {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg[pOrder->slotId]); if (pLeftBlock->pBlockAgg == NULL) {
rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex);
} else {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg[i]);
}
} }
if (leftNull && rightNull) { if (leftNull && rightNull) {
...@@ -408,7 +420,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -408,7 +420,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs; pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;
qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort elapsed:%"PRId64", total elapsed:%"PRId64, qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort elapsed:%"PRId64", total elapsed:%"PRId64,
pHandle->idStr, (int32_t) (sortPass + 1), getTotalBufSize(pHandle->pBuf), pHandle->sortElapsed, pHandle->totalElapsed); pHandle->idStr, (int32_t) (sortPass + 1), pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0, pHandle->sortElapsed, pHandle->totalElapsed);
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize); int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize);
blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows); blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
......
...@@ -166,8 +166,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { ...@@ -166,8 +166,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
} }
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
} }
// case QUERY_NODE_LOGIC_PLAN_SORT: case QUERY_NODE_LOGIC_PLAN_SORT:
// return stbSplHasMultiTbScan(streamQuery, pNode); return stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_SCAN: case QUERY_NODE_LOGIC_PLAN_SCAN:
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode); return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
default: default:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册