提交 a62a7cc4 编写于 作者: X Xiaoyu Wang

enh: aggregate and window add field for merge data blocks

上级 18364a6b
......@@ -377,6 +377,7 @@ typedef struct SAggPhysiNode {
SNodeList* pExprs; // these are expression list of group_by_clause and parameter expression of aggregate function
SNodeList* pGroupKeys;
SNodeList* pAggFuncs;
bool mergeDataBlock;
} SAggPhysiNode;
typedef struct SDownstreamSourceNode {
......@@ -415,6 +416,7 @@ typedef struct SWinodwPhysiNode {
int8_t igExpired;
EOrder inputTsOrder;
EOrder outputTsOrder;
bool mergeDataBlock;
} SWinodwPhysiNode;
typedef struct SIntervalPhysiNode {
......
......@@ -1773,6 +1773,7 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) {
static const char* jkAggPhysiPlanExprs = "Exprs";
static const char* jkAggPhysiPlanGroupKeys = "GroupKeys";
static const char* jkAggPhysiPlanAggFuncs = "AggFuncs";
static const char* jkAggPhysiPlanMergeDataBlock = "MergeDataBlock";
static int32_t physiAggNodeToJson(const void* pObj, SJson* pJson) {
const SAggPhysiNode* pNode = (const SAggPhysiNode*)pObj;
......@@ -1787,6 +1788,9 @@ static int32_t physiAggNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkAggPhysiPlanAggFuncs, pNode->pAggFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkAggPhysiPlanMergeDataBlock, pNode->mergeDataBlock);
}
return code;
}
......@@ -1804,6 +1808,9 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkAggPhysiPlanAggFuncs, &pNode->pAggFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkAggPhysiPlanMergeDataBlock, &pNode->mergeDataBlock);
}
return code;
}
......@@ -1936,8 +1943,9 @@ static const char* jkWindowPhysiPlanTsEnd = "TsEnd";
static const char* jkWindowPhysiPlanTriggerType = "TriggerType";
static const char* jkWindowPhysiPlanWatermark = "Watermark";
static const char* jkWindowPhysiPlanIgnoreExpired = "IgnoreExpired";
static const char* jkWindowPhysiPlanInputTsOrder = "inputTsOrder";
static const char* jkWindowPhysiPlanInputTsOrder = "InputTsOrder";
static const char* jkWindowPhysiPlanOutputTsOrder = "outputTsOrder";
static const char* jkWindowPhysiPlanMergeDataBlock = "MergeDataBlock";
static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj;
......@@ -1970,6 +1978,9 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanOutputTsOrder, pNode->outputTsOrder);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkWindowPhysiPlanMergeDataBlock, pNode->mergeDataBlock);
}
return code;
}
......@@ -2005,6 +2016,9 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkWindowPhysiPlanOutputTsOrder, pNode->outputTsOrder, code);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkWindowPhysiPlanMergeDataBlock, &pNode->mergeDataBlock);
}
return code;
}
......@@ -2147,7 +2161,6 @@ static int32_t jsonToPhysiSessionWindowNode(const SJson* pJson, void* pObj) {
int32_t code = jsonToPhysiWindowNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkSessionWindowPhysiPlanGap, pNode->gap, code);
;
}
return code;
......
......@@ -468,6 +468,10 @@ static int32_t createGroupKeysFromPartKeys(SNodeList* pPartKeys, SNodeList** pOu
return TSDB_CODE_SUCCESS;
}
static EGroupAction getGroupAction(SLogicPlanContext* pCxt, SSelectStmt* pSelect) {
return (pCxt->pPlanCxt->streamQuery || NULL != pSelect->pLimit) ? GROUP_ACTION_KEEP : GROUP_ACTION_NONE;
}
static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
if (!pSelect->hasAggFuncs && NULL == pSelect->pGroupByList) {
return TSDB_CODE_SUCCESS;
......@@ -480,7 +484,7 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
pAgg->hasLastRow = pSelect->hasLastRowFunc;
pAgg->hasTimeLineFunc = pSelect->hasTimeLineFunc;
pAgg->node.groupAction = GROUP_ACTION_SET;
pAgg->node.groupAction = getGroupAction(pCxt, pSelect);
pAgg->node.requireDataOrder = pAgg->hasTimeLineFunc ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_NONE;
pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
......@@ -551,7 +555,7 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt
pIdfRowsFunc->isTailFunc = pSelect->hasTailFunc;
pIdfRowsFunc->isUniqueFunc = pSelect->hasUniqueFunc;
pIdfRowsFunc->isTimeLineFunc = pSelect->hasTimeLineFunc;
pIdfRowsFunc->node.groupAction = GROUP_ACTION_KEEP;
pIdfRowsFunc->node.groupAction = getGroupAction(pCxt, pSelect);
pIdfRowsFunc->node.requireDataOrder =
pIdfRowsFunc->isTimeLineFunc ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_NONE;
pIdfRowsFunc->node.resultDataOrder = pIdfRowsFunc->node.requireDataOrder;
......@@ -586,7 +590,7 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
return TSDB_CODE_OUT_OF_MEMORY;
}
pInterpFunc->node.groupAction = GROUP_ACTION_KEEP;
pInterpFunc->node.groupAction = getGroupAction(pCxt, pSelect);
pInterpFunc->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pInterpFunc->node.resultDataOrder = pInterpFunc->node.requireDataOrder;
......@@ -662,7 +666,7 @@ static int32_t createWindowLogicNodeByState(SLogicPlanContext* pCxt, SStateWindo
}
pWindow->winType = WINDOW_TYPE_STATE;
pWindow->node.groupAction = GROUP_ACTION_KEEP;
pWindow->node.groupAction = getGroupAction(pCxt, pSelect);
pWindow->node.requireDataOrder = pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_IN_BLOCK : DATA_ORDER_LEVEL_IN_GROUP;
pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pWindow->pStateExpr = nodesCloneNode(pState->pExpr);
......@@ -685,7 +689,7 @@ static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionW
pWindow->winType = WINDOW_TYPE_SESSION;
pWindow->sessionGap = ((SValueNode*)pSession->pGap)->datum.i;
pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? SESSION_ALGO_STREAM_SINGLE : SESSION_ALGO_MERGE;
pWindow->node.groupAction = GROUP_ACTION_KEEP;
pWindow->node.groupAction = getGroupAction(pCxt, pSelect);
pWindow->node.requireDataOrder = pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_IN_BLOCK : DATA_ORDER_LEVEL_IN_GROUP;
pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
......@@ -714,7 +718,7 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
pWindow->slidingUnit =
(NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit);
pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? INTERVAL_ALGO_STREAM_SINGLE : INTERVAL_ALGO_HASH;
pWindow->node.groupAction = GROUP_ACTION_KEEP;
pWindow->node.groupAction = getGroupAction(pCxt, pSelect);
pWindow->node.requireDataOrder = pSelect->hasTimeLineFunc ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_IN_BLOCK;
pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
......@@ -762,7 +766,7 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
return TSDB_CODE_OUT_OF_MEMORY;
}
pFill->node.groupAction = GROUP_ACTION_KEEP;
pFill->node.groupAction = getGroupAction(pCxt, pSelect);
pFill->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pFill->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pFill->inputTsOrder = ORDER_ASC;
......@@ -936,7 +940,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
return TSDB_CODE_OUT_OF_MEMORY;
}
pAgg->node.groupAction = GROUP_ACTION_SET;
pAgg->node.groupAction = GROUP_ACTION_CLEAR;
pAgg->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
......
......@@ -825,6 +825,8 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
return TSDB_CODE_OUT_OF_MEMORY;
}
pAgg->mergeDataBlock = (GROUP_ACTION_KEEP == pAggLogicNode->node.groupAction ? false : true);
SNodeList* pPrecalcExprs = NULL;
SNodeList* pGroupKeys = NULL;
SNodeList* pAggFuncs = NULL;
......@@ -1091,6 +1093,7 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
pWindow->igExpired = pWindowLogicNode->igExpired;
pWindow->inputTsOrder = pWindowLogicNode->inputTsOrder;
pWindow->outputTsOrder = pWindowLogicNode->outputTsOrder;
pWindow->mergeDataBlock = (GROUP_ACTION_KEEP == pWindowLogicNode->node.groupAction ? false : true);
SNodeList* pPrecalcExprs = NULL;
SNodeList* pFuncs = NULL;
......
......@@ -393,6 +393,8 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
code = TSDB_CODE_OUT_OF_MEMORY;
}
pPartWin->node.groupAction = GROUP_ACTION_KEEP;
if (TSDB_CODE_SUCCESS == code) {
pMergeWindow->node.pTargets = pTargets;
pMergeWindow->node.pConditions = pConditions;
......@@ -723,6 +725,8 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO
code = TSDB_CODE_OUT_OF_MEMORY;
}
pPartAgg->node.groupAction = GROUP_ACTION_KEEP;
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
pPartAgg->pGroupKeys = pGroupKeys;
code = createColumnByRewriteExprs(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets);
......
......@@ -32,6 +32,22 @@ TEST_F(PlanGroupByTest, basic) {
run("SELECT c1 + c3, SUM(c4 * c5) FROM t1 WHERE CONCAT(c2, 'wwww') = 'abcwww' GROUP BY c1 + c3");
run("SELECT SUM(CEIL(c1)) FROM t1 GROUP BY CEIL(c1)");
run("SELECT COUNT(*) FROM st1");
run("SELECT c1 FROM st1 GROUP BY c1");
run("SELECT COUNT(*) FROM st1 GROUP BY c1");
run("SELECT SUM(c1) FROM st1 GROUP BY c2 HAVING SUM(c1) IS NOT NULL");
}
TEST_F(PlanGroupByTest, withPartitionBy) {
useDb("root", "test");
run("SELECT LAST(ts), TBNAME FROM st1 PARTITION BY TBNAME");
run("SELECT COUNT(*) FROM st1 PARTITION BY c2 GROUP BY c1");
}
TEST_F(PlanGroupByTest, withOrderBy) {
......@@ -43,14 +59,12 @@ TEST_F(PlanGroupByTest, withOrderBy) {
// run("SELECT COUNT(*), SUM(c1) a FROM t1 ORDER BY a");
}
TEST_F(PlanGroupByTest, aggFunc) {
TEST_F(PlanGroupByTest, multiResFunc) {
useDb("root", "test");
run("SELECT LAST(*), FIRST(*) FROM t1");
run("SELECT LAST(*), FIRST(*) FROM t1 GROUP BY c1");
run("SELECT SUM(10), COUNT(c1) FROM t1 GROUP BY c2");
}
TEST_F(PlanGroupByTest, selectFunc) {
......@@ -67,17 +81,3 @@ TEST_F(PlanGroupByTest, selectFunc) {
run("SELECT MAX(c1), c2 FROM t1 GROUP BY c3");
run("SELECT MAX(c1), t1.* FROM t1 GROUP BY c3");
}
TEST_F(PlanGroupByTest, stable) {
useDb("root", "test");
run("SELECT COUNT(*) FROM st1");
run("SELECT c1 FROM st1 GROUP BY c1");
run("SELECT COUNT(*) FROM st1 GROUP BY c1");
run("SELECT COUNT(*) FROM st1 PARTITION BY c2 GROUP BY c1");
run("SELECT SUM(c1) FROM st1 GROUP BY c2 HAVING SUM(c1) IS NOT NULL");
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册