diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 611ae8d81fdc681c28936456b5b46c0a7e09d4c0..67efdc5c0b91266500e3146b3c2d76350f28087d 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -226,6 +226,9 @@ static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) { } bool fmIsDistExecFunc(int32_t funcId) { + if (fmIsUserDefinedFunc(funcId)) { + return false; + } if (!fmIsVectorFunc(funcId)) { return true; } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index e3c8b82e3988bd7943815a645332920f9d31d08b..3aad68ae7d5f3f980ec210d5a8a5a6348d6742a2 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -151,8 +151,8 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) { static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { switch (nodeType(pNode)) { - // case QUERY_NODE_LOGIC_PLAN_AGG: - // return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(pNode); + case QUERY_NODE_LOGIC_PLAN_AGG: + return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: { SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode; if (WINDOW_TYPE_INTERVAL != pWindow->winType) { @@ -365,6 +365,66 @@ static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInf } } +static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) { + SNodeList* pFunc = pMergeAgg->pAggFuncs; + pMergeAgg->pAggFuncs = NULL; + SNodeList* pGroupKeys = pMergeAgg->pGroupKeys; + pMergeAgg->pGroupKeys = NULL; + SNodeList* pTargets = pMergeAgg->node.pTargets; + pMergeAgg->node.pTargets = NULL; + SNodeList* pChildren = pMergeAgg->node.pChildren; + pMergeAgg->node.pChildren = NULL; + + int32_t code = TSDB_CODE_SUCCESS; + SAggLogicNode* pPartAgg = nodesCloneNode(pMergeAgg); + if (NULL == pPartAgg) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + + if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) { + pPartAgg->pGroupKeys = pGroupKeys; + code = createColumnByRewriteExps(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets); + } + if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) { + pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets); + if (NULL == pMergeAgg->pGroupKeys) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + pMergeAgg->node.pTargets = pTargets; + pPartAgg->node.pChildren = pChildren; + + code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs); + } + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByRewriteExps(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets); + } + + nodesDestroyList(pFunc); + if (TSDB_CODE_SUCCESS == code) { + *pOutput = (SLogicNode*)pPartAgg; + } else { + nodesDestroyNode(pPartAgg); + } + + return code; +} + +static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SLogicNode* pPartAgg = NULL; + int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg); + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT)); + } + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + return code; +} + static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE); if (TSDB_CODE_SUCCESS == code) { @@ -386,6 +446,9 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(info.pSplitNode)) { + case QUERY_NODE_LOGIC_PLAN_AGG: + code = stbSplSplitAggNode(pCxt, &info); + break; case QUERY_NODE_LOGIC_PLAN_WINDOW: code = stbSplSplitWindowNode(pCxt, &info); break; diff --git a/source/libs/planner/test/planGroupByTest.cpp b/source/libs/planner/test/planGroupByTest.cpp index cf516034707e53a230d4e2e7af6fb81c3b8aaecf..201df2efde6236c5ae68aaedf04625a2c4acda19 100644 --- a/source/libs/planner/test/planGroupByTest.cpp +++ b/source/libs/planner/test/planGroupByTest.cpp @@ -67,3 +67,11 @@ TEST_F(PlanGroupByTest, selectFunc) { run("SELECT MAX(c1), c2 FROM t1 GROUP BY c3"); run("SELECT MAX(c1), t1.* FROM t1 GROUP BY c3"); } + +TEST_F(PlanGroupByTest, stable) { + useDb("root", "test"); + + run("SELECT COUNT(*) FROM st1"); + + run("SELECT COUNT(*) FROM st1 GROUP BY c1"); +}