From 849cb8d1b357b9edd871f08b488ee32f19d6a679 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 23 Jun 2022 08:52:47 +0800 Subject: [PATCH] feat: split session and state window on stable --- source/libs/executor/src/timewindowoperator.c | 3 +- source/libs/parser/src/parTranslater.c | 3 - source/libs/planner/src/planSpliter.c | 75 ++++++++++++++++++- source/libs/planner/test/planSessionTest.cpp | 10 +++ source/libs/planner/test/planStateTest.cpp | 9 +++ 5 files changed, 91 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 3ff45d7237..19f0fd4ea7 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1656,10 +1656,9 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); - return NULL; } - return pBInfo->pRes; + return pBInfo->pRes->info.rows > 0 ? pBInfo->pRes : NULL; } int64_t st = taosGetTimestampUs(); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 43a4cf0ed6..e02cb931e9 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2161,9 +2161,6 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) { if (COLUMN_TYPE_TAG == pCol->colType) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_COL); } - if (TSDB_SUPER_TABLE == pCol->tableType) { - return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TABLE); - } } return DEAL_RES_CONTINUE; } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index f0e0e84bd9..f4c1abb354 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -176,10 +176,22 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: { SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode; - if (WINDOW_TYPE_STATE == pWindow->winType || (!streamQuery && WINDOW_TYPE_SESSION == pWindow->winType)) { - return false; + if (WINDOW_TYPE_INTERVAL == pWindow->winType) { + return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); + if (WINDOW_TYPE_STATE == pWindow->winType) { + if (!streamQuery) { + return stbSplHasMultiTbScan(streamQuery, pNode); + } else { + return false; + } + } + if (WINDOW_TYPE_SESSION == pWindow->winType) { + if (!streamQuery) { + return stbSplHasMultiTbScan(streamQuery, pNode); + } else { + return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); + } } - return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); } case QUERY_NODE_LOGIC_PLAN_SORT: return stbSplHasMultiTbScan(streamQuery, pNode); @@ -477,11 +489,64 @@ static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo return code; } +static void splSetTableScanType(SLogicNode* pNode, EScanType scanType) { + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { + ((SScanLogicNode*)pNode)->scanType = scanType; + } else { + if (1 == LIST_LENGTH(pNode->pChildren)) { + splSetTableScanType((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), scanType); + } + } +} + +static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SLogicNode* pWindow = pInfo->pSplitNode; + SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pWindow->pChildren, 0); + + SNodeList* pMergeKeys = NULL; + int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, &pMergeKeys); + + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild); + } + + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + (SNode*)splCreateScanSubplan(pCxt, pChild, SPLIT_FLAG_STABLE_SPLIT)); + } + + if (TSDB_CODE_SUCCESS == code) { + splSetTableScanType(pChild, SCAN_TYPE_TABLE_MERGE); + ++(pCxt->groupId); + } + + if (TSDB_CODE_SUCCESS == code) { + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT); + } else { + nodesDestroyList(pMergeKeys); + } + + return code; +} + static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) { if (pCxt->pPlanCxt->streamQuery) { return stbSplSplitSessionForStream(pCxt, pInfo); } else { - return TSDB_CODE_PLAN_INTERNAL_ERROR; + return stbSplSplitSessionOrStateForBatch(pCxt, pInfo); + } +} + +static int32_t stbSplSplitStateForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + return TSDB_CODE_PLAN_INTERNAL_ERROR; +} + +static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + if (pCxt->pPlanCxt->streamQuery) { + return stbSplSplitStateForStream(pCxt, pInfo); + } else { + return stbSplSplitSessionOrStateForBatch(pCxt, pInfo); } } @@ -511,6 +576,8 @@ static int32_t stbSplSplitWindowForMergeTable(SSplitContext* pCxt, SStableSplitI return stbSplSplitInterval(pCxt, pInfo); case WINDOW_TYPE_SESSION: return stbSplSplitSession(pCxt, pInfo); + case WINDOW_TYPE_STATE: + return stbSplSplitState(pCxt, pInfo); default: break; } diff --git a/source/libs/planner/test/planSessionTest.cpp b/source/libs/planner/test/planSessionTest.cpp index 7d5d826925..f445bb5ffc 100644 --- a/source/libs/planner/test/planSessionTest.cpp +++ b/source/libs/planner/test/planSessionTest.cpp @@ -34,3 +34,13 @@ TEST_F(PlanSessionTest, selectFunc) { // select function along with the columns of select row, and with SESSION clause run("SELECT MAX(c1), c2 FROM t1 SESSION(ts, 10s)"); } + +TEST_F(PlanSessionTest, stable) { + useDb("root", "test"); + + // select function for SESSION clause + run("SELECT MAX(c1), MIN(c1) FROM st1 SESSION(ts, 10s)"); + // select function along with the columns of select row, and with SESSION clause + run("SELECT MAX(c1), c2 FROM st1 SESSION(ts, 10s)"); + run("SELECT count(ts) FROM st1 PARTITION BY c1 SESSION(ts, 10s)"); +} diff --git a/source/libs/planner/test/planStateTest.cpp b/source/libs/planner/test/planStateTest.cpp index 9ff035e148..6985bc8807 100644 --- a/source/libs/planner/test/planStateTest.cpp +++ b/source/libs/planner/test/planStateTest.cpp @@ -40,3 +40,12 @@ TEST_F(PlanStateTest, selectFunc) { // select function along with the columns of select row, and with STATE_WINDOW clause run("SELECT MAX(c1), c2 FROM t1 STATE_WINDOW(c3)"); } + +TEST_F(PlanStateTest, stable) { + useDb("root", "test"); + + // select function for STATE_WINDOW clause + run("SELECT MAX(c1), MIN(c1) FROM st1 STATE_WINDOW(c2)"); + // select function along with the columns of select row, and with STATE_WINDOW clause + run("SELECT MAX(c1), c2 FROM st1 STATE_WINDOW(c2)"); +} -- GitLab