提交 1979801c 编写于 作者: X Xiaoyu Wang

fix: the problem of data loss when interval is used for outer query

上级 c54ed52a
......@@ -33,6 +33,13 @@ typedef enum EDataOrderLevel {
DATA_ORDER_LEVEL_GLOBAL
} EDataOrderLevel;
typedef enum EGroupAction {
GROUP_ACTION_NONE = 1,
GROUP_ACTION_SET,
GROUP_ACTION_KEEP,
GROUP_ACTION_CLEAR
} EGroupAction;
typedef struct SLogicNode {
ENodeType type;
SNodeList* pTargets; // SColumnNode
......@@ -45,6 +52,7 @@ typedef struct SLogicNode {
SNode* pSlimit;
EDataOrderLevel requireDataOrder; // requirements for input data
EDataOrderLevel resultDataOrder; // properties of the output data
EGroupAction groupAction;
} SLogicNode;
typedef enum EScanType {
......
......@@ -332,6 +332,9 @@ static int32_t logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) {
COPY_SCALAR_FIELD(precision);
CLONE_NODE_FIELD(pLimit);
CLONE_NODE_FIELD(pSlimit);
COPY_SCALAR_FIELD(requireDataOrder);
COPY_SCALAR_FIELD(resultDataOrder);
COPY_SCALAR_FIELD(groupAction);
return TSDB_CODE_SUCCESS;
}
......
......@@ -250,6 +250,9 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
SScanLogicNode* pScan = NULL;
int32_t code = makeScanLogicNode(pCxt, pRealTable, pSelect->hasRepeatScanFuncs, (SLogicNode**)&pScan);
pScan->node.groupAction = GROUP_ACTION_NONE;
pScan->node.resultDataOrder = DATA_ORDER_LEVEL_IN_BLOCK;
// set columns to scan
if (TSDB_CODE_SUCCESS == code) {
code = nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pRealTable->table.tableAlias, COLLECT_COL_TYPE_COL,
......@@ -336,6 +339,9 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pJoin->joinType = pJoinTable->joinType;
pJoin->isSingleTableJoin = pJoinTable->table.singleTable;
pJoin->node.groupAction = GROUP_ACTION_CLEAR;
pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL;
pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -472,6 +478,9 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
}
pAgg->hasLastRow = pSelect->hasLastRowFunc;
pAgg->node.groupAction = GROUP_ACTION_SET;
pAgg->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -540,6 +549,10 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt
pIdfRowsFunc->isTailFunc = pSelect->hasTailFunc;
pIdfRowsFunc->isUniqueFunc = pSelect->hasUniqueFunc;
pIdfRowsFunc->isTimeLineFunc = pSelect->hasTimeLineFunc;
pIdfRowsFunc->node.groupAction = GROUP_ACTION_KEEP;
pIdfRowsFunc->node.requireDataOrder =
pIdfRowsFunc->isTimeLineFunc ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_NONE;
pIdfRowsFunc->node.resultDataOrder = pIdfRowsFunc->node.requireDataOrder;
// indefinite rows functions and _select_values functions
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsVectorFunc, &pIdfRowsFunc->pFuncs);
......@@ -571,6 +584,10 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
return TSDB_CODE_OUT_OF_MEMORY;
}
pInterpFunc->node.groupAction = GROUP_ACTION_KEEP;
pInterpFunc->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pInterpFunc->node.resultDataOrder = pInterpFunc->node.requireDataOrder;
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsInterpFunc, &pInterpFunc->pFuncs);
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT);
......@@ -642,10 +659,12 @@ static int32_t createWindowLogicNodeByState(SLogicPlanContext* pCxt, SStateWindo
}
pWindow->winType = WINDOW_TYPE_STATE;
pWindow->node.groupAction = GROUP_ACTION_KEEP;
pWindow->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pWindow->pStateExpr = nodesCloneNode(pState->pExpr);
pWindow->pTspk = nodesCloneNode(pState->pCol);
if (NULL == pWindow->pTspk) {
if (NULL == pWindow->pStateExpr || NULL == pWindow->pTspk) {
nodesDestroyNode((SNode*)pWindow);
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -663,6 +682,9 @@ static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionW
pWindow->winType = WINDOW_TYPE_SESSION;
pWindow->sessionGap = ((SValueNode*)pSession->pGap)->datum.i;
pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? SESSION_ALGO_STREAM_SINGLE : SESSION_ALGO_MERGE;
pWindow->node.groupAction = GROUP_ACTION_KEEP;
pWindow->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pWindow->pTspk = nodesCloneNode((SNode*)pSession->pCol);
if (NULL == pWindow->pTspk) {
......@@ -689,6 +711,9 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
pWindow->slidingUnit =
(NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit);
pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? INTERVAL_ALGO_STREAM_SINGLE : INTERVAL_ALGO_HASH;
pWindow->node.groupAction = GROUP_ACTION_KEEP;
pWindow->node.requireDataOrder = DATA_ORDER_LEVEL_IN_BLOCK;
pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_BLOCK;
pWindow->pTspk = nodesCloneNode(pInterval->pCol);
if (NULL == pWindow->pTspk) {
......@@ -734,6 +759,10 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
return TSDB_CODE_OUT_OF_MEMORY;
}
pFill->node.groupAction = GROUP_ACTION_KEEP;
pFill->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pFill->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_WINDOW, NULL, COLLECT_COL_TYPE_ALL, &pFill->node.pTargets);
if (TSDB_CODE_SUCCESS == code && NULL == pFill->node.pTargets) {
code = nodesListMakeStrictAppend(&pFill->node.pTargets,
......@@ -768,6 +797,9 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
}
pSort->groupSort = pSelect->groupSort;
pSort->node.groupAction = pSort->groupSort ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR;
pSort->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pSort->node.resultDataOrder = pSort->groupSort ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_GLOBAL;
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_ORDER_BY, NULL, COLLECT_COL_TYPE_ALL, &pSort->node.pTargets);
if (TSDB_CODE_SUCCESS == code && NULL == pSort->node.pTargets) {
......@@ -818,6 +850,7 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel
TSWAP(pProject->node.pLimit, pSelect->pLimit);
TSWAP(pProject->node.pSlimit, pSelect->pSlimit);
pProject->node.groupAction = GROUP_ACTION_CLEAR;
pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pProject->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
......@@ -852,6 +885,10 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
return TSDB_CODE_OUT_OF_MEMORY;
}
pPartition->node.groupAction = GROUP_ACTION_SET;
pPartition->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pPartition->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
int32_t code =
nodesCollectColumns(pSelect, SQL_CLAUSE_PARTITION_BY, NULL, COLLECT_COL_TYPE_ALL, &pPartition->node.pTargets);
if (TSDB_CODE_SUCCESS == code && NULL == pPartition->node.pTargets) {
......@@ -884,6 +921,10 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
return TSDB_CODE_OUT_OF_MEMORY;
}
pAgg->node.groupAction = GROUP_ACTION_SET;
pAgg->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
int32_t code = TSDB_CODE_SUCCESS;
// set grouyp keys, agg funcs and having conditions
pAgg->pGroupKeys = nodesCloneList(pSelect->pProjectionList);
......@@ -1348,25 +1389,163 @@ static void setLogicSubplanType(bool hasScan, SLogicSubplan* pSubplan) {
}
}
static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel requirement) {
if (requirement <= DATA_ORDER_LEVEL_IN_BLOCK) {
return TSDB_CODE_SUCCESS;
}
pScan->scanType = SCAN_TYPE_TABLE_MERGE;
pScan->node.resultDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustJoinDataRequirement(SJoinLogicNode* pJoin, EDataOrderLevel requirement) {
return TSDB_CODE_SUCCESS;
}
static int32_t adjustAggDataRequirement(SAggLogicNode* pAgg, EDataOrderLevel requirement) {
if (requirement > DATA_ORDER_LEVEL_NONE) {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
return TSDB_CODE_SUCCESS;
}
static int32_t adjustProjectDataRequirement(SProjectLogicNode* pProject, EDataOrderLevel requirement) {
pProject->node.resultDataOrder = requirement;
pProject->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustIntervalDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
if (requirement <= DATA_ORDER_LEVEL_IN_BLOCK) {
return TSDB_CODE_SUCCESS;
}
pWindow->node.resultDataOrder = requirement;
pWindow->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustSessionDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
if (requirement <= DATA_ORDER_LEVEL_IN_GROUP) {
return TSDB_CODE_SUCCESS;
}
pWindow->node.resultDataOrder = requirement;
pWindow->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustStateDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
if (requirement <= DATA_ORDER_LEVEL_IN_GROUP) {
return TSDB_CODE_SUCCESS;
}
pWindow->node.resultDataOrder = requirement;
pWindow->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
switch (pWindow->winType) {
case WINDOW_TYPE_INTERVAL:
return adjustIntervalDataRequirement(pWindow, requirement);
case WINDOW_TYPE_SESSION:
return adjustSessionDataRequirement(pWindow, requirement);
case WINDOW_TYPE_STATE:
return adjustStateDataRequirement(pWindow, requirement);
default:
break;
}
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static int32_t adjustFillDataRequirement(SFillLogicNode* pFill, EDataOrderLevel requirement) {
if (requirement <= DATA_ORDER_LEVEL_IN_GROUP) {
return TSDB_CODE_SUCCESS;
}
pFill->node.resultDataOrder = requirement;
pFill->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustSortDataRequirement(SSortLogicNode* pSort, EDataOrderLevel requirement) {
return TSDB_CODE_SUCCESS;
}
static int32_t adjustPartitionDataRequirement(SPartitionLogicNode* pPart, EDataOrderLevel requirement) {
if (DATA_ORDER_LEVEL_GLOBAL == requirement) {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
pPart->node.resultDataOrder = requirement;
pPart->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustIndefRowsDataRequirement(SIndefRowsFuncLogicNode* pIndef, EDataOrderLevel requirement) {
if (requirement <= pIndef->node.resultDataOrder) {
return TSDB_CODE_SUCCESS;
}
pIndef->node.resultDataOrder = requirement;
pIndef->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustInterpDataRequirement(SInterpFuncLogicNode* pInterp, EDataOrderLevel requirement) {
if (requirement <= DATA_ORDER_LEVEL_IN_GROUP) {
return TSDB_CODE_SUCCESS;
}
pInterp->node.resultDataOrder = requirement;
pInterp->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustLogicNodeDataRequirementImpl(SLogicNode* pNode, EDataOrderLevel requirement) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN:
code = adjustScanDataRequirement((SScanLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_JOIN:
code = adjustJoinDataRequirement((SJoinLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_AGG:
code = adjustAggDataRequirement((SAggLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_PROJECT:
code = adjustProjectDataRequirement((SProjectLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY:
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
case QUERY_NODE_LOGIC_PLAN_MERGE:
break;
case QUERY_NODE_LOGIC_PLAN_WINDOW:
code = adjustWindowDataRequirement((SWindowLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_FILL:
code = adjustFillDataRequirement((SFillLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_SORT:
code = adjustSortDataRequirement((SSortLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_PARTITION:
code = adjustPartitionDataRequirement((SPartitionLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
code = adjustIndefRowsDataRequirement((SIndefRowsFuncLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
code = adjustInterpDataRequirement((SInterpFuncLogicNode*)pNode, requirement);
break;
default:
break;
}
return TSDB_CODE_SUCCESS;
if (TSDB_CODE_SUCCESS == code) {
SNode* pChild = NULL;
FOREACH(pChild, pNode->pChildren) {
code = adjustLogicNodeDataRequirementImpl((SLogicNode*)pChild, pNode->requireDataOrder);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
}
return code;
}
static int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode) {
......
......@@ -48,7 +48,7 @@ TEST_F(PlanSubqeuryTest, doubleGroupBy) {
"WHERE a > 100 GROUP BY b");
}
TEST_F(PlanSubqeuryTest, withSetOperator) {
TEST_F(PlanSubqeuryTest, innerSetOperator) {
useDb("root", "test");
run("SELECT c1 FROM (SELECT c1 FROM t1 UNION ALL SELECT c1 FROM t1)");
......@@ -56,10 +56,18 @@ TEST_F(PlanSubqeuryTest, withSetOperator) {
run("SELECT c1 FROM (SELECT c1 FROM t1 UNION SELECT c1 FROM t1)");
}
TEST_F(PlanSubqeuryTest, withFill) {
TEST_F(PlanSubqeuryTest, innerFill) {
useDb("root", "test");
run("SELECT cnt FROM (SELECT _WSTART ts, COUNT(*) cnt FROM t1 "
"WHERE ts > '2022-04-01 00:00:00' and ts < '2022-04-30 23:59:59' INTERVAL(10s) FILL(LINEAR)) "
"WHERE ts > '2022-04-06 00:00:00'");
}
TEST_F(PlanSubqeuryTest, outerInterval) {
useDb("root", "test");
run("SELECT COUNT(*) FROM (SELECT * FROM st1) INTERVAL(5s)");
run("SELECT COUNT(*) + SUM(c1) FROM (SELECT * FROM st1) INTERVAL(5s)");
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册