diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 3aad68ae7d5f3f980ec210d5a8a5a6348d6742a2..a3d508690880c1d229c65137014f87d8efa733a9 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -161,7 +161,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); } // case QUERY_NODE_LOGIC_PLAN_SORT: - // return stbSplHasMultiTbScan(pNode); + // return stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_SCAN: return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode); default: @@ -295,7 +295,8 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic return code; } -static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) { +static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SNodeList* pMergeKeys, + SLogicNode* pPartChild) { SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); if (NULL == pMerge) { return TSDB_CODE_OUT_OF_MEMORY; @@ -304,25 +305,28 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, S pMerge->srcGroupId = pCxt->groupId; pMerge->node.pParent = pParent; pMerge->node.precision = pPartChild->precision; - int32_t code = nodesListMakeStrictAppend(&pMerge->pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pParent)->pTspk)); - if (TSDB_CODE_SUCCESS == code) { - pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets); - if (NULL == pMerge->node.pTargets) { - code = TSDB_CODE_OUT_OF_MEMORY; - } - } - if (TSDB_CODE_SUCCESS == code) { - code = nodesListMakeAppend(&pParent->pChildren, pMerge); + pMerge->pMergeKeys = pMergeKeys; + pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets); + if (NULL == pMerge->node.pTargets) { + nodesDestroyNode(pMerge); + return TSDB_CODE_OUT_OF_MEMORY; } - return code; + return nodesListMakeAppend(&pParent->pChildren, pMerge); } static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) { SLogicNode* pPartWindow = NULL; int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pPartWindow); + SNodeList* pMergeKeys = NULL; + code = nodesListMakeStrictAppend(&pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk)); + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartWindow); + } + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyList(pMergeKeys); + } } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, @@ -425,6 +429,64 @@ static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) return code; } +static int32_t stbSplCreatePartSortNode(SSortLogicNode* pMergeSort, SLogicNode** pOutput) { + SNodeList* pSortKeys = pMergeSort->pSortKeys; + pMergeSort->pSortKeys = NULL; + SNodeList* pTargets = pMergeSort->node.pTargets; + pMergeSort->node.pTargets = NULL; + SNodeList* pChildren = pMergeSort->node.pChildren; + pMergeSort->node.pChildren = NULL; + + int32_t code = TSDB_CODE_SUCCESS; + SSortLogicNode* pPartSort = nodesCloneNode(pMergeSort); + if (NULL == pPartSort) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + + pMergeSort->node.pTargets = pTargets; + pPartSort->node.pChildren = pChildren; + if (TSDB_CODE_SUCCESS == code) { + pPartSort->pSortKeys = pSortKeys; + code = createColumnByRewriteExps(pPartSort->pSortKeys, &pPartSort->node.pTargets); + } + if (TSDB_CODE_SUCCESS == code) { + pMergeSort->pSortKeys = nodesCloneList(pPartSort->node.pTargets); + if (NULL == pMergeSort->pSortKeys) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + + if (TSDB_CODE_SUCCESS == code) { + *pOutput = (SLogicNode*)pPartSort; + } else { + nodesDestroyNode(pPartSort); + } + + return code; +} + +static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SLogicNode* pPartSort = NULL; + int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort); + if (TSDB_CODE_SUCCESS == code) { + SNodeList* pMergeKeys = nodesCloneList(((SSortLogicNode*)pInfo->pSplitNode)->pSortKeys); + if (NULL != pMergeKeys) { + code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartSort); + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyList(pMergeKeys); + } + } else { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT)); + } + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + return code; +} + static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE); if (TSDB_CODE_SUCCESS == code) { @@ -452,6 +514,9 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { case QUERY_NODE_LOGIC_PLAN_WINDOW: code = stbSplSplitWindowNode(pCxt, &info); break; + case QUERY_NODE_LOGIC_PLAN_SORT: + code = stbSplSplitSortNode(pCxt, &info); + break; case QUERY_NODE_LOGIC_PLAN_SCAN: code = stbSplSplitScanNode(pCxt, &info); break; diff --git a/source/libs/planner/test/planOrderByTest.cpp b/source/libs/planner/test/planOrderByTest.cpp index d95d1bdf1d429d9ea6c5e62c5a280dbb5c6de477..ef6f438b55c60f89e96de40c9eeeff71f3f2fcdf 100644 --- a/source/libs/planner/test/planOrderByTest.cpp +++ b/source/libs/planner/test/planOrderByTest.cpp @@ -23,20 +23,27 @@ class PlanOrderByTest : public PlannerTestBase {}; TEST_F(PlanOrderByTest, basic) { useDb("root", "test"); - // order by key is in the projection list - run("select c1 from t1 order by c1"); - // order by key is not in the projection list - run("select c1 from t1 order by c2"); + // ORDER BY key is in the projection list + run("SELECT c1 FROM t1 ORDER BY c1"); + // ORDER BY key is not in the projection list + run("SELECT c1 FROM t1 ORDER BY c2"); } TEST_F(PlanOrderByTest, expr) { useDb("root", "test"); - run("select * from t1 order by c1 + 10, c2"); + run("SELECT * FROM t1 ORDER BY c1 + 10, c2"); } TEST_F(PlanOrderByTest, nullsOrder) { useDb("root", "test"); - run("select * from t1 order by c1 desc nulls first"); + run("SELECT * FROM t1 ORDER BY c1 DESC NULLS FIRST"); +} + +TEST_F(PlanOrderByTest, stable) { + useDb("root", "test"); + + // ORDER BY key is in the projection list + run("SELECT c1 FROM st1 ORDER BY c1"); }