diff --git a/include/common/ttokendef.h b/include/common/ttokendef.h index fc630cfdc02bd40e5b01723be44cceb7e31cd74c..a09c87eea8f90c6b8c4dc6017ffee13c0bdcaab9 100644 --- a/include/common/ttokendef.h +++ b/include/common/ttokendef.h @@ -267,76 +267,78 @@ #define TK_BY 249 #define TK_SESSION 250 #define TK_STATE_WINDOW 251 -#define TK_SLIDING 252 -#define TK_FILL 253 -#define TK_VALUE 254 -#define TK_NONE 255 -#define TK_PREV 256 -#define TK_LINEAR 257 -#define TK_NEXT 258 -#define TK_HAVING 259 -#define TK_RANGE 260 -#define TK_EVERY 261 -#define TK_ORDER 262 -#define TK_SLIMIT 263 -#define TK_SOFFSET 264 -#define TK_LIMIT 265 -#define TK_OFFSET 266 -#define TK_ASC 267 -#define TK_NULLS 268 -#define TK_ABORT 269 -#define TK_AFTER 270 -#define TK_ATTACH 271 -#define TK_BEFORE 272 -#define TK_BEGIN 273 -#define TK_BITAND 274 -#define TK_BITNOT 275 -#define TK_BITOR 276 -#define TK_BLOCKS 277 -#define TK_CHANGE 278 -#define TK_COMMA 279 -#define TK_COMPACT 280 -#define TK_CONCAT 281 -#define TK_CONFLICT 282 -#define TK_COPY 283 -#define TK_DEFERRED 284 -#define TK_DELIMITERS 285 -#define TK_DETACH 286 -#define TK_DIVIDE 287 -#define TK_DOT 288 -#define TK_EACH 289 -#define TK_FAIL 290 -#define TK_FILE 291 -#define TK_FOR 292 -#define TK_GLOB 293 -#define TK_ID 294 -#define TK_IMMEDIATE 295 -#define TK_IMPORT 296 -#define TK_INITIALLY 297 -#define TK_INSTEAD 298 -#define TK_ISNULL 299 -#define TK_KEY 300 -#define TK_MODULES 301 -#define TK_NK_BITNOT 302 -#define TK_NK_SEMI 303 -#define TK_NOTNULL 304 -#define TK_OF 305 -#define TK_PLUS 306 -#define TK_PRIVILEGE 307 -#define TK_RAISE 308 -#define TK_REPLACE 309 -#define TK_RESTRICT 310 -#define TK_ROW 311 -#define TK_SEMI 312 -#define TK_STAR 313 -#define TK_STATEMENT 314 -#define TK_STRING 315 -#define TK_TIMES 316 -#define TK_UPDATE 317 -#define TK_VALUES 318 -#define TK_VARIABLE 319 -#define TK_VIEW 320 -#define TK_WAL 321 +#define TK_EVENT_WINDOW 252 +#define TK_START 253 +#define TK_SLIDING 254 +#define TK_FILL 255 +#define TK_VALUE 256 +#define TK_NONE 257 +#define TK_PREV 258 +#define TK_LINEAR 259 +#define TK_NEXT 260 +#define TK_HAVING 261 +#define TK_RANGE 262 +#define TK_EVERY 263 +#define TK_ORDER 264 +#define TK_SLIMIT 265 +#define TK_SOFFSET 266 +#define TK_LIMIT 267 +#define TK_OFFSET 268 +#define TK_ASC 269 +#define TK_NULLS 270 +#define TK_ABORT 271 +#define TK_AFTER 272 +#define TK_ATTACH 273 +#define TK_BEFORE 274 +#define TK_BEGIN 275 +#define TK_BITAND 276 +#define TK_BITNOT 277 +#define TK_BITOR 278 +#define TK_BLOCKS 279 +#define TK_CHANGE 280 +#define TK_COMMA 281 +#define TK_COMPACT 282 +#define TK_CONCAT 283 +#define TK_CONFLICT 284 +#define TK_COPY 285 +#define TK_DEFERRED 286 +#define TK_DELIMITERS 287 +#define TK_DETACH 288 +#define TK_DIVIDE 289 +#define TK_DOT 290 +#define TK_EACH 291 +#define TK_FAIL 292 +#define TK_FILE 293 +#define TK_FOR 294 +#define TK_GLOB 295 +#define TK_ID 296 +#define TK_IMMEDIATE 297 +#define TK_IMPORT 298 +#define TK_INITIALLY 299 +#define TK_INSTEAD 300 +#define TK_ISNULL 301 +#define TK_KEY 302 +#define TK_MODULES 303 +#define TK_NK_BITNOT 304 +#define TK_NK_SEMI 305 +#define TK_NOTNULL 306 +#define TK_OF 307 +#define TK_PLUS 308 +#define TK_PRIVILEGE 309 +#define TK_RAISE 310 +#define TK_REPLACE 311 +#define TK_RESTRICT 312 +#define TK_ROW 313 +#define TK_SEMI 314 +#define TK_STAR 315 +#define TK_STATEMENT 316 +#define TK_STRING 317 +#define TK_TIMES 318 +#define TK_UPDATE 319 +#define TK_VALUES 320 +#define TK_VARIABLE 321 +#define TK_VIEW 322 +#define TK_WAL 323 #define TK_NK_SPACE 600 #define TK_NK_COMMENT 601 diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 0a2d76d0972f13bb37b622fa86fe05bbd901d57a..6f700c75edf82a9eddb23ef82f0dade1c89680b2 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -511,6 +511,8 @@ typedef struct SEventWinodwPhysiNode { SNode* pEndCond; } SEventWinodwPhysiNode; +typedef SEventWinodwPhysiNode SStreamEventWinodwPhysiNode; + typedef struct SSortPhysiNode { SPhysiNode node; SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index cfbab6c65220cf5ad0139a4731dfcf265417264b..aab018c87968b203ee79da2e4d872bd444d21082 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -469,6 +469,8 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p CLONE_NODE_FIELD(pTspk); CLONE_NODE_FIELD(pTsEnd); CLONE_NODE_FIELD(pStateExpr); + CLONE_NODE_FIELD(pStartCond); + CLONE_NODE_FIELD(pEndCond); COPY_SCALAR_FIELD(triggerType); COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(deleteMark); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 0f9b3247e1bf9962be7fa9d98bec87748cd76756..38884a37e05400adb34e073ec8760ea447e9fdb1 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -235,6 +235,10 @@ const char* nodesNodeName(ENodeType type) { return "PhysiLastRowScan"; case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN: return "PhysiTableCountScan"; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT: + return "PhysiMergeEventWindow"; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: + return "PhysiStreamEventWindow"; case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return "PhysiProject"; case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: @@ -2274,6 +2278,37 @@ static int32_t jsonToPhysiStateWindowNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkEventWindowPhysiPlanStartCond = "StartCond"; +static const char* jkEventWindowPhysiPlanEndCond = "EndCond"; + +static int32_t physiEventWindowNodeToJson(const void* pObj, SJson* pJson) { + const SEventWinodwPhysiNode* pNode = (const SEventWinodwPhysiNode*)pObj; + + int32_t code = physiWindowNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkEventWindowPhysiPlanStartCond, nodeToJson, pNode->pStartCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkEventWindowPhysiPlanEndCond, nodeToJson, pNode->pEndCond); + } + + return code; +} + +static int32_t jsonToPhysiEventWindowNode(const SJson* pJson, void* pObj) { + SEventWinodwPhysiNode* pNode = (SEventWinodwPhysiNode*)pObj; + + int32_t code = jsonToPhysiWindowNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkEventWindowPhysiPlanStartCond, &pNode->pStartCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkEventWindowPhysiPlanEndCond, &pNode->pEndCond); + } + + return code; +} + static const char* jkPartitionPhysiPlanExprs = "Exprs"; static const char* jkPartitionPhysiPlanPartitionKeys = "PartitionKeys"; static const char* jkPartitionPhysiPlanTargets = "Targets"; @@ -4746,6 +4781,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: return physiStateWindowNodeToJson(pObj, pJson); + case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: + return physiEventWindowNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_PARTITION: return physiPartitionNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: @@ -4907,6 +4945,9 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: return jsonToPhysiStateWindowNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: + return jsonToPhysiEventWindowNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_PARTITION: return jsonToPhysiPartitionNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index cc9cb31d181404f2c4dfb0807d22b117a24c5b82..cb441053cef64b6b79fd5d56c65dd01aaedd763f 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2927,6 +2927,46 @@ static int32_t msgToPhysiStateWindowNode(STlvDecoder* pDecoder, void* pObj) { return code; } +enum { PHY_EVENT_CODE_WINDOW = 1, PHY_EVENT_CODE_START_COND, PHY_EVENT_CODE_END_COND }; + +static int32_t physiEventWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SEventWinodwPhysiNode* pNode = (const SEventWinodwPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_EVENT_CODE_WINDOW, physiWindowNodeToMsg, &pNode->window); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_EVENT_CODE_START_COND, nodeToMsg, pNode->pStartCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_EVENT_CODE_END_COND, nodeToMsg, pNode->pEndCond); + } + + return code; +} + +static int32_t msgToPhysiEventWindowNode(STlvDecoder* pDecoder, void* pObj) { + SEventWinodwPhysiNode* pNode = (SEventWinodwPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_EVENT_CODE_WINDOW: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiWindowNode, &pNode->window); + break; + case PHY_EVENT_CODE_START_COND: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pStartCond); + break; + case PHY_EVENT_CODE_END_COND: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pEndCond); + break; + default: + break; + } + } + + return code; +} + enum { PHY_PARTITION_CODE_BASE_NODE = 1, PHY_PARTITION_CODE_EXPR, PHY_PARTITION_CODE_KEYS, PHY_PARTITION_CODE_TARGETS }; static int32_t physiPartitionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -3698,6 +3738,10 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: code = physiStateWindowNodeToMsg(pObj, pEncoder); break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: + code = physiEventWindowNodeToMsg(pObj, pEncoder); + break; case QUERY_NODE_PHYSICAL_PLAN_PARTITION: code = physiPartitionNodeToMsg(pObj, pEncoder); break; @@ -3837,6 +3881,10 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: code = msgToPhysiStateWindowNode(pDecoder, pObj); break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: + code = msgToPhysiEventWindowNode(pDecoder, pObj); + break; case QUERY_NODE_PHYSICAL_PLAN_PARTITION: code = msgToPhysiPartitionNode(pDecoder, pObj); break; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 2f043751916d6042356d9a64d960954c06fbddec..7980c58dcf8183482bb0c9a8d518dfaeffd5f3b8 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -537,6 +537,10 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SStateWinodwPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: return makeNode(type, sizeof(SStreamStateWinodwPhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT: + return makeNode(type, sizeof(SEventWinodwPhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: + return makeNode(type, sizeof(SStreamEventWinodwPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_PARTITION: return makeNode(type, sizeof(SPartitionPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: @@ -1241,6 +1245,14 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pPhyNode->pStateKey); break; } + case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: { + SEventWinodwPhysiNode* pPhyNode = (SEventWinodwPhysiNode*)pNode; + destroyWinodwPhysiNode((SWinodwPhysiNode*)pPhyNode); + nodesDestroyNode(pPhyNode->pStartCond); + nodesDestroyNode(pPhyNode->pEndCond); + break; + } case QUERY_NODE_PHYSICAL_PLAN_PARTITION: { destroyPartitionPhysiNode((SPartitionPhysiNode*)pNode); break; diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index e62b2f0f5ab2db843d41009bb21e68a60f19cd67..1f9e4e9ab1b58478d5e57b7e3aaff2e99d884c99 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -90,6 +90,7 @@ static SKeyword keywordTable[] = { {"EXISTS", TK_EXISTS}, {"EXPIRED", TK_EXPIRED}, {"EXPLAIN", TK_EXPLAIN}, + {"EVENT_WINDOW", TK_EVENT_WINDOW}, {"EVERY", TK_EVERY}, {"FILE", TK_FILE}, {"FILL", TK_FILL}, @@ -195,15 +196,16 @@ static SKeyword keywordTable[] = { {"SNODES", TK_SNODES}, {"SOFFSET", TK_SOFFSET}, {"SPLIT", TK_SPLIT}, - {"STT_TRIGGER", TK_STT_TRIGGER}, {"STABLE", TK_STABLE}, {"STABLES", TK_STABLES}, + {"START", TK_START}, {"STATE", TK_STATE}, {"STATE_WINDOW", TK_STATE_WINDOW}, {"STORAGE", TK_STORAGE}, {"STREAM", TK_STREAM}, {"STREAMS", TK_STREAMS}, {"STRICT", TK_STRICT}, + {"STT_TRIGGER", TK_STT_TRIGGER}, {"SUBSCRIBE", TK_SUBSCRIBE}, {"SUBSCRIPTIONS", TK_SUBSCRIPTIONS}, {"SUBTABLE", TK_SUBTABLE}, diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index e8b18d4016a691171182acca0d413fa70cf3e00e..205c70e0dffdcb77b333f310da93554a782a251b 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -830,17 +830,11 @@ static int32_t createWindowLogicNodeByEvent(SLogicPlanContext* pCxt, SEventWindo pWindow->pStartCond = nodesCloneNode(pEvent->pStartCond); pWindow->pEndCond = nodesCloneNode(pEvent->pEndCond); pWindow->pTspk = nodesCloneNode(pEvent->pCol); - if (NULL == pWindow->pStateExpr || NULL == pWindow->pTspk) { + if (NULL == pWindow->pStartCond || NULL == pWindow->pEndCond || NULL == pWindow->pTspk) { nodesDestroyNode((SNode*)pWindow); return TSDB_CODE_OUT_OF_MEMORY; } - // rewrite the expression in subsequent clauses - int32_t code = rewriteExprForSelect(pWindow->pStateExpr, pSelect, SQL_CLAUSE_WINDOW); - if (TSDB_CODE_SUCCESS == code) { - code = createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode); - } - - return code; + return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode); } static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index a13e959a369ff03141e7c1a612529defdb1b5321..72931413ccfccd05cb657be8555cc510832d8fb2 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -197,6 +197,15 @@ static int32_t adjustStateDataRequirement(SWindowLogicNode* pWindow, EDataOrderL return TSDB_CODE_SUCCESS; } +static int32_t adjustEventDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { + if (requirement <= pWindow->node.resultDataOrder) { + return TSDB_CODE_SUCCESS; + } + pWindow->node.resultDataOrder = requirement; + pWindow->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { switch (pWindow->winType) { case WINDOW_TYPE_INTERVAL: @@ -205,6 +214,8 @@ static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrder return adjustSessionDataRequirement(pWindow, requirement); case WINDOW_TYPE_STATE: return adjustStateDataRequirement(pWindow, requirement); + case WINDOW_TYPE_EVENT: + return adjustEventDataRequirement(pWindow, requirement); default: break; } diff --git a/source/libs/planner/test/planEventTest.cpp b/source/libs/planner/test/planEventTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c4db1459982de3c8daf6b8b37795a806ddd77d01 --- /dev/null +++ b/source/libs/planner/test/planEventTest.cpp @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "planTestUtil.h" +#include "planner.h" + +using namespace std; + +class PlanEventTest : public PlannerTestBase {}; + +TEST_F(PlanEventTest, basic) { + useDb("root", "test"); + + run("SELECT COUNT(*) FROM t1 EVENT_WINDOW START WITH c1 > 10 END WITH c2 = 'abc'"); +} + +TEST_F(PlanEventTest, stable) { + useDb("root", "test"); + + run("SELECT COUNT(*) FROM st1 EVENT_WINDOW START WITH c1 > 10 END WITH c2 = 'abc'"); +}