提交 6e6c9bda 编写于 作者: X Xiaoyu Wang

feat: order by distributed split

上级 7ede8db3
...@@ -161,7 +161,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { ...@@ -161,7 +161,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
} }
// case QUERY_NODE_LOGIC_PLAN_SORT: // case QUERY_NODE_LOGIC_PLAN_SORT:
// return stbSplHasMultiTbScan(pNode); // return stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_SCAN: case QUERY_NODE_LOGIC_PLAN_SCAN:
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode); return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
default: default:
...@@ -295,7 +295,8 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic ...@@ -295,7 +295,8 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
return code; return code;
} }
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) { static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SNodeList* pMergeKeys,
SLogicNode* pPartChild) {
SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
if (NULL == pMerge) { if (NULL == pMerge) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -304,25 +305,28 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, S ...@@ -304,25 +305,28 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, S
pMerge->srcGroupId = pCxt->groupId; pMerge->srcGroupId = pCxt->groupId;
pMerge->node.pParent = pParent; pMerge->node.pParent = pParent;
pMerge->node.precision = pPartChild->precision; pMerge->node.precision = pPartChild->precision;
int32_t code = nodesListMakeStrictAppend(&pMerge->pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pParent)->pTspk)); pMerge->pMergeKeys = pMergeKeys;
if (TSDB_CODE_SUCCESS == code) {
pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets); pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets);
if (NULL == pMerge->node.pTargets) { if (NULL == pMerge->node.pTargets) {
code = TSDB_CODE_OUT_OF_MEMORY; nodesDestroyNode(pMerge);
} return TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeAppend(&pParent->pChildren, pMerge);
} }
return code; return nodesListMakeAppend(&pParent->pChildren, pMerge);
} }
static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartWindow = NULL; SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pPartWindow); SNodeList* pMergeKeys = NULL;
code = nodesListMakeStrictAppend(&pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk));
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartWindow);
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pMergeKeys);
}
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
...@@ -425,6 +429,64 @@ static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) ...@@ -425,6 +429,64 @@ static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
return code; return code;
} }
static int32_t stbSplCreatePartSortNode(SSortLogicNode* pMergeSort, SLogicNode** pOutput) {
SNodeList* pSortKeys = pMergeSort->pSortKeys;
pMergeSort->pSortKeys = NULL;
SNodeList* pTargets = pMergeSort->node.pTargets;
pMergeSort->node.pTargets = NULL;
SNodeList* pChildren = pMergeSort->node.pChildren;
pMergeSort->node.pChildren = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SSortLogicNode* pPartSort = nodesCloneNode(pMergeSort);
if (NULL == pPartSort) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
pMergeSort->node.pTargets = pTargets;
pPartSort->node.pChildren = pChildren;
if (TSDB_CODE_SUCCESS == code) {
pPartSort->pSortKeys = pSortKeys;
code = createColumnByRewriteExps(pPartSort->pSortKeys, &pPartSort->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
pMergeSort->pSortKeys = nodesCloneList(pPartSort->node.pTargets);
if (NULL == pMergeSort->pSortKeys) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
*pOutput = (SLogicNode*)pPartSort;
} else {
nodesDestroyNode(pPartSort);
}
return code;
}
static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartSort = NULL;
int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort);
if (TSDB_CODE_SUCCESS == code) {
SNodeList* pMergeKeys = nodesCloneList(((SSortLogicNode*)pInfo->pSplitNode)->pSortKeys);
if (NULL != pMergeKeys) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartSort);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pMergeKeys);
}
} else {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT));
}
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
return code;
}
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE); int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -452,6 +514,9 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { ...@@ -452,6 +514,9 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
case QUERY_NODE_LOGIC_PLAN_WINDOW: case QUERY_NODE_LOGIC_PLAN_WINDOW:
code = stbSplSplitWindowNode(pCxt, &info); code = stbSplSplitWindowNode(pCxt, &info);
break; break;
case QUERY_NODE_LOGIC_PLAN_SORT:
code = stbSplSplitSortNode(pCxt, &info);
break;
case QUERY_NODE_LOGIC_PLAN_SCAN: case QUERY_NODE_LOGIC_PLAN_SCAN:
code = stbSplSplitScanNode(pCxt, &info); code = stbSplSplitScanNode(pCxt, &info);
break; break;
......
...@@ -23,20 +23,27 @@ class PlanOrderByTest : public PlannerTestBase {}; ...@@ -23,20 +23,27 @@ class PlanOrderByTest : public PlannerTestBase {};
TEST_F(PlanOrderByTest, basic) { TEST_F(PlanOrderByTest, basic) {
useDb("root", "test"); useDb("root", "test");
// order by key is in the projection list // ORDER BY key is in the projection list
run("select c1 from t1 order by c1"); run("SELECT c1 FROM t1 ORDER BY c1");
// order by key is not in the projection list // ORDER BY key is not in the projection list
run("select c1 from t1 order by c2"); run("SELECT c1 FROM t1 ORDER BY c2");
} }
TEST_F(PlanOrderByTest, expr) { TEST_F(PlanOrderByTest, expr) {
useDb("root", "test"); useDb("root", "test");
run("select * from t1 order by c1 + 10, c2"); run("SELECT * FROM t1 ORDER BY c1 + 10, c2");
} }
TEST_F(PlanOrderByTest, nullsOrder) { TEST_F(PlanOrderByTest, nullsOrder) {
useDb("root", "test"); useDb("root", "test");
run("select * from t1 order by c1 desc nulls first"); run("SELECT * FROM t1 ORDER BY c1 DESC NULLS FIRST");
}
TEST_F(PlanOrderByTest, stable) {
useDb("root", "test");
// ORDER BY key is in the projection list
run("SELECT c1 FROM st1 ORDER BY c1");
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册