diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 2f6bb603c1d6421fb246157f2d48d75ee7b42bd6..6f25cc45fb2df8b675f1faeb83330a8fabf12bfe 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -377,6 +377,7 @@ typedef struct SAggPhysiNode { SNodeList* pExprs; // these are expression list of group_by_clause and parameter expression of aggregate function SNodeList* pGroupKeys; SNodeList* pAggFuncs; + bool mergeDataBlock; } SAggPhysiNode; typedef struct SDownstreamSourceNode { @@ -415,6 +416,7 @@ typedef struct SWinodwPhysiNode { int8_t igExpired; EOrder inputTsOrder; EOrder outputTsOrder; + bool mergeDataBlock; } SWinodwPhysiNode; typedef struct SIntervalPhysiNode { diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 9d15b01acf0d1301fafc44ea58acb74c36892bd7..74659179a9a6fee4bebea48cae02441cdde69c6e 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1773,6 +1773,7 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { static const char* jkAggPhysiPlanExprs = "Exprs"; static const char* jkAggPhysiPlanGroupKeys = "GroupKeys"; static const char* jkAggPhysiPlanAggFuncs = "AggFuncs"; +static const char* jkAggPhysiPlanMergeDataBlock = "MergeDataBlock"; static int32_t physiAggNodeToJson(const void* pObj, SJson* pJson) { const SAggPhysiNode* pNode = (const SAggPhysiNode*)pObj; @@ -1787,6 +1788,9 @@ static int32_t physiAggNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkAggPhysiPlanAggFuncs, pNode->pAggFuncs); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkAggPhysiPlanMergeDataBlock, pNode->mergeDataBlock); + } return code; } @@ -1804,6 +1808,9 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkAggPhysiPlanAggFuncs, &pNode->pAggFuncs); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkAggPhysiPlanMergeDataBlock, &pNode->mergeDataBlock); + } return code; } @@ -1936,8 +1943,9 @@ static const char* jkWindowPhysiPlanTsEnd = "TsEnd"; static const char* jkWindowPhysiPlanTriggerType = "TriggerType"; static const char* jkWindowPhysiPlanWatermark = "Watermark"; static const char* jkWindowPhysiPlanIgnoreExpired = "IgnoreExpired"; -static const char* jkWindowPhysiPlanInputTsOrder = "inputTsOrder"; +static const char* jkWindowPhysiPlanInputTsOrder = "InputTsOrder"; static const char* jkWindowPhysiPlanOutputTsOrder = "outputTsOrder"; +static const char* jkWindowPhysiPlanMergeDataBlock = "MergeDataBlock"; static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) { const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj; @@ -1970,6 +1978,9 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanOutputTsOrder, pNode->outputTsOrder); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkWindowPhysiPlanMergeDataBlock, pNode->mergeDataBlock); + } return code; } @@ -2005,6 +2016,9 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkWindowPhysiPlanOutputTsOrder, pNode->outputTsOrder, code); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkWindowPhysiPlanMergeDataBlock, &pNode->mergeDataBlock); + } return code; } @@ -2147,7 +2161,6 @@ static int32_t jsonToPhysiSessionWindowNode(const SJson* pJson, void* pObj) { int32_t code = jsonToPhysiWindowNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkSessionWindowPhysiPlanGap, pNode->gap, code); - ; } return code; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index b51624336b34a699abeaf980f2cfcf82596a0c2d..a799b2feb6cbfb528837ee03927065c443f2208a 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -468,6 +468,10 @@ static int32_t createGroupKeysFromPartKeys(SNodeList* pPartKeys, SNodeList** pOu return TSDB_CODE_SUCCESS; } +static EGroupAction getGroupAction(SLogicPlanContext* pCxt, SSelectStmt* pSelect) { + return (pCxt->pPlanCxt->streamQuery || NULL != pSelect->pLimit) ? GROUP_ACTION_KEEP : GROUP_ACTION_NONE; +} + static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { if (!pSelect->hasAggFuncs && NULL == pSelect->pGroupByList) { return TSDB_CODE_SUCCESS; @@ -480,7 +484,7 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, pAgg->hasLastRow = pSelect->hasLastRowFunc; pAgg->hasTimeLineFunc = pSelect->hasTimeLineFunc; - pAgg->node.groupAction = GROUP_ACTION_SET; + pAgg->node.groupAction = getGroupAction(pCxt, pSelect); pAgg->node.requireDataOrder = pAgg->hasTimeLineFunc ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_NONE; pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; @@ -551,7 +555,7 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt pIdfRowsFunc->isTailFunc = pSelect->hasTailFunc; pIdfRowsFunc->isUniqueFunc = pSelect->hasUniqueFunc; pIdfRowsFunc->isTimeLineFunc = pSelect->hasTimeLineFunc; - pIdfRowsFunc->node.groupAction = GROUP_ACTION_KEEP; + pIdfRowsFunc->node.groupAction = getGroupAction(pCxt, pSelect); pIdfRowsFunc->node.requireDataOrder = pIdfRowsFunc->isTimeLineFunc ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_NONE; pIdfRowsFunc->node.resultDataOrder = pIdfRowsFunc->node.requireDataOrder; @@ -586,7 +590,7 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p return TSDB_CODE_OUT_OF_MEMORY; } - pInterpFunc->node.groupAction = GROUP_ACTION_KEEP; + pInterpFunc->node.groupAction = getGroupAction(pCxt, pSelect); pInterpFunc->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP; pInterpFunc->node.resultDataOrder = pInterpFunc->node.requireDataOrder; @@ -662,7 +666,7 @@ static int32_t createWindowLogicNodeByState(SLogicPlanContext* pCxt, SStateWindo } pWindow->winType = WINDOW_TYPE_STATE; - pWindow->node.groupAction = GROUP_ACTION_KEEP; + pWindow->node.groupAction = getGroupAction(pCxt, pSelect); pWindow->node.requireDataOrder = pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_IN_BLOCK : DATA_ORDER_LEVEL_IN_GROUP; pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP; pWindow->pStateExpr = nodesCloneNode(pState->pExpr); @@ -685,7 +689,7 @@ static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionW pWindow->winType = WINDOW_TYPE_SESSION; pWindow->sessionGap = ((SValueNode*)pSession->pGap)->datum.i; pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? SESSION_ALGO_STREAM_SINGLE : SESSION_ALGO_MERGE; - pWindow->node.groupAction = GROUP_ACTION_KEEP; + pWindow->node.groupAction = getGroupAction(pCxt, pSelect); pWindow->node.requireDataOrder = pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_IN_BLOCK : DATA_ORDER_LEVEL_IN_GROUP; pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP; @@ -714,7 +718,7 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva pWindow->slidingUnit = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit); pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? INTERVAL_ALGO_STREAM_SINGLE : INTERVAL_ALGO_HASH; - pWindow->node.groupAction = GROUP_ACTION_KEEP; + pWindow->node.groupAction = getGroupAction(pCxt, pSelect); pWindow->node.requireDataOrder = pSelect->hasTimeLineFunc ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_IN_BLOCK; pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP; @@ -762,7 +766,7 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect return TSDB_CODE_OUT_OF_MEMORY; } - pFill->node.groupAction = GROUP_ACTION_KEEP; + pFill->node.groupAction = getGroupAction(pCxt, pSelect); pFill->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP; pFill->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP; pFill->inputTsOrder = ORDER_ASC; @@ -936,7 +940,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe return TSDB_CODE_OUT_OF_MEMORY; } - pAgg->node.groupAction = GROUP_ACTION_SET; + pAgg->node.groupAction = GROUP_ACTION_CLEAR; pAgg->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 3771586b34d6bc8db093fd38b1692997069ce39b..af3d4a9577d85c6e41bab85029f3ff2307426e39 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -825,6 +825,8 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, return TSDB_CODE_OUT_OF_MEMORY; } + pAgg->mergeDataBlock = (GROUP_ACTION_KEEP == pAggLogicNode->node.groupAction ? false : true); + SNodeList* pPrecalcExprs = NULL; SNodeList* pGroupKeys = NULL; SNodeList* pAggFuncs = NULL; @@ -1091,6 +1093,7 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pWindow->igExpired = pWindowLogicNode->igExpired; pWindow->inputTsOrder = pWindowLogicNode->inputTsOrder; pWindow->outputTsOrder = pWindowLogicNode->outputTsOrder; + pWindow->mergeDataBlock = (GROUP_ACTION_KEEP == pWindowLogicNode->node.groupAction ? false : true); SNodeList* pPrecalcExprs = NULL; SNodeList* pFuncs = NULL; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 74c78b970d6edf973d5ca5370f936c96a0466406..6ddb6d90c0509b78e442e24f46af5cf41217495c 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -393,6 +393,8 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic code = TSDB_CODE_OUT_OF_MEMORY; } + pPartWin->node.groupAction = GROUP_ACTION_KEEP; + if (TSDB_CODE_SUCCESS == code) { pMergeWindow->node.pTargets = pTargets; pMergeWindow->node.pConditions = pConditions; @@ -723,6 +725,8 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO code = TSDB_CODE_OUT_OF_MEMORY; } + pPartAgg->node.groupAction = GROUP_ACTION_KEEP; + if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) { pPartAgg->pGroupKeys = pGroupKeys; code = createColumnByRewriteExprs(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets); diff --git a/source/libs/planner/test/planGroupByTest.cpp b/source/libs/planner/test/planGroupByTest.cpp index 657d8a62447f8f2c2f52caa0f9de6a6710d61bbb..a553d3addc7e445412a56a579e2fda7e02f742ea 100644 --- a/source/libs/planner/test/planGroupByTest.cpp +++ b/source/libs/planner/test/planGroupByTest.cpp @@ -32,6 +32,22 @@ TEST_F(PlanGroupByTest, basic) { run("SELECT c1 + c3, SUM(c4 * c5) FROM t1 WHERE CONCAT(c2, 'wwww') = 'abcwww' GROUP BY c1 + c3"); run("SELECT SUM(CEIL(c1)) FROM t1 GROUP BY CEIL(c1)"); + + run("SELECT COUNT(*) FROM st1"); + + run("SELECT c1 FROM st1 GROUP BY c1"); + + run("SELECT COUNT(*) FROM st1 GROUP BY c1"); + + run("SELECT SUM(c1) FROM st1 GROUP BY c2 HAVING SUM(c1) IS NOT NULL"); +} + +TEST_F(PlanGroupByTest, withPartitionBy) { + useDb("root", "test"); + + run("SELECT LAST(ts), TBNAME FROM st1 PARTITION BY TBNAME"); + + run("SELECT COUNT(*) FROM st1 PARTITION BY c2 GROUP BY c1"); } TEST_F(PlanGroupByTest, withOrderBy) { @@ -43,14 +59,12 @@ TEST_F(PlanGroupByTest, withOrderBy) { // run("SELECT COUNT(*), SUM(c1) a FROM t1 ORDER BY a"); } -TEST_F(PlanGroupByTest, aggFunc) { +TEST_F(PlanGroupByTest, multiResFunc) { useDb("root", "test"); run("SELECT LAST(*), FIRST(*) FROM t1"); run("SELECT LAST(*), FIRST(*) FROM t1 GROUP BY c1"); - - run("SELECT SUM(10), COUNT(c1) FROM t1 GROUP BY c2"); } TEST_F(PlanGroupByTest, selectFunc) { @@ -67,17 +81,3 @@ TEST_F(PlanGroupByTest, selectFunc) { run("SELECT MAX(c1), c2 FROM t1 GROUP BY c3"); run("SELECT MAX(c1), t1.* FROM t1 GROUP BY c3"); } - -TEST_F(PlanGroupByTest, stable) { - useDb("root", "test"); - - run("SELECT COUNT(*) FROM st1"); - - run("SELECT c1 FROM st1 GROUP BY c1"); - - run("SELECT COUNT(*) FROM st1 GROUP BY c1"); - - run("SELECT COUNT(*) FROM st1 PARTITION BY c2 GROUP BY c1"); - - run("SELECT SUM(c1) FROM st1 GROUP BY c2 HAVING SUM(c1) IS NOT NULL"); -}