diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 88a6185495c69e8afd13c8c93930d4d992d86f8e..2bca846ef5c49186a18414dd60eabcfdf7b5ff05 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -227,7 +227,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_MERGE, QUERY_NODE_PHYSICAL_PLAN_SORT, QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, - QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, + QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 9fdc66d76c3f19abab33f52b1745b17d5b7b9d73..b56ea965cf317ff4aa226b5b9a624fc55daa5704 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -377,7 +377,7 @@ typedef struct SIntervalPhysiNode { int8_t slidingUnit; } SIntervalPhysiNode; -typedef SIntervalPhysiNode SMergeIntervalPhysiNode; +typedef SIntervalPhysiNode SMergeAlignedIntervalPhysiNode; typedef SIntervalPhysiNode SStreamIntervalPhysiNode; typedef SIntervalPhysiNode SStreamFinalIntervalPhysiNode; typedef SIntervalPhysiNode SStreamSemiIntervalPhysiNode; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index ae0f669d656a458a310339e461ad2e55918ee211..2d55b06f064ca0023c551f8718a44b69d1996f8a 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -184,8 +184,8 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo pPhysiChildren = indefPhysiNode->node.pChildren; break; } - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: { - SMergeIntervalPhysiNode *intPhysiNode = (SMergeIntervalPhysiNode *)pNode; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: { + SMergeAlignedIntervalPhysiNode *intPhysiNode = (SMergeAlignedIntervalPhysiNode *)pNode; pPhysiChildren = intPhysiNode->window.node.pChildren; break; } @@ -841,8 +841,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } break; } - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: { - SMergeIntervalPhysiNode *pIntNode = (SMergeIntervalPhysiNode *)pNode; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: { + SMergeAlignedIntervalPhysiNode *pIntNode = (SMergeAlignedIntervalPhysiNode *)pNode; EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, nodesGetNameFromColumnNode(pIntNode->window.pTspk)); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); if (pResNode->pExecInfo) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5761c0be78189d96cc30d59063b6a6600c9b5923..9555e477ea9a4fea6c61f05db7f94c45b221bb1a 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -729,7 +729,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream); -SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, +SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 20dac8522330c469cfa5267ca6ff0381f463fc79..6591d97302a2b2d9e453fb6b1a214498175195e8 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4190,8 +4190,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream); - } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) { - SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) { + SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); @@ -4204,7 +4204,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; - pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo); + pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { int32_t children = 0; pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 0fabee5a1ecc12b2fddf988d5a922748333d66d6..97be4645a8030741644f9c8fc84cc270a0ecad9e 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -681,4 +681,4 @@ _error: taosMemoryFree(pInfo); taosMemoryFree(pOperator); return NULL; -} \ No newline at end of file +} diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 5495b5e9bf4483588e08bf62eca769991302cad7..ad133ce103de1bf59d2e6dad8ab56f2dbf3da8c1 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3667,23 +3667,23 @@ _error: return NULL; } -typedef struct SMergeIntervalAggOperatorInfo { +typedef struct SMergeAlignedIntervalAggOperatorInfo { SIntervalAggOperatorInfo intervalAggOperatorInfo; bool hasGroupId; uint64_t groupId; SSDataBlock* prefetchedBlock; bool inputBlocksFinished; -} SMergeIntervalAggOperatorInfo; +} SMergeAlignedIntervalAggOperatorInfo; -void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) { - SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param; +void destroyMergeAlignedIntervalOperatorInfo(void* param, int32_t numOfOutput) { + SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param; destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput); } -static int32_t outputMergeIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock, +static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock, TSKEY wstartTs) { - SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; + SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; @@ -3702,9 +3702,9 @@ static int32_t outputMergeIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t return 0; } -static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, +static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, int32_t scanFlag, SSDataBlock* pResultBlock) { - SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; + SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; @@ -3722,7 +3722,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* win.ekey = taosTimeAdd(win.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1; - // TODO: remove the hash table usage (groupid + winkey => result row position) + // TODO: remove the hash table (groupid + winkey => result row position) int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { @@ -3744,13 +3744,14 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); - outputMergeIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); + outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); currTs = tsCols[currPos]; currWin.skey = currTs; - currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, - iaInfo->interval.precision) - - 1; + currWin.ekey = taosTimeAdd(currWin.skey, + iaInfo->interval.interval, + iaInfo->interval.intervalUnit, + iaInfo->interval.precision) - 1; startPos = currPos; ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); @@ -3763,13 +3764,13 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); - outputMergeIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); + outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); } -static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { +static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info; + SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperator->info; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -3790,6 +3791,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { } else { pBlock = miaInfo->prefetchedBlock; miaInfo->groupId = pBlock->info.groupId; + miaInfo->prefetchedBlock = NULL; } if (pBlock == NULL) { @@ -3807,7 +3809,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->order, scanFlag, true); - doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); + doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); if (pRes->info.rows >= pOperator->resultInfo.threshold) { break; @@ -3826,10 +3828,10 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { return (rows == 0) ? NULL : pRes; } -SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, +SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SExecTaskInfo* pTaskInfo) { - SMergeIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo)); + SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (miaInfo == NULL || pOperator == NULL) { goto _error; @@ -3864,8 +3866,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI initResultRowInfo(&iaInfo->binfo.resultRowInfo); - pOperator->name = "TimeMergeIntervalAggOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL; + pOperator->name = "TimeMergeAlignedIntervalAggOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL; pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->exprSupp.pExprInfo = pExprInfo; @@ -3873,8 +3875,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI pOperator->exprSupp.numOfExprs = numOfCols; pOperator->info = miaInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL, - destroyMergeIntervalOperatorInfo, NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeAlignedIntervalAgg, NULL, NULL, + destroyMergeAlignedIntervalOperatorInfo, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -3884,7 +3886,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI return pOperator; _error: - destroyMergeIntervalOperatorInfo(miaInfo, numOfCols); + destroyMergeAlignedIntervalOperatorInfo(miaInfo, numOfCols); taosMemoryFreeClear(miaInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 8290b5628c0a09cff094232546dfdafdf5ce6af4..0bde0d0581324d5a098fb2dc8b8f9906c5944920 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -702,7 +702,7 @@ SNode* nodesCloneNode(const SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: return physiSysTableScanCopy((const SSystemTableScanPhysiNode*)pNode, (SSystemTableScanPhysiNode*)pDst); case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index e3430b866b18dcbf582f2f37e309ec821158aecd..10a933b00b074ae266ee632277a1799ad6c2632d 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -236,8 +236,8 @@ const char* nodesNodeName(ENodeType type) { return "PhysiSort"; case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: return "PhysiHashInterval"; - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: - return "PhysiMergeInterval"; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: + return "PhysiMergeAlignedInterval"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: return "PhysiStreamInterval"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: @@ -4128,7 +4128,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN_SORT: return physiSortNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: @@ -4269,7 +4269,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_SORT: return jsonToPhysiSortNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index a60bf13f7d8513d8df246a9eb7509d6b6b6f4169..dff3821a0a070b484c31fefba4d250087a501afe 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -289,8 +289,8 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SSortPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: return makeNode(type, sizeof(SIntervalPhysiNode)); - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: - return makeNode(type, sizeof(SMergeIntervalPhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: + return makeNode(type, sizeof(SMergeAlignedIntervalPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: return makeNode(type, sizeof(SStreamIntervalPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: @@ -828,7 +828,7 @@ void nodesDestroyNode(SNode* pNode) { break; } case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 67ffdfb7d5e602f0dc89de735c25b5a76f66f5bb..37765edfc57a54f8d4e9ac02aff93d2eba3eb610 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1049,7 +1049,7 @@ static ENodeType getIntervalOperatorType(EWindowAlgorithm windowAlgo) { case INTERVAL_ALGO_HASH: return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL; case INTERVAL_ALGO_MERGE: - return QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL; + return QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL; case INTERVAL_ALGO_STREAM_FINAL: return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL; case INTERVAL_ALGO_STREAM_SEMI: