未验证 提交 103af37e 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #11719 from taosdata/feature/3.0_wxy

feat: sql command 'union'
...@@ -85,13 +85,19 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) { ...@@ -85,13 +85,19 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
static int32_t rewriteExpr(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) { static int32_t rewriteExprForSelect(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) {
nodesWalkExprs(pExprs, doNameExpr, NULL); nodesWalkExprs(pExprs, doNameExpr, NULL);
SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs }; SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs };
nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt); nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt);
return cxt.errCode; return cxt.errCode;
} }
static int32_t rewriteExprs(SNodeList* pExprs, SNodeList* pTarget) {
SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs };
nodesRewriteExprs(pTarget, doRewriteExpr, &cxt);
return cxt.errCode;
}
static int32_t pushLogicNode(SLogicPlanContext* pCxt, SLogicNode** pOldRoot, SLogicNode* pNewRoot) { static int32_t pushLogicNode(SLogicPlanContext* pCxt, SLogicNode** pOldRoot, SLogicNode* pNewRoot) {
if (NULL == pNewRoot->pChildren) { if (NULL == pNewRoot->pChildren) {
pNewRoot->pChildren = nodesMakeList(); pNewRoot->pChildren = nodesMakeList();
...@@ -433,10 +439,10 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, ...@@ -433,10 +439,10 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
// rewrite the expression in subsequent clauses // rewrite the expression in subsequent clauses
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY); code = rewriteExprForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY); code = rewriteExprForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY);
} }
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) { if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) {
...@@ -472,7 +478,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm ...@@ -472,7 +478,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW); code = rewriteExprForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -723,7 +729,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe ...@@ -723,7 +729,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
// rewrite the expression in subsequent clauses // rewrite the expression in subsequent clauses
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_DISTINCT); code = rewriteExprForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_DISTINCT);
} }
// set the output // set the output
...@@ -850,6 +856,37 @@ static int32_t createSetOpProjectLogicNode(SLogicPlanContext* pCxt, SSetOperator ...@@ -850,6 +856,37 @@ static int32_t createSetOpProjectLogicNode(SLogicPlanContext* pCxt, SSetOperator
return code; return code;
} }
static int32_t createSetOpAggLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) {
SAggLogicNode* pAgg = (SAggLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_AGG);
if (NULL == pAgg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
pAgg->pGroupKeys = nodesCloneList(pSetOperator->pProjectionList);
if (NULL == pAgg->pGroupKeys) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
// rewrite the expression in subsequent clauses
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprs(pAgg->pGroupKeys, pSetOperator->pOrderByList);
}
// set the output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pAgg;
} else {
nodesDestroyNode(pAgg);
}
return code;
}
static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) { static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) {
SLogicNode* pSetOp = NULL; SLogicNode* pSetOp = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -857,6 +894,9 @@ static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetO ...@@ -857,6 +894,9 @@ static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetO
case SET_OP_TYPE_UNION_ALL: case SET_OP_TYPE_UNION_ALL:
code = createSetOpProjectLogicNode(pCxt, pSetOperator, &pSetOp); code = createSetOpProjectLogicNode(pCxt, pSetOperator, &pSetOp);
break; break;
case SET_OP_TYPE_UNION:
code = createSetOpAggLogicNode(pCxt, pSetOperator, &pSetOp);
break;
default: default:
code = -1; code = -1;
break; break;
......
...@@ -50,6 +50,11 @@ typedef struct SUaInfo { ...@@ -50,6 +50,11 @@ typedef struct SUaInfo {
SLogicSubplan* pSubplan; SLogicSubplan* pSubplan;
} SUaInfo; } SUaInfo;
typedef struct SUnInfo {
SAggLogicNode* pAgg;
SLogicSubplan* pSubplan;
} SUnInfo;
typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, void* pInfo); typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, void* pInfo);
static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan, int32_t flag) { static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan, int32_t flag) {
...@@ -226,7 +231,6 @@ static SLogicSubplan* uaCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) { ...@@ -226,7 +231,6 @@ static SLogicSubplan* uaCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) {
pSubplan->id.groupId = pCxt->groupId; pSubplan->id.groupId = pCxt->groupId;
pSubplan->subplanType = SUBPLAN_TYPE_SCAN; pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
pSubplan->pNode = pNode; pSubplan->pNode = pNode;
// TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo*);
return pSubplan; return pSubplan;
} }
...@@ -244,24 +248,22 @@ static int32_t uaCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan ...@@ -244,24 +248,22 @@ static int32_t uaCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan
pSubplan->subplanType = SUBPLAN_TYPE_MERGE; pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
return nodesListMakeAppend(&pProject->node.pChildren, (SNode*)pExchange); if (NULL == pProject->node.pParent) {
pSubplan->pNode = (SLogicNode*)pExchange;
// if (NULL == pProject->node.pParent) { nodesDestroyNode(pProject);
// pSubplan->pNode = (SLogicNode*)pExchange; return TSDB_CODE_SUCCESS;
// nodesDestroyNode(pProject); }
// return TSDB_CODE_SUCCESS;
// } SNode* pNode;
FOREACH(pNode, pProject->node.pParent->pChildren) {
// SNode* pNode; if (nodesEqualNode(pNode, pProject)) {
// FOREACH(pNode, pProject->node.pParent->pChildren) { REPLACE_NODE(pExchange);
// if (nodesEqualNode(pNode, pProject)) { nodesDestroyNode(pNode);
// REPLACE_NODE(pExchange); return TSDB_CODE_SUCCESS;
// nodesDestroyNode(pNode); }
// return TSDB_CODE_SUCCESS; }
// } nodesDestroyNode(pExchange);
// } return TSDB_CODE_FAILED;
// nodesDestroyNode(pExchange);
// return TSDB_CODE_FAILED;
} }
static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
...@@ -291,10 +293,78 @@ static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { ...@@ -291,10 +293,78 @@ static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return code; return code;
} }
static SLogicNode* unMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pSplitNode = uaMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
}
return NULL;
}
static int32_t unCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) {
SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcGroupId = pCxt->groupId;
// pExchange->precision = pScan->pMeta->tableInfo.precision;
pExchange->node.pTargets = nodesCloneList(pAgg->node.pTargets);
if (NULL == pExchange->node.pTargets) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
return nodesListMakeAppend(&pAgg->node.pChildren, pExchange);
}
static bool unFindSplitNode(SLogicSubplan* pSubplan, SUnInfo* pInfo) {
SLogicNode* pSplitNode = unMatchByNode(pSubplan->pNode);
if (NULL != pSplitNode) {
pInfo->pAgg = (SAggLogicNode*)pSplitNode;
pInfo->pSubplan = pSubplan;
}
return NULL != pSplitNode;
}
static int32_t unSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SUnInfo info = {0};
if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unFindSplitNode, &info)) {
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
SNode* pChild = NULL;
FOREACH(pChild, info.pAgg->node.pChildren) {
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, uaCreateSubplan(pCxt, (SLogicNode*)pChild));
if (TSDB_CODE_SUCCESS == code) {
REPLACE_NODE(NULL);
} else {
break;
}
}
if (TSDB_CODE_SUCCESS == code) {
nodesClearList(info.pAgg->node.pChildren);
info.pAgg->node.pChildren = NULL;
code = unCreateExchangeNode(pCxt, info.pSubplan, info.pAgg);
}
++(pCxt->groupId);
pCxt->split = true;
return code;
}
static const SSplitRule splitRuleSet[] = { static const SSplitRule splitRuleSet[] = {
{ .pName = "SuperTableScan", .splitFunc = stsSplit }, { .pName = "SuperTableScan", .splitFunc = stsSplit },
{ .pName = "ChildTableJoin", .splitFunc = ctjSplit }, { .pName = "ChildTableJoin", .splitFunc = ctjSplit },
{ .pName = "UnionAll", .splitFunc = uaSplit }, { .pName = "UnionAll", .splitFunc = uaSplit },
{ .pName = "Union", .splitFunc = unSplit }
}; };
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule)); static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
......
...@@ -27,3 +27,9 @@ TEST_F(PlanSetOpTest, unionAll) { ...@@ -27,3 +27,9 @@ TEST_F(PlanSetOpTest, unionAll) {
run("select c1, c2 from t1 where c1 > 10 union all select c1, c2 from t1 where c1 > 20"); run("select c1, c2 from t1 where c1 > 10 union all select c1, c2 from t1 where c1 > 20");
} }
TEST_F(PlanSetOpTest, union) {
useDb("root", "test");
run("select c1, c2 from t1 where c1 > 10 union select c1, c2 from t1 where c1 > 20");
}
...@@ -62,7 +62,7 @@ public: ...@@ -62,7 +62,7 @@ public:
doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan); doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan);
SQueryPlan* pPlan = nullptr; SQueryPlan* pPlan = nullptr;
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan, NULL); doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan);
if (g_isDump) { if (g_isDump) {
dump(); dump();
...@@ -162,7 +162,8 @@ private: ...@@ -162,7 +162,8 @@ private:
res_.scaledLogicPlan_ = toString((SNode*)(*pLogicPlan)); res_.scaledLogicPlan_ = toString((SNode*)(*pLogicPlan));
} }
void doCreatePhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) { void doCreatePhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan) {
SArray* pExecNodeList = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SQueryNodeAddr));
DO_WITH_THROW(createPhysiPlan, pCxt, pLogicPlan, pPlan, pExecNodeList); DO_WITH_THROW(createPhysiPlan, pCxt, pLogicPlan, pPlan, pExecNodeList);
res_.physiPlan_ = toString((SNode*)(*pPlan)); res_.physiPlan_ = toString((SNode*)(*pPlan));
SNode* pNode; SNode* pNode;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册