提交 853f1df3 编写于 作者: X Xiaoyu Wang

feat: support partition by expression and aggregate function output together

上级 4c8c302f
......@@ -75,6 +75,7 @@ typedef struct SScanLogicNode {
double filesFactor;
SArray* pSmaIndexes;
SNodeList* pPartTags;
bool partSort;
} SScanLogicNode;
typedef struct SJoinLogicNode {
......
......@@ -1274,13 +1274,12 @@ static bool validateTimestampDigits(const SValueNode* pVal) {
}
int64_t tsVal = pVal->datum.i;
char fraction[20] = {0};
char fraction[20] = {0};
NUM_TO_STRING(pVal->node.resType.type, &tsVal, sizeof(fraction), fraction);
int32_t tsDigits = (int32_t)strlen(fraction);
if (tsDigits > TSDB_TIME_PRECISION_SEC_DIGITS) {
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS ||
tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS ||
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS || tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS ||
tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
return true;
} else {
......@@ -1510,6 +1509,11 @@ static int32_t translateBlockDistInfoFunc(SFunctionNode* pFunc, char* pErrBuf, i
return TSDB_CODE_SUCCESS;
}
static int32_t translateGroupKeyFunc(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
pFunc->node.resType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
return TSDB_CODE_SUCCESS;
}
static bool getBlockDistFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(STableBlockDistInfo);
return true;
......@@ -2499,6 +2503,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.type = FUNCTION_TYPE_BLOCK_DIST_INFO,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC,
.translateFunc = translateBlockDistInfoFunc,
},
{
.name = "_group_key",
.type = FUNCTION_TYPE_BLOCK_DIST_INFO,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateGroupKeyFunc,
.pPartialFunc = "_group_key",
.pMergeFunc = "_group_key"
}
};
// clang-format on
......
......@@ -1355,6 +1355,24 @@ static EDealRes rewriteColToSelectValFunc(STranslateContext* pCxt, SNode** pNode
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR;
}
static EDealRes rewriteExprToGroupKeyFunc(STranslateContext* pCxt, SNode** pNode) {
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pFunc) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
strcpy(pFunc->functionName, "_group_key");
strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName);
pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode);
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
*pNode = (SNode*)pFunc;
pCxt->errCode = fmGetFuncInfo(pFunc, pCxt->msgBuf.buf, pCxt->msgBuf.len);
}
return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
}
static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
SCheckExprForGroupByCxt* pCxt = (SCheckExprForGroupByCxt*)pContext;
if (!nodesIsExprNode(*pNode) || isAliasColumn(*pNode)) {
......@@ -1371,10 +1389,10 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
if (isAggFunc(*pNode) && !isDistinctOrderBy(pCxt->pTranslateCxt)) {
return DEAL_RES_IGNORE_CHILD;
}
SNode* pGroupNode;
SNode* pGroupNode = NULL;
FOREACH(pGroupNode, getGroupByList(pCxt->pTranslateCxt)) {
if (nodesEqualNode(getGroupByNode(pGroupNode), *pNode)) {
return DEAL_RES_IGNORE_CHILD;
return rewriteExprToGroupKeyFunc(pCxt->pTranslateCxt, pNode);
}
}
if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) {
......@@ -1441,22 +1459,28 @@ typedef struct CheckAggColCoexistCxt {
bool existOtherAggFunc;
} CheckAggColCoexistCxt;
static EDealRes doCheckAggColCoexist(SNode* pNode, void* pContext) {
static EDealRes doCheckAggColCoexist(SNode** pNode, void* pContext) {
CheckAggColCoexistCxt* pCxt = (CheckAggColCoexistCxt*)pContext;
if (isSelectFunc(pNode)) {
if (isSelectFunc(*pNode)) {
++(pCxt->selectFuncNum);
} else if (isAggFunc(pNode)) {
} else if (isAggFunc(*pNode)) {
pCxt->existOtherAggFunc = true;
}
if (isAggFunc(pNode)) {
if (isAggFunc(*pNode)) {
pCxt->existAggFunc = true;
return DEAL_RES_IGNORE_CHILD;
}
if (isIndefiniteRowsFunc(pNode)) {
if (isIndefiniteRowsFunc(*pNode)) {
pCxt->existIndefiniteRowsFunc = true;
return DEAL_RES_IGNORE_CHILD;
}
if (isScanPseudoColumnFunc(pNode) || QUERY_NODE_COLUMN == nodeType(pNode)) {
SNode* pPartKey = NULL;
FOREACH(pPartKey, pCxt->pTranslateCxt->pCurrSelectStmt->pPartitionByList) {
if (nodesEqualNode(pPartKey, *pNode)) {
return rewriteExprToGroupKeyFunc(pCxt->pTranslateCxt, pNode);
}
}
if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) {
pCxt->existCol = true;
}
return DEAL_RES_CONTINUE;
......@@ -1472,9 +1496,9 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect)
.existIndefiniteRowsFunc = false,
.selectFuncNum = 0,
.existOtherAggFunc = false};
nodesWalkExprs(pSelect->pProjectionList, doCheckAggColCoexist, &cxt);
nodesRewriteExprs(pSelect->pProjectionList, doCheckAggColCoexist, &cxt);
if (!pSelect->isDistinct) {
nodesWalkExprs(pSelect->pOrderByList, doCheckAggColCoexist, &cxt);
nodesRewriteExprs(pSelect->pOrderByList, doCheckAggColCoexist, &cxt);
}
if (1 == cxt.selectFuncNum && !cxt.existOtherAggFunc) {
return rewriteColsToSelectValFunc(pCxt, pSelect);
......
......@@ -199,6 +199,20 @@ TEST_F(ParserSelectTest, tailFuncSemanticCheck) {
run("SELECT TAIL(c1, 10) FROM t1 GROUP BY c2", TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC);
}
TEST_F(ParserSelectTest, partitionBy) {
useDb("root", "test");
run("SELECT c1, c2 FROM t1 PARTITION BY c2");
run("SELECT SUM(c1), c2 FROM t1 PARTITION BY c2");
}
TEST_F(ParserSelectTest, partitionBySemanticCheck) {
useDb("root", "test");
run("SELECT SUM(c1), c2, c3 FROM t1 PARTITION BY c2", TSDB_CODE_PAR_NOT_SINGLE_GROUP);
}
TEST_F(ParserSelectTest, groupBy) {
useDb("root", "test");
......@@ -213,6 +227,15 @@ TEST_F(ParserSelectTest, groupBy) {
run("SELECT COUNT(*), c1 + 10, c2 cnt FROM t1 WHERE c1 > 0 GROUP BY c1 + 10, c2");
}
TEST_F(ParserSelectTest, groupBySemanticCheck) {
useDb("root", "test");
run("SELECT COUNT(*) cnt, c1 FROM t1 WHERE c1 > 0", TSDB_CODE_PAR_NOT_SINGLE_GROUP);
run("SELECT COUNT(*) cnt, c2 FROM t1 WHERE c1 > 0 GROUP BY c1", TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION);
run("SELECT COUNT(*) cnt, c2 FROM t1 WHERE c1 > 0 PARTITION BY c2 GROUP BY c1",
TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION);
}
TEST_F(ParserSelectTest, orderBy) {
useDb("root", "test");
......
......@@ -457,15 +457,15 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
}
}
if (TSDB_CODE_SUCCESS == code && pSelect->hasAggFuncs) {
code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, fmIsAggFunc, &pAgg->pAggFuncs);
}
// rewrite the expression in subsequent clauses
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprsForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY);
}
if (TSDB_CODE_SUCCESS == code && pSelect->hasAggFuncs) {
code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, fmIsAggFunc, &pAgg->pAggFuncs);
}
// rewrite the expression in subsequent clauses
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprsForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY);
......
......@@ -1075,6 +1075,16 @@ static int32_t partTagsOptRebuildTbanme(SNodeList* pPartKeys) {
return code;
}
static int32_t partTagOptRewriteGroupKey(SAggLogicNode* pAgg, SScanLogicNode* pScan) {
// SNode* pNode = NULL;
// FOREACH(pNode, pScan->pPartTags) {
// if (QUERY_NODE_COLUMN != nodeType(pNode)) {
// createColumnByRewriteExpr(pNode, );
// }
// }
return TSDB_CODE_SUCCESS;
}
static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized);
if (NULL == pNode) {
......@@ -1100,6 +1110,9 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
}
}
NODES_DESTORY_LIST(((SAggLogicNode*)pNode)->pGroupKeys);
if (TSDB_CODE_SUCCESS == code) {
code = partTagOptRewriteGroupKey((SAggLogicNode*)pNode, pScan);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = partTagsOptRebuildTbanme(pScan->pPartTags);
......@@ -1189,7 +1202,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize},
{.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize},
{.pName = "SmaIndex", .optimizeFunc = smaOptimize},
// {.pName = "PartitionTags", .optimizeFunc = partTagsOptimize},
{.pName = "PartitionTags", .optimizeFunc = partTagsOptimize},
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}
};
// clang-format on
......
......@@ -390,9 +390,6 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
}
}
if (TSDB_CODE_SUCCESS == code && NULL != pSubplan) {
nodesDestroyNode((SNode*)pSplitNode);
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode((SNode*)pMerge);
}
......@@ -564,6 +561,8 @@ static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
return ((SScanLogicNode*)pNode)->pPartTags;
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
} else {
return NULL;
}
......@@ -574,14 +573,15 @@ static bool stbSplIsPartTbanme(SNodeList* pPartKeys) {
return false;
}
SNode* pPartKey = nodesListGetNode(pPartKeys, 0);
return QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType;
return (QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) ||
(QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pPartKey)->colType);
}
static bool stbSplIsMultiTableWinodw(SWindowLogicNode* pWindow) {
static bool stbSplIsPartTableWinodw(SWindowLogicNode* pWindow) {
return stbSplIsPartTbanme(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
}
static int32_t stbSplSplitWindowForMergeTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) {
case WINDOW_TYPE_INTERVAL:
return stbSplSplitInterval(pCxt, pInfo);
......@@ -595,7 +595,7 @@ static int32_t stbSplSplitWindowForMergeTable(SSplitContext* pCxt, SStableSplitI
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static int32_t stbSplSplitWindowForMultiTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (pCxt->pPlanCxt->streamQuery) {
SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
return TSDB_CODE_SUCCESS;
......@@ -616,10 +616,10 @@ static int32_t stbSplSplitWindowForMultiTable(SSplitContext* pCxt, SStableSplitI
}
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (stbSplIsMultiTableWinodw((SWindowLogicNode*)pInfo->pSplitNode)) {
return stbSplSplitWindowForMultiTable(pCxt, pInfo);
if (stbSplIsPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode)) {
return stbSplSplitWindowForPartTable(pCxt, pInfo);
} else {
return stbSplSplitWindowForMergeTable(pCxt, pInfo);
return stbSplSplitWindowForCrossTable(pCxt, pInfo);
}
}
......
......@@ -57,9 +57,15 @@ TEST_F(PlanOptimizeTest, orderByPrimaryKey) {
TEST_F(PlanOptimizeTest, PartitionTags) {
useDb("root", "test");
run("SELECT c1 FROM st1 PARTITION BY tag1");
run("SELECT c1, tag1 FROM st1 PARTITION BY tag1");
run("SELECT SUM(c1) FROM st1 GROUP BY tag1");
run("SELECT SUM(c1), tag1 FROM st1 PARTITION BY tag1");
run("SELECT SUM(c1), tag1 + 10 FROM st1 PARTITION BY tag1 + 10");
run("SELECT SUM(c1), tag1 FROM st1 GROUP BY tag1");
run("SELECT SUM(c1), tag1 + 10 FROM st1 GROUP BY tag1 + 10");
}
TEST_F(PlanOptimizeTest, eliminateProjection) {
......
......@@ -34,6 +34,8 @@ TEST_F(PlanPartitionByTest, withAggFunc) {
useDb("root", "test");
run("select count(*) from t1 partition by c1");
run("select count(*), c1 from t1 partition by c1");
}
TEST_F(PlanPartitionByTest, withInterval) {
......@@ -41,8 +43,12 @@ TEST_F(PlanPartitionByTest, withInterval) {
// normal/child table
run("select count(*) from t1 partition by c1 interval(10s)");
run("select count(*), c1 from t1 partition by c1 interval(10s)");
// super table
run("select count(*) from st1 partition by tag1, tag2 interval(10s)");
run("select count(*), tag1 from st1 partition by tag1, tag2 interval(10s)");
}
TEST_F(PlanPartitionByTest, withGroupBy) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册