未验证 提交 57a8ad3b 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18925 from taosdata/enh/3.0_planner_optimize

feat: event window query
......@@ -267,76 +267,78 @@
#define TK_BY 249
#define TK_SESSION 250
#define TK_STATE_WINDOW 251
#define TK_SLIDING 252
#define TK_FILL 253
#define TK_VALUE 254
#define TK_NONE 255
#define TK_PREV 256
#define TK_LINEAR 257
#define TK_NEXT 258
#define TK_HAVING 259
#define TK_RANGE 260
#define TK_EVERY 261
#define TK_ORDER 262
#define TK_SLIMIT 263
#define TK_SOFFSET 264
#define TK_LIMIT 265
#define TK_OFFSET 266
#define TK_ASC 267
#define TK_NULLS 268
#define TK_ABORT 269
#define TK_AFTER 270
#define TK_ATTACH 271
#define TK_BEFORE 272
#define TK_BEGIN 273
#define TK_BITAND 274
#define TK_BITNOT 275
#define TK_BITOR 276
#define TK_BLOCKS 277
#define TK_CHANGE 278
#define TK_COMMA 279
#define TK_COMPACT 280
#define TK_CONCAT 281
#define TK_CONFLICT 282
#define TK_COPY 283
#define TK_DEFERRED 284
#define TK_DELIMITERS 285
#define TK_DETACH 286
#define TK_DIVIDE 287
#define TK_DOT 288
#define TK_EACH 289
#define TK_FAIL 290
#define TK_FILE 291
#define TK_FOR 292
#define TK_GLOB 293
#define TK_ID 294
#define TK_IMMEDIATE 295
#define TK_IMPORT 296
#define TK_INITIALLY 297
#define TK_INSTEAD 298
#define TK_ISNULL 299
#define TK_KEY 300
#define TK_MODULES 301
#define TK_NK_BITNOT 302
#define TK_NK_SEMI 303
#define TK_NOTNULL 304
#define TK_OF 305
#define TK_PLUS 306
#define TK_PRIVILEGE 307
#define TK_RAISE 308
#define TK_REPLACE 309
#define TK_RESTRICT 310
#define TK_ROW 311
#define TK_SEMI 312
#define TK_STAR 313
#define TK_STATEMENT 314
#define TK_STRING 315
#define TK_TIMES 316
#define TK_UPDATE 317
#define TK_VALUES 318
#define TK_VARIABLE 319
#define TK_VIEW 320
#define TK_WAL 321
#define TK_EVENT_WINDOW 252
#define TK_START 253
#define TK_SLIDING 254
#define TK_FILL 255
#define TK_VALUE 256
#define TK_NONE 257
#define TK_PREV 258
#define TK_LINEAR 259
#define TK_NEXT 260
#define TK_HAVING 261
#define TK_RANGE 262
#define TK_EVERY 263
#define TK_ORDER 264
#define TK_SLIMIT 265
#define TK_SOFFSET 266
#define TK_LIMIT 267
#define TK_OFFSET 268
#define TK_ASC 269
#define TK_NULLS 270
#define TK_ABORT 271
#define TK_AFTER 272
#define TK_ATTACH 273
#define TK_BEFORE 274
#define TK_BEGIN 275
#define TK_BITAND 276
#define TK_BITNOT 277
#define TK_BITOR 278
#define TK_BLOCKS 279
#define TK_CHANGE 280
#define TK_COMMA 281
#define TK_COMPACT 282
#define TK_CONCAT 283
#define TK_CONFLICT 284
#define TK_COPY 285
#define TK_DEFERRED 286
#define TK_DELIMITERS 287
#define TK_DETACH 288
#define TK_DIVIDE 289
#define TK_DOT 290
#define TK_EACH 291
#define TK_FAIL 292
#define TK_FILE 293
#define TK_FOR 294
#define TK_GLOB 295
#define TK_ID 296
#define TK_IMMEDIATE 297
#define TK_IMPORT 298
#define TK_INITIALLY 299
#define TK_INSTEAD 300
#define TK_ISNULL 301
#define TK_KEY 302
#define TK_MODULES 303
#define TK_NK_BITNOT 304
#define TK_NK_SEMI 305
#define TK_NOTNULL 306
#define TK_OF 307
#define TK_PLUS 308
#define TK_PRIVILEGE 309
#define TK_RAISE 310
#define TK_REPLACE 311
#define TK_RESTRICT 312
#define TK_ROW 313
#define TK_SEMI 314
#define TK_STAR 315
#define TK_STATEMENT 316
#define TK_STRING 317
#define TK_TIMES 318
#define TK_UPDATE 319
#define TK_VALUES 320
#define TK_VARIABLE 321
#define TK_VIEW 322
#define TK_WAL 323
#define TK_NK_SPACE 600
#define TK_NK_COMMENT 601
......
......@@ -112,6 +112,7 @@ typedef enum ENodeType {
QUERY_NODE_COLUMN_REF,
QUERY_NODE_WHEN_THEN,
QUERY_NODE_CASE_WHEN,
QUERY_NODE_EVENT_WINDOW,
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR = 100,
......@@ -265,7 +266,9 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_DELETE,
QUERY_NODE_PHYSICAL_SUBPLAN,
QUERY_NODE_PHYSICAL_PLAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN,
QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT,
QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT
} ENodeType;
/**
......
......@@ -185,7 +185,12 @@ typedef struct SMergeLogicNode {
bool groupSort;
} SMergeLogicNode;
typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType;
typedef enum EWindowType {
WINDOW_TYPE_INTERVAL = 1,
WINDOW_TYPE_SESSION,
WINDOW_TYPE_STATE,
WINDOW_TYPE_EVENT
} EWindowType;
typedef enum EWindowAlgorithm {
INTERVAL_ALGO_HASH = 1,
......@@ -212,6 +217,8 @@ typedef struct SWindowLogicNode {
SNode* pTspk;
SNode* pTsEnd;
SNode* pStateExpr;
SNode* pStartCond;
SNode* pEndCond;
int8_t triggerType;
int64_t watermark;
int64_t deleteMark;
......@@ -498,6 +505,14 @@ typedef struct SStateWinodwPhysiNode {
typedef SStateWinodwPhysiNode SStreamStateWinodwPhysiNode;
typedef struct SEventWinodwPhysiNode {
SWinodwPhysiNode window;
SNode* pStartCond;
SNode* pEndCond;
} SEventWinodwPhysiNode;
typedef SEventWinodwPhysiNode SStreamEventWinodwPhysiNode;
typedef struct SSortPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
......
......@@ -223,6 +223,13 @@ typedef struct SIntervalWindowNode {
SNode* pFill;
} SIntervalWindowNode;
typedef struct SEventWindowNode {
ENodeType type; // QUERY_NODE_EVENT_WINDOW
SNode* pCol; // timestamp primary key
SNode* pStartCond;
SNode* pEndCond;
} SEventWindowNode;
typedef enum EFillMode {
FILL_MODE_NONE = 1,
FILL_MODE_VALUE,
......
......@@ -295,6 +295,13 @@ static int32_t stateWindowNodeCopy(const SStateWindowNode* pSrc, SStateWindowNod
return TSDB_CODE_SUCCESS;
}
static int32_t eventWindowNodeCopy(const SEventWindowNode* pSrc, SEventWindowNode* pDst) {
CLONE_NODE_FIELD(pCol);
CLONE_NODE_FIELD(pStartCond);
CLONE_NODE_FIELD(pEndCond);
return TSDB_CODE_SUCCESS;
}
static int32_t sessionWindowNodeCopy(const SSessionWindowNode* pSrc, SSessionWindowNode* pDst) {
CLONE_NODE_FIELD_EX(pCol, SColumnNode*);
CLONE_NODE_FIELD_EX(pGap, SValueNode*);
......@@ -462,6 +469,8 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
CLONE_NODE_FIELD(pTspk);
CLONE_NODE_FIELD(pTsEnd);
CLONE_NODE_FIELD(pStateExpr);
CLONE_NODE_FIELD(pStartCond);
CLONE_NODE_FIELD(pEndCond);
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(deleteMark);
......@@ -709,6 +718,9 @@ SNode* nodesCloneNode(const SNode* pNode) {
case QUERY_NODE_STATE_WINDOW:
code = stateWindowNodeCopy((const SStateWindowNode*)pNode, (SStateWindowNode*)pDst);
break;
case QUERY_NODE_EVENT_WINDOW:
code = eventWindowNodeCopy((const SEventWindowNode*)pNode, (SEventWindowNode*)pDst);
break;
case QUERY_NODE_SESSION_WINDOW:
code = sessionWindowNodeCopy((const SSessionWindowNode*)pNode, (SSessionWindowNode*)pDst);
break;
......
......@@ -85,6 +85,8 @@ const char* nodesNodeName(ENodeType type) {
return "WhenThen";
case QUERY_NODE_CASE_WHEN:
return "CaseWhen";
case QUERY_NODE_EVENT_WINDOW:
return "EventWindow";
case QUERY_NODE_SET_OPERATOR:
return "SetOperator";
case QUERY_NODE_SELECT_STMT:
......@@ -233,6 +235,10 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiLastRowScan";
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
return "PhysiTableCountScan";
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
return "PhysiMergeEventWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
return "PhysiStreamEventWindow";
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return "PhysiProject";
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
......@@ -2272,6 +2278,37 @@ static int32_t jsonToPhysiStateWindowNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkEventWindowPhysiPlanStartCond = "StartCond";
static const char* jkEventWindowPhysiPlanEndCond = "EndCond";
static int32_t physiEventWindowNodeToJson(const void* pObj, SJson* pJson) {
const SEventWinodwPhysiNode* pNode = (const SEventWinodwPhysiNode*)pObj;
int32_t code = physiWindowNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkEventWindowPhysiPlanStartCond, nodeToJson, pNode->pStartCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkEventWindowPhysiPlanEndCond, nodeToJson, pNode->pEndCond);
}
return code;
}
static int32_t jsonToPhysiEventWindowNode(const SJson* pJson, void* pObj) {
SEventWinodwPhysiNode* pNode = (SEventWinodwPhysiNode*)pObj;
int32_t code = jsonToPhysiWindowNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkEventWindowPhysiPlanStartCond, &pNode->pStartCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkEventWindowPhysiPlanEndCond, &pNode->pEndCond);
}
return code;
}
static const char* jkPartitionPhysiPlanExprs = "Exprs";
static const char* jkPartitionPhysiPlanPartitionKeys = "PartitionKeys";
static const char* jkPartitionPhysiPlanTargets = "Targets";
......@@ -3660,6 +3697,36 @@ static int32_t jsonToSessionWindowNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkEventWindowTsPrimaryKey = "TsPrimaryKey";
static const char* jkEventWindowStartCond = "StartCond";
static const char* jkEventWindowEndCond = "EndCond";
static int32_t eventWindowNodeToJson(const void* pObj, SJson* pJson) {
const SEventWindowNode* pNode = (const SEventWindowNode*)pObj;
int32_t code = tjsonAddObject(pJson, jkEventWindowTsPrimaryKey, nodeToJson, pNode->pCol);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkEventWindowStartCond, nodeToJson, pNode->pStartCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkEventWindowEndCond, nodeToJson, pNode->pEndCond);
}
return code;
}
static int32_t jsonToEventWindowNode(const SJson* pJson, void* pObj) {
SEventWindowNode* pNode = (SEventWindowNode*)pObj;
int32_t code = jsonToNodeObject(pJson, jkEventWindowTsPrimaryKey, &pNode->pCol);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkEventWindowStartCond, &pNode->pStartCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkEventWindowEndCond, &pNode->pEndCond);
}
return code;
}
static const char* jkIntervalWindowInterval = "Interval";
static const char* jkIntervalWindowOffset = "Offset";
static const char* jkIntervalWindowSliding = "Sliding";
......@@ -4615,6 +4682,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return whenThenNodeToJson(pObj, pJson);
case QUERY_NODE_CASE_WHEN:
return caseWhenNodeToJson(pObj, pJson);
case QUERY_NODE_EVENT_WINDOW:
return eventWindowNodeToJson(pObj, pJson);
case QUERY_NODE_SET_OPERATOR:
return setOperatorToJson(pObj, pJson);
case QUERY_NODE_SELECT_STMT:
......@@ -4712,6 +4781,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return physiStateWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
return physiEventWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return physiPartitionNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
......@@ -4787,6 +4859,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToWhenThenNode(pJson, pObj);
case QUERY_NODE_CASE_WHEN:
return jsonToCaseWhenNode(pJson, pObj);
case QUERY_NODE_EVENT_WINDOW:
return jsonToEventWindowNode(pJson, pObj);
case QUERY_NODE_SET_OPERATOR:
return jsonToSetOperator(pJson, pObj);
case QUERY_NODE_SELECT_STMT:
......@@ -4871,6 +4945,9 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return jsonToPhysiStateWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
return jsonToPhysiEventWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return jsonToPhysiPartitionNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
......
......@@ -2927,6 +2927,46 @@ static int32_t msgToPhysiStateWindowNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { PHY_EVENT_CODE_WINDOW = 1, PHY_EVENT_CODE_START_COND, PHY_EVENT_CODE_END_COND };
static int32_t physiEventWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SEventWinodwPhysiNode* pNode = (const SEventWinodwPhysiNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, PHY_EVENT_CODE_WINDOW, physiWindowNodeToMsg, &pNode->window);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_EVENT_CODE_START_COND, nodeToMsg, pNode->pStartCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_EVENT_CODE_END_COND, nodeToMsg, pNode->pEndCond);
}
return code;
}
static int32_t msgToPhysiEventWindowNode(STlvDecoder* pDecoder, void* pObj) {
SEventWinodwPhysiNode* pNode = (SEventWinodwPhysiNode*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case PHY_EVENT_CODE_WINDOW:
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiWindowNode, &pNode->window);
break;
case PHY_EVENT_CODE_START_COND:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pStartCond);
break;
case PHY_EVENT_CODE_END_COND:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pEndCond);
break;
default:
break;
}
}
return code;
}
enum { PHY_PARTITION_CODE_BASE_NODE = 1, PHY_PARTITION_CODE_EXPR, PHY_PARTITION_CODE_KEYS, PHY_PARTITION_CODE_TARGETS };
static int32_t physiPartitionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
......@@ -3698,6 +3738,10 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
code = physiStateWindowNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
code = physiEventWindowNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
code = physiPartitionNodeToMsg(pObj, pEncoder);
break;
......@@ -3837,6 +3881,10 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
code = msgToPhysiStateWindowNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
code = msgToPhysiEventWindowNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
code = msgToPhysiPartitionNode(pDecoder, pObj);
break;
......
......@@ -165,6 +165,17 @@ static EDealRes dispatchExpr(SNode* pNode, ETraversalOrder order, FNodeWalker wa
}
break;
}
case QUERY_NODE_EVENT_WINDOW: {
SEventWindowNode* pEvent = (SEventWindowNode*)pNode;
res = walkExpr(pEvent->pCol, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkExpr(pEvent->pStartCond, order, walker, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkExpr(pEvent->pEndCond, order, walker, pContext);
}
break;
}
default:
break;
}
......@@ -329,6 +340,17 @@ static EDealRes rewriteExpr(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
}
break;
}
case QUERY_NODE_EVENT_WINDOW: {
SEventWindowNode* pEvent = (SEventWindowNode*)pNode;
res = rewriteExpr(&pEvent->pCol, order, rewriter, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteExpr(&pEvent->pStartCond, order, rewriter, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteExpr(&pEvent->pEndCond, order, rewriter, pContext);
}
break;
}
default:
break;
}
......
......@@ -299,6 +299,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SWhenThenNode));
case QUERY_NODE_CASE_WHEN:
return makeNode(type, sizeof(SCaseWhenNode));
case QUERY_NODE_EVENT_WINDOW:
return makeNode(type, sizeof(SEventWindowNode));
case QUERY_NODE_SET_OPERATOR:
return makeNode(type, sizeof(SSetOperator));
case QUERY_NODE_SELECT_STMT:
......@@ -535,6 +537,10 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SStateWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return makeNode(type, sizeof(SStreamStateWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
return makeNode(type, sizeof(SEventWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
return makeNode(type, sizeof(SStreamEventWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return makeNode(type, sizeof(SPartitionPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
......@@ -765,16 +771,23 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_COLUMN_REF: // no pointer field
break;
case QUERY_NODE_WHEN_THEN: {
SWhenThenNode* pStmt = (SWhenThenNode*)pNode;
nodesDestroyNode(pStmt->pWhen);
nodesDestroyNode(pStmt->pThen);
SWhenThenNode* pWhenThen = (SWhenThenNode*)pNode;
nodesDestroyNode(pWhenThen->pWhen);
nodesDestroyNode(pWhenThen->pThen);
break;
}
case QUERY_NODE_CASE_WHEN: {
SCaseWhenNode* pStmt = (SCaseWhenNode*)pNode;
nodesDestroyNode(pStmt->pCase);
nodesDestroyNode(pStmt->pElse);
nodesDestroyList(pStmt->pWhenThenList);
SCaseWhenNode* pCaseWhen = (SCaseWhenNode*)pNode;
nodesDestroyNode(pCaseWhen->pCase);
nodesDestroyNode(pCaseWhen->pElse);
nodesDestroyList(pCaseWhen->pWhenThenList);
break;
}
case QUERY_NODE_EVENT_WINDOW: {
SEventWindowNode* pEvent = (SEventWindowNode*)pNode;
nodesDestroyNode(pEvent->pCol);
nodesDestroyNode(pEvent->pStartCond);
nodesDestroyNode(pEvent->pEndCond);
break;
}
case QUERY_NODE_SET_OPERATOR: {
......@@ -1232,6 +1245,14 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pPhyNode->pStateKey);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: {
SEventWinodwPhysiNode* pPhyNode = (SEventWinodwPhysiNode*)pNode;
destroyWinodwPhysiNode((SWinodwPhysiNode*)pPhyNode);
nodesDestroyNode(pPhyNode->pStartCond);
nodesDestroyNode(pPhyNode->pEndCond);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: {
destroyPartitionPhysiNode((SPartitionPhysiNode*)pNode);
break;
......
......@@ -116,6 +116,7 @@ SNode* createLimitNode(SAstCreateContext* pCxt, const SToken* pLimit, const STok
SNode* createOrderByExprNode(SAstCreateContext* pCxt, SNode* pExpr, EOrder order, ENullOrder nullOrder);
SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode* pGap);
SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr);
SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond);
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding,
SNode* pFill);
SNode* createFillNode(SAstCreateContext* pCxt, EFillMode mode, SNode* pValues);
......
......@@ -964,6 +964,8 @@ twindow_clause_opt(A) ::=
twindow_clause_opt(A) ::=
INTERVAL NK_LP duration_literal(B) NK_COMMA duration_literal(C) NK_RP
sliding_opt(D) fill_opt(E). { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C), D, E); }
twindow_clause_opt(A) ::=
EVENT_WINDOW START WITH search_condition(B) END WITH search_condition(C). { A = createEventWindowNode(pCxt, B, C); }
sliding_opt(A) ::= . { A = NULL; }
sliding_opt(A) ::= SLIDING NK_LP duration_literal(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }
......
......@@ -605,6 +605,20 @@ SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr) {
return (SNode*)state;
}
SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond) {
CHECK_PARSER_STATUS(pCxt);
SEventWindowNode* pEvent = (SEventWindowNode*)nodesMakeNode(QUERY_NODE_EVENT_WINDOW);
CHECK_OUT_OF_MEM(pEvent);
pEvent->pCol = createPrimaryKeyCol(pCxt, NULL);
if (NULL == pEvent->pCol) {
nodesDestroyNode((SNode*)pEvent);
CHECK_OUT_OF_MEM(NULL);
}
pEvent->pStartCond = pStartCond;
pEvent->pEndCond = pEndCond;
return (SNode*)pEvent;
}
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding,
SNode* pFill) {
CHECK_PARSER_STATUS(pCxt);
......
......@@ -90,6 +90,7 @@ static SKeyword keywordTable[] = {
{"EXISTS", TK_EXISTS},
{"EXPIRED", TK_EXPIRED},
{"EXPLAIN", TK_EXPLAIN},
{"EVENT_WINDOW", TK_EVENT_WINDOW},
{"EVERY", TK_EVERY},
{"FILE", TK_FILE},
{"FILL", TK_FILL},
......@@ -195,15 +196,16 @@ static SKeyword keywordTable[] = {
{"SNODES", TK_SNODES},
{"SOFFSET", TK_SOFFSET},
{"SPLIT", TK_SPLIT},
{"STT_TRIGGER", TK_STT_TRIGGER},
{"STABLE", TK_STABLE},
{"STABLES", TK_STABLES},
{"START", TK_START},
{"STATE", TK_STATE},
{"STATE_WINDOW", TK_STATE_WINDOW},
{"STORAGE", TK_STORAGE},
{"STREAM", TK_STREAM},
{"STREAMS", TK_STREAMS},
{"STRICT", TK_STRICT},
{"STT_TRIGGER", TK_STT_TRIGGER},
{"SUBSCRIBE", TK_SUBSCRIBE},
{"SUBSCRIPTIONS", TK_SUBSCRIPTIONS},
{"SUBTABLE", TK_SUBTABLE},
......
......@@ -3143,6 +3143,15 @@ static int32_t translateSessionWindow(STranslateContext* pCxt, SSelectStmt* pSel
return TSDB_CODE_SUCCESS;
}
static int32_t translateEventWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) &&
!isGlobalTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TIMELINE_QUERY,
"EVENT_WINDOW requires valid time series input");
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
switch (nodeType(pSelect->pWindow)) {
case QUERY_NODE_STATE_WINDOW:
......@@ -3151,6 +3160,8 @@ static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSe
return translateSessionWindow(pCxt, pSelect);
case QUERY_NODE_INTERVAL_WINDOW:
return translateIntervalWindow(pCxt, pSelect);
case QUERY_NODE_EVENT_WINDOW:
return translateEventWindow(pCxt, pSelect);
default:
break;
}
......
此差异已折叠。
......@@ -814,6 +814,29 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
}
static int32_t createWindowLogicNodeByEvent(SLogicPlanContext* pCxt, SEventWindowNode* pEvent, SSelectStmt* pSelect,
SLogicNode** pLogicNode) {
SWindowLogicNode* pWindow = (SWindowLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW);
if (NULL == pWindow) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pWindow->winType = WINDOW_TYPE_EVENT;
pWindow->node.groupAction = getGroupAction(pCxt, pSelect);
pWindow->node.requireDataOrder =
pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_IN_BLOCK : getRequireDataOrder(true, pSelect);
pWindow->node.resultDataOrder =
pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_GLOBAL : pWindow->node.requireDataOrder;
pWindow->pStartCond = nodesCloneNode(pEvent->pStartCond);
pWindow->pEndCond = nodesCloneNode(pEvent->pEndCond);
pWindow->pTspk = nodesCloneNode(pEvent->pCol);
if (NULL == pWindow->pStartCond || NULL == pWindow->pEndCond || NULL == pWindow->pTspk) {
nodesDestroyNode((SNode*)pWindow);
return TSDB_CODE_OUT_OF_MEMORY;
}
return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
}
static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
if (NULL == pSelect->pWindow) {
return TSDB_CODE_SUCCESS;
......@@ -826,6 +849,8 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
return createWindowLogicNodeBySession(pCxt, (SSessionWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
case QUERY_NODE_INTERVAL_WINDOW:
return createWindowLogicNodeByInterval(pCxt, (SIntervalWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
case QUERY_NODE_EVENT_WINDOW:
return createWindowLogicNodeByEvent(pCxt, (SEventWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
default:
break;
}
......
......@@ -1297,6 +1297,33 @@ static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
return code;
}
static int32_t createEventWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SEventWinodwPhysiNode* pEvent = (SEventWinodwPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pWindowLogicNode,
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT : QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT));
if (NULL == pEvent) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
int32_t code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pStartCond, &pEvent->pStartCond);
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pEndCond, &pEvent->pEndCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pEvent->window, pWindowLogicNode);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pEvent;
} else {
nodesDestroyNode((SNode*)pEvent);
}
return code;
}
static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode,
SPhysiNode** pPhyNode) {
switch (pWindowLogicNode->winType) {
......@@ -1306,6 +1333,8 @@ static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildr
return createSessionWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
case WINDOW_TYPE_STATE:
return createStateWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
case WINDOW_TYPE_EVENT:
return createEventWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
default:
break;
}
......
......@@ -729,6 +729,18 @@ static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
}
}
static int32_t stbSplSplitEventForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static int32_t stbSplSplitEvent(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (pCxt->pPlanCxt->streamQuery) {
return stbSplSplitEventForStream(pCxt, pInfo);
} else {
return stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
}
}
static bool stbSplIsPartTableWinodw(SWindowLogicNode* pWindow) {
return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
}
......@@ -741,6 +753,8 @@ static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitI
return stbSplSplitSession(pCxt, pInfo);
case WINDOW_TYPE_STATE:
return stbSplSplitState(pCxt, pInfo);
case WINDOW_TYPE_EVENT:
return stbSplSplitEvent(pCxt, pInfo);
default:
break;
}
......
......@@ -197,6 +197,15 @@ static int32_t adjustStateDataRequirement(SWindowLogicNode* pWindow, EDataOrderL
return TSDB_CODE_SUCCESS;
}
static int32_t adjustEventDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
if (requirement <= pWindow->node.resultDataOrder) {
return TSDB_CODE_SUCCESS;
}
pWindow->node.resultDataOrder = requirement;
pWindow->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
switch (pWindow->winType) {
case WINDOW_TYPE_INTERVAL:
......@@ -205,6 +214,8 @@ static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrder
return adjustSessionDataRequirement(pWindow, requirement);
case WINDOW_TYPE_STATE:
return adjustStateDataRequirement(pWindow, requirement);
case WINDOW_TYPE_EVENT:
return adjustEventDataRequirement(pWindow, requirement);
default:
break;
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "planTestUtil.h"
#include "planner.h"
using namespace std;
class PlanEventTest : public PlannerTestBase {};
TEST_F(PlanEventTest, basic) {
useDb("root", "test");
run("SELECT COUNT(*) FROM t1 EVENT_WINDOW START WITH c1 > 10 END WITH c2 = 'abc'");
}
TEST_F(PlanEventTest, stable) {
useDb("root", "test");
run("SELECT COUNT(*) FROM st1 EVENT_WINDOW START WITH c1 > 10 END WITH c2 = 'abc'");
}
......@@ -552,7 +552,7 @@ sql use test4;
sql create table st (ts timestamp, c1 tinyint, c2 smallint) tags (t1 tinyint) ;
sql create table t1 using st tags (-81) ;
sql create table t2 using st tags (-81) ;
sql create stream if not exists streams4 trigger window_close into streamt4 as select _wstart AS start, min(c1),count(c1) from t1 state_window(c1);
sql create stream if not exists streams4 trigger window_close into streamt4 as select _wstart AS startts, min(c1),count(c1) from t1 state_window(c1);
sql insert into t1 (ts, c1) values (1668073288209, 11);
sql insert into t1 (ts, c1) values (1668073288210, 11);
......@@ -567,7 +567,7 @@ loop7:
sleep 200
sql select * from streamt4 order by start;
sql select * from streamt4 order by startts;
$loop_count = $loop_count + 1
if $loop_count == 20 then
......@@ -606,7 +606,7 @@ loop8:
sleep 200
sql select * from streamt4 order by start;
sql select * from streamt4 order by startts;
$loop_count = $loop_count + 1
if $loop_count == 20 then
......@@ -640,7 +640,7 @@ loop8:
sleep 200
sql select * from streamt4 order by start;
sql select * from streamt4 order by startts;
$loop_count = $loop_count + 1
if $loop_count == 20 then
......@@ -679,7 +679,7 @@ loop9:
sleep 200
sql select * from streamt4 order by start;
sql select * from streamt4 order by startts;
$loop_count = $loop_count + 1
if $loop_count == 20 then
......
......@@ -136,7 +136,7 @@ class TDTestCase:
tdSql.query("use source_db")
tdSql.query("create table if not exists source_db.stb (ts timestamp, k int) tags (a int);")
tdSql.query("create table source_db.ct1 using source_db.stb tags(1000);create table source_db.ct2 using source_db.stb tags(2000);create table source_db.ct3 using source_db.stb tags(3000);")
tdSql.query("create stream s1 into source_db.output_stb as select _wstart AS start, min(k), max(k), sum(k) from source_db.stb interval(10m);")
tdSql.query("create stream s1 into source_db.output_stb as select _wstart AS startts, min(k), max(k), sum(k) from source_db.stb interval(10m);")
#TD-19944 -Q=3
......
......@@ -108,7 +108,7 @@ class TDTestCase:
# create stream
tdSql.execute('''create stream current_stream into stream_max_stable_1 as select _wstart as start, _wend as wend, max(q_int) as max_int, min(q_bigint) as min_int from stable_1 where ts is not null interval (5s);''')
tdSql.execute('''create stream current_stream into stream_max_stable_1 as select _wstart as startts, _wend as wend, max(q_int) as max_int, min(q_bigint) as min_int from stable_1 where ts is not null interval (5s);''')
# insert data
for i in range(num_random*n):
......@@ -187,20 +187,20 @@ class TDTestCase:
sleep(5)
# stream data check
tdSql.query("select start,wend,max_int from stream_max_stable_1 ;")
tdSql.query("select startts,wend,max_int from stream_max_stable_1 ;")
tdSql.checkRows(20)
tdSql.query("select sum(max_int) from stream_max_stable_1 ;")
stream_data_1 = tdSql.queryResult[0][0]
tdSql.query("select sum(min_int) from stream_max_stable_1 ;")
stream_data_2 = tdSql.queryResult[0][0]
tdSql.query("select sum(max_int),sum(min_int) from (select _wstart as start, _wend as wend, max(q_int) as max_int, min(q_bigint) as min_int from stable_1 where ts is not null interval (5s));")
tdSql.query("select sum(max_int),sum(min_int) from (select _wstart as startts, _wend as wend, max(q_int) as max_int, min(q_bigint) as min_int from stable_1 where ts is not null interval (5s));")
sql_data_1 = tdSql.queryResult[0][0]
sql_data_2 = tdSql.queryResult[0][1]
self.stream_value_check(stream_data_1,sql_data_1)
self.stream_value_check(stream_data_2,sql_data_2)
tdSql.query("select sum(max_int),sum(min_int) from (select _wstart as start, _wend as wend, max(q_int) as max_int, min(q_bigint) as min_int from stable_1 interval (5s));")
tdSql.query("select sum(max_int),sum(min_int) from (select _wstart as startts, _wend as wend, max(q_int) as max_int, min(q_bigint) as min_int from stable_1 interval (5s));")
sql_data_1 = tdSql.queryResult[0][0]
sql_data_2 = tdSql.queryResult[0][1]
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册