提交 0b56a404 编写于 作者: X Xiaoyu Wang

feat: sql command 'union'

上级 e5a7f1b7
......@@ -85,13 +85,19 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static int32_t rewriteExpr(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) {
static int32_t rewriteExprForSelect(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) {
nodesWalkExprs(pExprs, doNameExpr, NULL);
SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs };
nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt);
return cxt.errCode;
}
static int32_t rewriteExprs(SNodeList* pExprs, SNodeList* pTarget) {
SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs };
nodesRewriteExprs(pTarget, doRewriteExpr, &cxt);
return cxt.errCode;
}
static int32_t pushLogicNode(SLogicPlanContext* pCxt, SLogicNode** pOldRoot, SLogicNode* pNewRoot) {
if (NULL == pNewRoot->pChildren) {
pNewRoot->pChildren = nodesMakeList();
......@@ -433,10 +439,10 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
// rewrite the expression in subsequent clauses
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY);
code = rewriteExprForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY);
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY);
code = rewriteExprForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY);
}
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) {
......@@ -472,7 +478,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW);
code = rewriteExprForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW);
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -723,7 +729,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
// rewrite the expression in subsequent clauses
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_DISTINCT);
code = rewriteExprForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_DISTINCT);
}
// set the output
......@@ -850,6 +856,37 @@ static int32_t createSetOpProjectLogicNode(SLogicPlanContext* pCxt, SSetOperator
return code;
}
static int32_t createSetOpAggLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) {
SAggLogicNode* pAgg = (SAggLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_AGG);
if (NULL == pAgg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
pAgg->pGroupKeys = nodesCloneList(pSetOperator->pProjectionList);
if (NULL == pAgg->pGroupKeys) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
// rewrite the expression in subsequent clauses
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprs(pAgg->pGroupKeys, pSetOperator->pOrderByList);
}
// set the output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pAgg;
} else {
nodesDestroyNode(pAgg);
}
return code;
}
static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) {
SLogicNode* pSetOp = NULL;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -857,6 +894,9 @@ static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetO
case SET_OP_TYPE_UNION_ALL:
code = createSetOpProjectLogicNode(pCxt, pSetOperator, &pSetOp);
break;
case SET_OP_TYPE_UNION:
code = createSetOpAggLogicNode(pCxt, pSetOperator, &pSetOp);
break;
default:
code = -1;
break;
......
......@@ -50,6 +50,11 @@ typedef struct SUaInfo {
SLogicSubplan* pSubplan;
} SUaInfo;
typedef struct SUnInfo {
SAggLogicNode* pAgg;
SLogicSubplan* pSubplan;
} SUnInfo;
typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, void* pInfo);
static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan, int32_t flag) {
......@@ -226,7 +231,6 @@ static SLogicSubplan* uaCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) {
pSubplan->id.groupId = pCxt->groupId;
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
pSubplan->pNode = pNode;
// TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo*);
return pSubplan;
}
......@@ -244,24 +248,22 @@ static int32_t uaCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan
pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
return nodesListMakeAppend(&pProject->node.pChildren, (SNode*)pExchange);
// if (NULL == pProject->node.pParent) {
// pSubplan->pNode = (SLogicNode*)pExchange;
// nodesDestroyNode(pProject);
// return TSDB_CODE_SUCCESS;
// }
// SNode* pNode;
// FOREACH(pNode, pProject->node.pParent->pChildren) {
// if (nodesEqualNode(pNode, pProject)) {
// REPLACE_NODE(pExchange);
// nodesDestroyNode(pNode);
// return TSDB_CODE_SUCCESS;
// }
// }
// nodesDestroyNode(pExchange);
// return TSDB_CODE_FAILED;
if (NULL == pProject->node.pParent) {
pSubplan->pNode = (SLogicNode*)pExchange;
nodesDestroyNode(pProject);
return TSDB_CODE_SUCCESS;
}
SNode* pNode;
FOREACH(pNode, pProject->node.pParent->pChildren) {
if (nodesEqualNode(pNode, pProject)) {
REPLACE_NODE(pExchange);
nodesDestroyNode(pNode);
return TSDB_CODE_SUCCESS;
}
}
nodesDestroyNode(pExchange);
return TSDB_CODE_FAILED;
}
static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
......@@ -291,10 +293,78 @@ static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return code;
}
static SLogicNode* unMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pSplitNode = uaMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
}
return NULL;
}
static int32_t unCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) {
SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcGroupId = pCxt->groupId;
// pExchange->precision = pScan->pMeta->tableInfo.precision;
pExchange->node.pTargets = nodesCloneList(pAgg->node.pTargets);
if (NULL == pExchange->node.pTargets) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
return nodesListMakeAppend(&pAgg->node.pChildren, pExchange);
}
static bool unFindSplitNode(SLogicSubplan* pSubplan, SUnInfo* pInfo) {
SLogicNode* pSplitNode = unMatchByNode(pSubplan->pNode);
if (NULL != pSplitNode) {
pInfo->pAgg = (SAggLogicNode*)pSplitNode;
pInfo->pSubplan = pSubplan;
}
return NULL != pSplitNode;
}
static int32_t unSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SUnInfo info = {0};
if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unFindSplitNode, &info)) {
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
SNode* pChild = NULL;
FOREACH(pChild, info.pAgg->node.pChildren) {
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, uaCreateSubplan(pCxt, (SLogicNode*)pChild));
if (TSDB_CODE_SUCCESS == code) {
REPLACE_NODE(NULL);
} else {
break;
}
}
if (TSDB_CODE_SUCCESS == code) {
nodesClearList(info.pAgg->node.pChildren);
info.pAgg->node.pChildren = NULL;
code = unCreateExchangeNode(pCxt, info.pSubplan, info.pAgg);
}
++(pCxt->groupId);
pCxt->split = true;
return code;
}
static const SSplitRule splitRuleSet[] = {
{ .pName = "SuperTableScan", .splitFunc = stsSplit },
{ .pName = "ChildTableJoin", .splitFunc = ctjSplit },
{ .pName = "UnionAll", .splitFunc = uaSplit },
{ .pName = "Union", .splitFunc = unSplit }
};
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
......
......@@ -27,3 +27,9 @@ TEST_F(PlanSetOpTest, unionAll) {
run("select c1, c2 from t1 where c1 > 10 union all select c1, c2 from t1 where c1 > 20");
}
TEST_F(PlanSetOpTest, union) {
useDb("root", "test");
run("select c1, c2 from t1 where c1 > 10 union select c1, c2 from t1 where c1 > 20");
}
......@@ -62,7 +62,7 @@ public:
doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan);
SQueryPlan* pPlan = nullptr;
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan, NULL);
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan);
if (g_isDump) {
dump();
......@@ -162,7 +162,8 @@ private:
res_.scaledLogicPlan_ = toString((SNode*)(*pLogicPlan));
}
void doCreatePhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) {
void doCreatePhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan) {
SArray* pExecNodeList = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SQueryNodeAddr));
DO_WITH_THROW(createPhysiPlan, pCxt, pLogicPlan, pPlan, pExecNodeList);
res_.physiPlan_ = toString((SNode*)(*pPlan));
SNode* pNode;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册