diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index a11ea0b2cdaa4867e5d5b09968d3df643c1ba87d..e382fa4efd4eea7f22cf41da92ec751bd7a2fa4a 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -111,6 +111,7 @@ typedef struct SAggLogicNode { SNodeList* pGroupKeys; SNodeList* pAggFuncs; bool hasLastRow; + bool hasTimeLineFunc; } SAggLogicNode; typedef struct SProjectLogicNode { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 44a78f2a0bb7297e2a11d9bc0445ef09d55ed923..9ebcd82ff961a4e76ba77d1ec6ee699d1ec864da 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -521,7 +521,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt // NOTE: the last parameter is the primary timestamp column // todo: refactor this - if (fmIsTimelineFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) { + if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) { pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data. // ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP); } @@ -4440,10 +4440,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - STimeWindowAggSupp aggSup = (STimeWindowAggSupp){ - .waterMark = pTableScanNode->watermark, - .calTrigger = pTableScanNode->triggerType, - .maxTs = INT64_MIN, + STimeWindowAggSupp aggSup = (STimeWindowAggSupp){ + .waterMark = pTableScanNode->watermark, + .calTrigger = pTableScanNode->triggerType, + .maxTs = INT64_MIN, }; if (pHandle->vnode) { diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 567941c1578032e1e43873966787c8819bb96cbc..324a17320ee5baeff0e786589c3be8574376ab7c 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2242,7 +2242,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "last_row", .type = FUNCTION_TYPE_LAST_ROW, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2253,7 +2253,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_cache_last_row", .type = FUNCTION_TYPE_CACHE_LAST_ROW, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2263,7 +2263,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "first", .type = FUNCTION_TYPE_FIRST, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2277,7 +2277,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_first_partial", .type = FUNCTION_TYPE_FIRST_PARTIAL, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateFirstLastPartial, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2288,7 +2288,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_first_merge", .type = FUNCTION_TYPE_FIRST_MERGE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateFirstLastMerge, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2299,7 +2299,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "last", .type = FUNCTION_TYPE_LAST, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2313,7 +2313,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_last_partial", .type = FUNCTION_TYPE_LAST_PARTIAL, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateFirstLastPartial, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2324,7 +2324,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_last_merge", .type = FUNCTION_TYPE_LAST_MERGE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateFirstLastMerge, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index e198df4bc439d64933a73102f5d1087eb0e3bec7..5a6bea6024cabf272bcdd650957c380334e6d1b3 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1237,7 +1237,7 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) { pSelect->hasTailFunc = pSelect->hasTailFunc ? true : (FUNCTION_TYPE_TAIL == pFunc->funcType); pSelect->hasInterpFunc = pSelect->hasInterpFunc ? true : (FUNCTION_TYPE_INTERP == pFunc->funcType); pSelect->hasLastRowFunc = pSelect->hasLastRowFunc ? true : (FUNCTION_TYPE_LAST_ROW == pFunc->funcType); - pSelect->hasTimeLineFunc = pSelect->hasLastRowFunc ? true : fmIsTimelineFunc(pFunc->funcId); + pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId); } } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 366f97071023cf15adec4d794acd12695fc1e6ba..de9f16f1717e90739c80d4ae07b63271a0b45ab7 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -478,8 +478,9 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, } pAgg->hasLastRow = pSelect->hasLastRowFunc; + pAgg->hasTimeLineFunc = pSelect->hasTimeLineFunc; pAgg->node.groupAction = GROUP_ACTION_SET; - pAgg->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; + pAgg->node.requireDataOrder = pAgg->hasTimeLineFunc ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_NONE; pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; int32_t code = TSDB_CODE_SUCCESS; @@ -928,7 +929,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe int32_t code = TSDB_CODE_SUCCESS; // set grouyp keys, agg funcs and having conditions SNodeList* pGroupKeys = NULL; - SNode* pProjection = NULL; + SNode* pProjection = NULL; FOREACH(pProjection, pSelect->pProjectionList) { code = nodesListMakeStrictAppend(&pGroupKeys, createGroupingSetNode(pProjection)); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index a7ccb72792f35832aedf0dff1d653dd515c0aaab..6a99c0b8dbd17735a28eb474b74ec7408b6832b5 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -904,14 +904,6 @@ static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplit return code; } -static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { - SScanLogicNode* pScan = (SScanLogicNode*)pInfo->pSplitNode; - if (NULL != pScan->pGroupTags) { - return stbSplSplitScanNodeWithPartTags(pCxt, pInfo); - } - return stbSplSplitScanNodeWithoutPartTags(pCxt, pInfo); -} - static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) { SNode* pCol = NULL; FOREACH(pCol, pScan->pScanCols) { @@ -922,7 +914,7 @@ static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) { return NULL; } -static int32_t stbSplSplitScanNodeForJoin(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan) { +static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan) { SNodeList* pMergeKeys = NULL; int32_t code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pScan), &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { @@ -937,12 +929,24 @@ static int32_t stbSplSplitScanNodeForJoin(SSplitContext* pCxt, SLogicSubplan* pS return code; } +static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SScanLogicNode* pScan = (SScanLogicNode*)pInfo->pSplitNode; + if (SCAN_TYPE_TABLE_MERGE == pScan->scanType) { + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + return stbSplSplitMergeScanNode(pCxt, pInfo->pSubplan, pScan); + } + if (NULL != pScan->pGroupTags) { + return stbSplSplitScanNodeWithPartTags(pCxt, pInfo); + } + return stbSplSplitScanNodeWithoutPartTags(pCxt, pInfo); +} + static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubplan, SJoinLogicNode* pJoin) { int32_t code = TSDB_CODE_SUCCESS; SNode* pChild = NULL; FOREACH(pChild, pJoin->node.pChildren) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) { - code = stbSplSplitScanNodeForJoin(pCxt, pSubplan, (SScanLogicNode*)pChild); + code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild); } else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) { code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pChild); } else { diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index d4bd7e51622c7c021c5102d2babe1bb50b412dc3..bfa6079cb15f92588b6f9490fa8ebb63809cdfb7 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -124,7 +124,7 @@ int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* } static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel requirement) { - if (SCAN_TYPE_TABLE != pScan->scanType || SCAN_TYPE_TABLE_MERGE != pScan->scanType) { + if (SCAN_TYPE_TABLE != pScan->scanType && SCAN_TYPE_TABLE_MERGE != pScan->scanType) { return TSDB_CODE_SUCCESS; } // The lowest sort level of scan output data is DATA_ORDER_LEVEL_IN_BLOCK @@ -161,7 +161,9 @@ static int32_t adjustAggDataRequirement(SAggLogicNode* pAgg, EDataOrderLevel req return TSDB_CODE_PLAN_INTERNAL_ERROR; } pAgg->node.resultDataOrder = requirement; - pAgg->node.requireDataOrder = requirement; + if (pAgg->hasTimeLineFunc) { + pAgg->node.requireDataOrder = requirement < DATA_ORDER_LEVEL_IN_GROUP ? DATA_ORDER_LEVEL_IN_GROUP : requirement; + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/test/planBasicTest.cpp b/source/libs/planner/test/planBasicTest.cpp index 2fe731a553ba56169f5c3b81d81a8afc648bf01c..8f9cd94c19eb29c20898a10a1167803a74384b2e 100644 --- a/source/libs/planner/test/planBasicTest.cpp +++ b/source/libs/planner/test/planBasicTest.cpp @@ -139,6 +139,10 @@ TEST_F(PlanBasicTest, timeLineFunc) { run("SELECT CSUM(c1) FROM t1"); run("SELECT CSUM(c1) FROM st1"); + + run("SELECT TWA(c1) FROM t1"); + + run("SELECT TWA(c1) FROM st1"); } TEST_F(PlanBasicTest, multiResFunc) {