提交 2f9753c5 编写于 作者: X Xiaoyu Wang

event window query

上级 9d6d05c4
......@@ -267,76 +267,78 @@
#define TK_BY 249
#define TK_SESSION 250
#define TK_STATE_WINDOW 251
#define TK_SLIDING 252
#define TK_FILL 253
#define TK_VALUE 254
#define TK_NONE 255
#define TK_PREV 256
#define TK_LINEAR 257
#define TK_NEXT 258
#define TK_HAVING 259
#define TK_RANGE 260
#define TK_EVERY 261
#define TK_ORDER 262
#define TK_SLIMIT 263
#define TK_SOFFSET 264
#define TK_LIMIT 265
#define TK_OFFSET 266
#define TK_ASC 267
#define TK_NULLS 268
#define TK_ABORT 269
#define TK_AFTER 270
#define TK_ATTACH 271
#define TK_BEFORE 272
#define TK_BEGIN 273
#define TK_BITAND 274
#define TK_BITNOT 275
#define TK_BITOR 276
#define TK_BLOCKS 277
#define TK_CHANGE 278
#define TK_COMMA 279
#define TK_COMPACT 280
#define TK_CONCAT 281
#define TK_CONFLICT 282
#define TK_COPY 283
#define TK_DEFERRED 284
#define TK_DELIMITERS 285
#define TK_DETACH 286
#define TK_DIVIDE 287
#define TK_DOT 288
#define TK_EACH 289
#define TK_FAIL 290
#define TK_FILE 291
#define TK_FOR 292
#define TK_GLOB 293
#define TK_ID 294
#define TK_IMMEDIATE 295
#define TK_IMPORT 296
#define TK_INITIALLY 297
#define TK_INSTEAD 298
#define TK_ISNULL 299
#define TK_KEY 300
#define TK_MODULES 301
#define TK_NK_BITNOT 302
#define TK_NK_SEMI 303
#define TK_NOTNULL 304
#define TK_OF 305
#define TK_PLUS 306
#define TK_PRIVILEGE 307
#define TK_RAISE 308
#define TK_REPLACE 309
#define TK_RESTRICT 310
#define TK_ROW 311
#define TK_SEMI 312
#define TK_STAR 313
#define TK_STATEMENT 314
#define TK_STRING 315
#define TK_TIMES 316
#define TK_UPDATE 317
#define TK_VALUES 318
#define TK_VARIABLE 319
#define TK_VIEW 320
#define TK_WAL 321
#define TK_EVENT_WINDOW 252
#define TK_START 253
#define TK_SLIDING 254
#define TK_FILL 255
#define TK_VALUE 256
#define TK_NONE 257
#define TK_PREV 258
#define TK_LINEAR 259
#define TK_NEXT 260
#define TK_HAVING 261
#define TK_RANGE 262
#define TK_EVERY 263
#define TK_ORDER 264
#define TK_SLIMIT 265
#define TK_SOFFSET 266
#define TK_LIMIT 267
#define TK_OFFSET 268
#define TK_ASC 269
#define TK_NULLS 270
#define TK_ABORT 271
#define TK_AFTER 272
#define TK_ATTACH 273
#define TK_BEFORE 274
#define TK_BEGIN 275
#define TK_BITAND 276
#define TK_BITNOT 277
#define TK_BITOR 278
#define TK_BLOCKS 279
#define TK_CHANGE 280
#define TK_COMMA 281
#define TK_COMPACT 282
#define TK_CONCAT 283
#define TK_CONFLICT 284
#define TK_COPY 285
#define TK_DEFERRED 286
#define TK_DELIMITERS 287
#define TK_DETACH 288
#define TK_DIVIDE 289
#define TK_DOT 290
#define TK_EACH 291
#define TK_FAIL 292
#define TK_FILE 293
#define TK_FOR 294
#define TK_GLOB 295
#define TK_ID 296
#define TK_IMMEDIATE 297
#define TK_IMPORT 298
#define TK_INITIALLY 299
#define TK_INSTEAD 300
#define TK_ISNULL 301
#define TK_KEY 302
#define TK_MODULES 303
#define TK_NK_BITNOT 304
#define TK_NK_SEMI 305
#define TK_NOTNULL 306
#define TK_OF 307
#define TK_PLUS 308
#define TK_PRIVILEGE 309
#define TK_RAISE 310
#define TK_REPLACE 311
#define TK_RESTRICT 312
#define TK_ROW 313
#define TK_SEMI 314
#define TK_STAR 315
#define TK_STATEMENT 316
#define TK_STRING 317
#define TK_TIMES 318
#define TK_UPDATE 319
#define TK_VALUES 320
#define TK_VARIABLE 321
#define TK_VIEW 322
#define TK_WAL 323
#define TK_NK_SPACE 600
#define TK_NK_COMMENT 601
......
......@@ -511,6 +511,8 @@ typedef struct SEventWinodwPhysiNode {
SNode* pEndCond;
} SEventWinodwPhysiNode;
typedef SEventWinodwPhysiNode SStreamEventWinodwPhysiNode;
typedef struct SSortPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
......
......@@ -469,6 +469,8 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
CLONE_NODE_FIELD(pTspk);
CLONE_NODE_FIELD(pTsEnd);
CLONE_NODE_FIELD(pStateExpr);
CLONE_NODE_FIELD(pStartCond);
CLONE_NODE_FIELD(pEndCond);
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(deleteMark);
......
......@@ -235,6 +235,10 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiLastRowScan";
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
return "PhysiTableCountScan";
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
return "PhysiMergeEventWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
return "PhysiStreamEventWindow";
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return "PhysiProject";
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
......@@ -2274,6 +2278,37 @@ static int32_t jsonToPhysiStateWindowNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkEventWindowPhysiPlanStartCond = "StartCond";
static const char* jkEventWindowPhysiPlanEndCond = "EndCond";
static int32_t physiEventWindowNodeToJson(const void* pObj, SJson* pJson) {
const SEventWinodwPhysiNode* pNode = (const SEventWinodwPhysiNode*)pObj;
int32_t code = physiWindowNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkEventWindowPhysiPlanStartCond, nodeToJson, pNode->pStartCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkEventWindowPhysiPlanEndCond, nodeToJson, pNode->pEndCond);
}
return code;
}
static int32_t jsonToPhysiEventWindowNode(const SJson* pJson, void* pObj) {
SEventWinodwPhysiNode* pNode = (SEventWinodwPhysiNode*)pObj;
int32_t code = jsonToPhysiWindowNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkEventWindowPhysiPlanStartCond, &pNode->pStartCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkEventWindowPhysiPlanEndCond, &pNode->pEndCond);
}
return code;
}
static const char* jkPartitionPhysiPlanExprs = "Exprs";
static const char* jkPartitionPhysiPlanPartitionKeys = "PartitionKeys";
static const char* jkPartitionPhysiPlanTargets = "Targets";
......@@ -4746,6 +4781,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return physiStateWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
return physiEventWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return physiPartitionNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
......@@ -4907,6 +4945,9 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return jsonToPhysiStateWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
return jsonToPhysiEventWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return jsonToPhysiPartitionNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
......
......@@ -2927,6 +2927,46 @@ static int32_t msgToPhysiStateWindowNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { PHY_EVENT_CODE_WINDOW = 1, PHY_EVENT_CODE_START_COND, PHY_EVENT_CODE_END_COND };
static int32_t physiEventWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SEventWinodwPhysiNode* pNode = (const SEventWinodwPhysiNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, PHY_EVENT_CODE_WINDOW, physiWindowNodeToMsg, &pNode->window);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_EVENT_CODE_START_COND, nodeToMsg, pNode->pStartCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_EVENT_CODE_END_COND, nodeToMsg, pNode->pEndCond);
}
return code;
}
static int32_t msgToPhysiEventWindowNode(STlvDecoder* pDecoder, void* pObj) {
SEventWinodwPhysiNode* pNode = (SEventWinodwPhysiNode*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case PHY_EVENT_CODE_WINDOW:
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiWindowNode, &pNode->window);
break;
case PHY_EVENT_CODE_START_COND:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pStartCond);
break;
case PHY_EVENT_CODE_END_COND:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pEndCond);
break;
default:
break;
}
}
return code;
}
enum { PHY_PARTITION_CODE_BASE_NODE = 1, PHY_PARTITION_CODE_EXPR, PHY_PARTITION_CODE_KEYS, PHY_PARTITION_CODE_TARGETS };
static int32_t physiPartitionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
......@@ -3698,6 +3738,10 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
code = physiStateWindowNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
code = physiEventWindowNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
code = physiPartitionNodeToMsg(pObj, pEncoder);
break;
......@@ -3837,6 +3881,10 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
code = msgToPhysiStateWindowNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
code = msgToPhysiEventWindowNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
code = msgToPhysiPartitionNode(pDecoder, pObj);
break;
......
......@@ -537,6 +537,10 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SStateWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return makeNode(type, sizeof(SStreamStateWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
return makeNode(type, sizeof(SEventWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
return makeNode(type, sizeof(SStreamEventWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return makeNode(type, sizeof(SPartitionPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
......@@ -1241,6 +1245,14 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pPhyNode->pStateKey);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: {
SEventWinodwPhysiNode* pPhyNode = (SEventWinodwPhysiNode*)pNode;
destroyWinodwPhysiNode((SWinodwPhysiNode*)pPhyNode);
nodesDestroyNode(pPhyNode->pStartCond);
nodesDestroyNode(pPhyNode->pEndCond);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: {
destroyPartitionPhysiNode((SPartitionPhysiNode*)pNode);
break;
......
......@@ -90,6 +90,7 @@ static SKeyword keywordTable[] = {
{"EXISTS", TK_EXISTS},
{"EXPIRED", TK_EXPIRED},
{"EXPLAIN", TK_EXPLAIN},
{"EVENT_WINDOW", TK_EVENT_WINDOW},
{"EVERY", TK_EVERY},
{"FILE", TK_FILE},
{"FILL", TK_FILL},
......@@ -195,15 +196,16 @@ static SKeyword keywordTable[] = {
{"SNODES", TK_SNODES},
{"SOFFSET", TK_SOFFSET},
{"SPLIT", TK_SPLIT},
{"STT_TRIGGER", TK_STT_TRIGGER},
{"STABLE", TK_STABLE},
{"STABLES", TK_STABLES},
{"START", TK_START},
{"STATE", TK_STATE},
{"STATE_WINDOW", TK_STATE_WINDOW},
{"STORAGE", TK_STORAGE},
{"STREAM", TK_STREAM},
{"STREAMS", TK_STREAMS},
{"STRICT", TK_STRICT},
{"STT_TRIGGER", TK_STT_TRIGGER},
{"SUBSCRIBE", TK_SUBSCRIBE},
{"SUBSCRIPTIONS", TK_SUBSCRIPTIONS},
{"SUBTABLE", TK_SUBTABLE},
......
......@@ -830,17 +830,11 @@ static int32_t createWindowLogicNodeByEvent(SLogicPlanContext* pCxt, SEventWindo
pWindow->pStartCond = nodesCloneNode(pEvent->pStartCond);
pWindow->pEndCond = nodesCloneNode(pEvent->pEndCond);
pWindow->pTspk = nodesCloneNode(pEvent->pCol);
if (NULL == pWindow->pStateExpr || NULL == pWindow->pTspk) {
if (NULL == pWindow->pStartCond || NULL == pWindow->pEndCond || NULL == pWindow->pTspk) {
nodesDestroyNode((SNode*)pWindow);
return TSDB_CODE_OUT_OF_MEMORY;
}
// rewrite the expression in subsequent clauses
int32_t code = rewriteExprForSelect(pWindow->pStateExpr, pSelect, SQL_CLAUSE_WINDOW);
if (TSDB_CODE_SUCCESS == code) {
code = createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
}
return code;
return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
}
static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
......
......@@ -197,6 +197,15 @@ static int32_t adjustStateDataRequirement(SWindowLogicNode* pWindow, EDataOrderL
return TSDB_CODE_SUCCESS;
}
static int32_t adjustEventDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
if (requirement <= pWindow->node.resultDataOrder) {
return TSDB_CODE_SUCCESS;
}
pWindow->node.resultDataOrder = requirement;
pWindow->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
switch (pWindow->winType) {
case WINDOW_TYPE_INTERVAL:
......@@ -205,6 +214,8 @@ static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrder
return adjustSessionDataRequirement(pWindow, requirement);
case WINDOW_TYPE_STATE:
return adjustStateDataRequirement(pWindow, requirement);
case WINDOW_TYPE_EVENT:
return adjustEventDataRequirement(pWindow, requirement);
default:
break;
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "planTestUtil.h"
#include "planner.h"
using namespace std;
class PlanEventTest : public PlannerTestBase {};
TEST_F(PlanEventTest, basic) {
useDb("root", "test");
run("SELECT COUNT(*) FROM t1 EVENT_WINDOW START WITH c1 > 10 END WITH c2 = 'abc'");
}
TEST_F(PlanEventTest, stable) {
useDb("root", "test");
run("SELECT COUNT(*) FROM st1 EVENT_WINDOW START WITH c1 > 10 END WITH c2 = 'abc'");
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册