diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 46aa11f562a3e6264fa0a2b48ff98648ccaf1418..a11ea0b2cdaa4867e5d5b09968d3df643c1ba87d 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -33,6 +33,13 @@ typedef enum EDataOrderLevel { DATA_ORDER_LEVEL_GLOBAL } EDataOrderLevel; +typedef enum EGroupAction { + GROUP_ACTION_NONE = 1, + GROUP_ACTION_SET, + GROUP_ACTION_KEEP, + GROUP_ACTION_CLEAR +} EGroupAction; + typedef struct SLogicNode { ENodeType type; SNodeList* pTargets; // SColumnNode @@ -45,6 +52,7 @@ typedef struct SLogicNode { SNode* pSlimit; EDataOrderLevel requireDataOrder; // requirements for input data EDataOrderLevel resultDataOrder; // properties of the output data + EGroupAction groupAction; } SLogicNode; typedef enum EScanType { diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index e7109b5a87aedd9024eb99376befe20e241e9c13..121e69763040c42c9fc06f919a0ee59964dc5cd5 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -332,6 +332,9 @@ static int32_t logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) { COPY_SCALAR_FIELD(precision); CLONE_NODE_FIELD(pLimit); CLONE_NODE_FIELD(pSlimit); + COPY_SCALAR_FIELD(requireDataOrder); + COPY_SCALAR_FIELD(resultDataOrder); + COPY_SCALAR_FIELD(groupAction); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 7d6015d5c4e51951e046584d53759f3f6720ad7e..9b075782123d4fecbad9f48f1e5237a6870033fd 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -250,6 +250,9 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect SScanLogicNode* pScan = NULL; int32_t code = makeScanLogicNode(pCxt, pRealTable, pSelect->hasRepeatScanFuncs, (SLogicNode**)&pScan); + pScan->node.groupAction = GROUP_ACTION_NONE; + pScan->node.resultDataOrder = DATA_ORDER_LEVEL_IN_BLOCK; + // set columns to scan if (TSDB_CODE_SUCCESS == code) { code = nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pRealTable->table.tableAlias, COLLECT_COL_TYPE_COL, @@ -336,6 +339,9 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pJoin->joinType = pJoinTable->joinType; pJoin->isSingleTableJoin = pJoinTable->table.singleTable; + pJoin->node.groupAction = GROUP_ACTION_CLEAR; + pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; + pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; int32_t code = TSDB_CODE_SUCCESS; @@ -472,6 +478,9 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, } pAgg->hasLastRow = pSelect->hasLastRowFunc; + pAgg->node.groupAction = GROUP_ACTION_SET; + pAgg->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; + pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; int32_t code = TSDB_CODE_SUCCESS; @@ -540,6 +549,10 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt pIdfRowsFunc->isTailFunc = pSelect->hasTailFunc; pIdfRowsFunc->isUniqueFunc = pSelect->hasUniqueFunc; pIdfRowsFunc->isTimeLineFunc = pSelect->hasTimeLineFunc; + pIdfRowsFunc->node.groupAction = GROUP_ACTION_KEEP; + pIdfRowsFunc->node.requireDataOrder = + pIdfRowsFunc->isTimeLineFunc ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_NONE; + pIdfRowsFunc->node.resultDataOrder = pIdfRowsFunc->node.requireDataOrder; // indefinite rows functions and _select_values functions int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsVectorFunc, &pIdfRowsFunc->pFuncs); @@ -571,6 +584,10 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p return TSDB_CODE_OUT_OF_MEMORY; } + pInterpFunc->node.groupAction = GROUP_ACTION_KEEP; + pInterpFunc->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP; + pInterpFunc->node.resultDataOrder = pInterpFunc->node.requireDataOrder; + int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsInterpFunc, &pInterpFunc->pFuncs); if (TSDB_CODE_SUCCESS == code) { code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT); @@ -642,10 +659,12 @@ static int32_t createWindowLogicNodeByState(SLogicPlanContext* pCxt, SStateWindo } pWindow->winType = WINDOW_TYPE_STATE; + pWindow->node.groupAction = GROUP_ACTION_KEEP; + pWindow->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP; + pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP; pWindow->pStateExpr = nodesCloneNode(pState->pExpr); - pWindow->pTspk = nodesCloneNode(pState->pCol); - if (NULL == pWindow->pTspk) { + if (NULL == pWindow->pStateExpr || NULL == pWindow->pTspk) { nodesDestroyNode((SNode*)pWindow); return TSDB_CODE_OUT_OF_MEMORY; } @@ -663,6 +682,9 @@ static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionW pWindow->winType = WINDOW_TYPE_SESSION; pWindow->sessionGap = ((SValueNode*)pSession->pGap)->datum.i; pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? SESSION_ALGO_STREAM_SINGLE : SESSION_ALGO_MERGE; + pWindow->node.groupAction = GROUP_ACTION_KEEP; + pWindow->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP; + pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP; pWindow->pTspk = nodesCloneNode((SNode*)pSession->pCol); if (NULL == pWindow->pTspk) { @@ -689,6 +711,9 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva pWindow->slidingUnit = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit); pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? INTERVAL_ALGO_STREAM_SINGLE : INTERVAL_ALGO_HASH; + pWindow->node.groupAction = GROUP_ACTION_KEEP; + pWindow->node.requireDataOrder = DATA_ORDER_LEVEL_IN_BLOCK; + pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_BLOCK; pWindow->pTspk = nodesCloneNode(pInterval->pCol); if (NULL == pWindow->pTspk) { @@ -734,6 +759,10 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect return TSDB_CODE_OUT_OF_MEMORY; } + pFill->node.groupAction = GROUP_ACTION_KEEP; + pFill->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP; + pFill->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP; + int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_WINDOW, NULL, COLLECT_COL_TYPE_ALL, &pFill->node.pTargets); if (TSDB_CODE_SUCCESS == code && NULL == pFill->node.pTargets) { code = nodesListMakeStrictAppend(&pFill->node.pTargets, @@ -768,6 +797,9 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect } pSort->groupSort = pSelect->groupSort; + pSort->node.groupAction = pSort->groupSort ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR; + pSort->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; + pSort->node.resultDataOrder = pSort->groupSort ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_GLOBAL; int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_ORDER_BY, NULL, COLLECT_COL_TYPE_ALL, &pSort->node.pTargets); if (TSDB_CODE_SUCCESS == code && NULL == pSort->node.pTargets) { @@ -818,6 +850,7 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel TSWAP(pProject->node.pLimit, pSelect->pLimit); TSWAP(pProject->node.pSlimit, pSelect->pSlimit); + pProject->node.groupAction = GROUP_ACTION_CLEAR; pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; pProject->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; @@ -852,6 +885,10 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS return TSDB_CODE_OUT_OF_MEMORY; } + pPartition->node.groupAction = GROUP_ACTION_SET; + pPartition->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; + pPartition->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; + int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_PARTITION_BY, NULL, COLLECT_COL_TYPE_ALL, &pPartition->node.pTargets); if (TSDB_CODE_SUCCESS == code && NULL == pPartition->node.pTargets) { @@ -884,6 +921,10 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe return TSDB_CODE_OUT_OF_MEMORY; } + pAgg->node.groupAction = GROUP_ACTION_SET; + pAgg->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; + pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; + int32_t code = TSDB_CODE_SUCCESS; // set grouyp keys, agg funcs and having conditions pAgg->pGroupKeys = nodesCloneList(pSelect->pProjectionList); @@ -1348,25 +1389,163 @@ static void setLogicSubplanType(bool hasScan, SLogicSubplan* pSubplan) { } } +static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel requirement) { + if (requirement <= DATA_ORDER_LEVEL_IN_BLOCK) { + return TSDB_CODE_SUCCESS; + } + pScan->scanType = SCAN_TYPE_TABLE_MERGE; + pScan->node.resultDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustJoinDataRequirement(SJoinLogicNode* pJoin, EDataOrderLevel requirement) { + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustAggDataRequirement(SAggLogicNode* pAgg, EDataOrderLevel requirement) { + if (requirement > DATA_ORDER_LEVEL_NONE) { + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustProjectDataRequirement(SProjectLogicNode* pProject, EDataOrderLevel requirement) { + pProject->node.resultDataOrder = requirement; + pProject->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustIntervalDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { + if (requirement <= DATA_ORDER_LEVEL_IN_BLOCK) { + return TSDB_CODE_SUCCESS; + } + pWindow->node.resultDataOrder = requirement; + pWindow->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustSessionDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { + if (requirement <= DATA_ORDER_LEVEL_IN_GROUP) { + return TSDB_CODE_SUCCESS; + } + pWindow->node.resultDataOrder = requirement; + pWindow->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustStateDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { + if (requirement <= DATA_ORDER_LEVEL_IN_GROUP) { + return TSDB_CODE_SUCCESS; + } + pWindow->node.resultDataOrder = requirement; + pWindow->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { + switch (pWindow->winType) { + case WINDOW_TYPE_INTERVAL: + return adjustIntervalDataRequirement(pWindow, requirement); + case WINDOW_TYPE_SESSION: + return adjustSessionDataRequirement(pWindow, requirement); + case WINDOW_TYPE_STATE: + return adjustStateDataRequirement(pWindow, requirement); + default: + break; + } + return TSDB_CODE_PLAN_INTERNAL_ERROR; +} + +static int32_t adjustFillDataRequirement(SFillLogicNode* pFill, EDataOrderLevel requirement) { + if (requirement <= DATA_ORDER_LEVEL_IN_GROUP) { + return TSDB_CODE_SUCCESS; + } + pFill->node.resultDataOrder = requirement; + pFill->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustSortDataRequirement(SSortLogicNode* pSort, EDataOrderLevel requirement) { + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustPartitionDataRequirement(SPartitionLogicNode* pPart, EDataOrderLevel requirement) { + if (DATA_ORDER_LEVEL_GLOBAL == requirement) { + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + pPart->node.resultDataOrder = requirement; + pPart->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustIndefRowsDataRequirement(SIndefRowsFuncLogicNode* pIndef, EDataOrderLevel requirement) { + if (requirement <= pIndef->node.resultDataOrder) { + return TSDB_CODE_SUCCESS; + } + pIndef->node.resultDataOrder = requirement; + pIndef->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustInterpDataRequirement(SInterpFuncLogicNode* pInterp, EDataOrderLevel requirement) { + if (requirement <= DATA_ORDER_LEVEL_IN_GROUP) { + return TSDB_CODE_SUCCESS; + } + pInterp->node.resultDataOrder = requirement; + pInterp->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + static int32_t adjustLogicNodeDataRequirementImpl(SLogicNode* pNode, EDataOrderLevel requirement) { + int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(pNode)) { case QUERY_NODE_LOGIC_PLAN_SCAN: + code = adjustScanDataRequirement((SScanLogicNode*)pNode, requirement); + break; case QUERY_NODE_LOGIC_PLAN_JOIN: + code = adjustJoinDataRequirement((SJoinLogicNode*)pNode, requirement); + break; case QUERY_NODE_LOGIC_PLAN_AGG: + code = adjustAggDataRequirement((SAggLogicNode*)pNode, requirement); + break; case QUERY_NODE_LOGIC_PLAN_PROJECT: + code = adjustProjectDataRequirement((SProjectLogicNode*)pNode, requirement); + break; case QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY: case QUERY_NODE_LOGIC_PLAN_EXCHANGE: case QUERY_NODE_LOGIC_PLAN_MERGE: + break; case QUERY_NODE_LOGIC_PLAN_WINDOW: + code = adjustWindowDataRequirement((SWindowLogicNode*)pNode, requirement); + break; case QUERY_NODE_LOGIC_PLAN_FILL: + code = adjustFillDataRequirement((SFillLogicNode*)pNode, requirement); + break; case QUERY_NODE_LOGIC_PLAN_SORT: + code = adjustSortDataRequirement((SSortLogicNode*)pNode, requirement); + break; case QUERY_NODE_LOGIC_PLAN_PARTITION: + code = adjustPartitionDataRequirement((SPartitionLogicNode*)pNode, requirement); + break; case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC: + code = adjustIndefRowsDataRequirement((SIndefRowsFuncLogicNode*)pNode, requirement); + break; case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: + code = adjustInterpDataRequirement((SInterpFuncLogicNode*)pNode, requirement); + break; default: break; } - return TSDB_CODE_SUCCESS; + if (TSDB_CODE_SUCCESS == code) { + SNode* pChild = NULL; + FOREACH(pChild, pNode->pChildren) { + code = adjustLogicNodeDataRequirementImpl((SLogicNode*)pChild, pNode->requireDataOrder); + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + } + return code; } static int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode) { diff --git a/source/libs/planner/test/planSubqueryTest.cpp b/source/libs/planner/test/planSubqueryTest.cpp index 9cf99d019f0f11d4dab145c553a81eca42578fdc..fde373c66dc8d62646ce7b1ca66388784a66f3ce 100644 --- a/source/libs/planner/test/planSubqueryTest.cpp +++ b/source/libs/planner/test/planSubqueryTest.cpp @@ -48,7 +48,7 @@ TEST_F(PlanSubqeuryTest, doubleGroupBy) { "WHERE a > 100 GROUP BY b"); } -TEST_F(PlanSubqeuryTest, withSetOperator) { +TEST_F(PlanSubqeuryTest, innerSetOperator) { useDb("root", "test"); run("SELECT c1 FROM (SELECT c1 FROM t1 UNION ALL SELECT c1 FROM t1)"); @@ -56,10 +56,18 @@ TEST_F(PlanSubqeuryTest, withSetOperator) { run("SELECT c1 FROM (SELECT c1 FROM t1 UNION SELECT c1 FROM t1)"); } -TEST_F(PlanSubqeuryTest, withFill) { +TEST_F(PlanSubqeuryTest, innerFill) { useDb("root", "test"); run("SELECT cnt FROM (SELECT _WSTART ts, COUNT(*) cnt FROM t1 " "WHERE ts > '2022-04-01 00:00:00' and ts < '2022-04-30 23:59:59' INTERVAL(10s) FILL(LINEAR)) " "WHERE ts > '2022-04-06 00:00:00'"); } + +TEST_F(PlanSubqeuryTest, outerInterval) { + useDb("root", "test"); + + run("SELECT COUNT(*) FROM (SELECT * FROM st1) INTERVAL(5s)"); + + run("SELECT COUNT(*) + SUM(c1) FROM (SELECT * FROM st1) INTERVAL(5s)"); +}