提交 bc36bc18 编写于 作者: X Xiaoyu Wang

feat: event window

上级 dbb90c21
......@@ -112,6 +112,7 @@ typedef enum ENodeType {
QUERY_NODE_COLUMN_REF,
QUERY_NODE_WHEN_THEN,
QUERY_NODE_CASE_WHEN,
QUERY_NODE_EVENT_WINDOW,
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR = 100,
......@@ -265,7 +266,9 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT,
QUERY_NODE_PHYSICAL_PLAN_DELETE,
QUERY_NODE_PHYSICAL_SUBPLAN,
QUERY_NODE_PHYSICAL_PLAN
QUERY_NODE_PHYSICAL_PLAN,
QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT,
QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT
} ENodeType;
/**
......
......@@ -185,7 +185,12 @@ typedef struct SMergeLogicNode {
bool groupSort;
} SMergeLogicNode;
typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType;
typedef enum EWindowType {
WINDOW_TYPE_INTERVAL = 1,
WINDOW_TYPE_SESSION,
WINDOW_TYPE_STATE,
WINDOW_TYPE_EVENT
} EWindowType;
typedef enum EWindowAlgorithm {
INTERVAL_ALGO_HASH = 1,
......@@ -212,6 +217,8 @@ typedef struct SWindowLogicNode {
SNode* pTspk;
SNode* pTsEnd;
SNode* pStateExpr;
SNode* pStartCond;
SNode* pEndCond;
int8_t triggerType;
int64_t watermark;
int64_t deleteMark;
......@@ -498,6 +505,12 @@ typedef struct SStateWinodwPhysiNode {
typedef SStateWinodwPhysiNode SStreamStateWinodwPhysiNode;
typedef struct SEventWinodwPhysiNode {
SWinodwPhysiNode window;
SNode* pStartCond;
SNode* pEndCond;
} SEventWinodwPhysiNode;
typedef struct SSortPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
......
......@@ -223,6 +223,13 @@ typedef struct SIntervalWindowNode {
SNode* pFill;
} SIntervalWindowNode;
typedef struct SEventWindowNode {
ENodeType type; // QUERY_NODE_EVENT_WINDOW
SNode* pCol; // timestamp primary key
SNode* pStartCond;
SNode* pEndCond;
} SEventWindowNode;
typedef enum EFillMode {
FILL_MODE_NONE = 1,
FILL_MODE_VALUE,
......
......@@ -295,6 +295,13 @@ static int32_t stateWindowNodeCopy(const SStateWindowNode* pSrc, SStateWindowNod
return TSDB_CODE_SUCCESS;
}
static int32_t eventWindowNodeCopy(const SEventWindowNode* pSrc, SEventWindowNode* pDst) {
CLONE_NODE_FIELD(pCol);
CLONE_NODE_FIELD(pStartCond);
CLONE_NODE_FIELD(pEndCond);
return TSDB_CODE_SUCCESS;
}
static int32_t sessionWindowNodeCopy(const SSessionWindowNode* pSrc, SSessionWindowNode* pDst) {
CLONE_NODE_FIELD_EX(pCol, SColumnNode*);
CLONE_NODE_FIELD_EX(pGap, SValueNode*);
......@@ -709,6 +716,9 @@ SNode* nodesCloneNode(const SNode* pNode) {
case QUERY_NODE_STATE_WINDOW:
code = stateWindowNodeCopy((const SStateWindowNode*)pNode, (SStateWindowNode*)pDst);
break;
case QUERY_NODE_EVENT_WINDOW:
code = eventWindowNodeCopy((const SEventWindowNode*)pNode, (SEventWindowNode*)pDst);
break;
case QUERY_NODE_SESSION_WINDOW:
code = sessionWindowNodeCopy((const SSessionWindowNode*)pNode, (SSessionWindowNode*)pDst);
break;
......
......@@ -85,6 +85,8 @@ const char* nodesNodeName(ENodeType type) {
return "WhenThen";
case QUERY_NODE_CASE_WHEN:
return "CaseWhen";
case QUERY_NODE_EVENT_WINDOW:
return "EventWindow";
case QUERY_NODE_SET_OPERATOR:
return "SetOperator";
case QUERY_NODE_SELECT_STMT:
......@@ -3660,6 +3662,36 @@ static int32_t jsonToSessionWindowNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkEventWindowTsPrimaryKey = "TsPrimaryKey";
static const char* jkEventWindowStartCond = "StartCond";
static const char* jkEventWindowEndCond = "EndCond";
static int32_t eventWindowNodeToJson(const void* pObj, SJson* pJson) {
const SEventWindowNode* pNode = (const SEventWindowNode*)pObj;
int32_t code = tjsonAddObject(pJson, jkEventWindowTsPrimaryKey, nodeToJson, pNode->pCol);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkEventWindowStartCond, nodeToJson, pNode->pStartCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkEventWindowEndCond, nodeToJson, pNode->pEndCond);
}
return code;
}
static int32_t jsonToEventWindowNode(const SJson* pJson, void* pObj) {
SEventWindowNode* pNode = (SEventWindowNode*)pObj;
int32_t code = jsonToNodeObject(pJson, jkEventWindowTsPrimaryKey, &pNode->pCol);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkEventWindowStartCond, &pNode->pStartCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkEventWindowEndCond, &pNode->pEndCond);
}
return code;
}
static const char* jkIntervalWindowInterval = "Interval";
static const char* jkIntervalWindowOffset = "Offset";
static const char* jkIntervalWindowSliding = "Sliding";
......@@ -4615,6 +4647,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return whenThenNodeToJson(pObj, pJson);
case QUERY_NODE_CASE_WHEN:
return caseWhenNodeToJson(pObj, pJson);
case QUERY_NODE_EVENT_WINDOW:
return eventWindowNodeToJson(pObj, pJson);
case QUERY_NODE_SET_OPERATOR:
return setOperatorToJson(pObj, pJson);
case QUERY_NODE_SELECT_STMT:
......@@ -4787,6 +4821,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToWhenThenNode(pJson, pObj);
case QUERY_NODE_CASE_WHEN:
return jsonToCaseWhenNode(pJson, pObj);
case QUERY_NODE_EVENT_WINDOW:
return jsonToEventWindowNode(pJson, pObj);
case QUERY_NODE_SET_OPERATOR:
return jsonToSetOperator(pJson, pObj);
case QUERY_NODE_SELECT_STMT:
......
......@@ -165,6 +165,17 @@ static EDealRes dispatchExpr(SNode* pNode, ETraversalOrder order, FNodeWalker wa
}
break;
}
case QUERY_NODE_EVENT_WINDOW: {
SEventWindowNode* pEvent = (SEventWindowNode*)pNode;
res = walkExpr(pEvent->pCol, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkExpr(pEvent->pStartCond, order, walker, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkExpr(pEvent->pEndCond, order, walker, pContext);
}
break;
}
default:
break;
}
......@@ -329,6 +340,17 @@ static EDealRes rewriteExpr(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
}
break;
}
case QUERY_NODE_EVENT_WINDOW: {
SEventWindowNode* pEvent = (SEventWindowNode*)pNode;
res = rewriteExpr(&pEvent->pCol, order, rewriter, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteExpr(&pEvent->pStartCond, order, rewriter, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteExpr(&pEvent->pEndCond, order, rewriter, pContext);
}
break;
}
default:
break;
}
......
......@@ -299,6 +299,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SWhenThenNode));
case QUERY_NODE_CASE_WHEN:
return makeNode(type, sizeof(SCaseWhenNode));
case QUERY_NODE_EVENT_WINDOW:
return makeNode(type, sizeof(SEventWindowNode));
case QUERY_NODE_SET_OPERATOR:
return makeNode(type, sizeof(SSetOperator));
case QUERY_NODE_SELECT_STMT:
......@@ -765,16 +767,23 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_COLUMN_REF: // no pointer field
break;
case QUERY_NODE_WHEN_THEN: {
SWhenThenNode* pStmt = (SWhenThenNode*)pNode;
nodesDestroyNode(pStmt->pWhen);
nodesDestroyNode(pStmt->pThen);
SWhenThenNode* pWhenThen = (SWhenThenNode*)pNode;
nodesDestroyNode(pWhenThen->pWhen);
nodesDestroyNode(pWhenThen->pThen);
break;
}
case QUERY_NODE_CASE_WHEN: {
SCaseWhenNode* pStmt = (SCaseWhenNode*)pNode;
nodesDestroyNode(pStmt->pCase);
nodesDestroyNode(pStmt->pElse);
nodesDestroyList(pStmt->pWhenThenList);
SCaseWhenNode* pCaseWhen = (SCaseWhenNode*)pNode;
nodesDestroyNode(pCaseWhen->pCase);
nodesDestroyNode(pCaseWhen->pElse);
nodesDestroyList(pCaseWhen->pWhenThenList);
break;
}
case QUERY_NODE_EVENT_WINDOW: {
SEventWindowNode* pEvent = (SEventWindowNode*)pNode;
nodesDestroyNode(pEvent->pCol);
nodesDestroyNode(pEvent->pStartCond);
nodesDestroyNode(pEvent->pEndCond);
break;
}
case QUERY_NODE_SET_OPERATOR: {
......
......@@ -116,6 +116,7 @@ SNode* createLimitNode(SAstCreateContext* pCxt, const SToken* pLimit, const STok
SNode* createOrderByExprNode(SAstCreateContext* pCxt, SNode* pExpr, EOrder order, ENullOrder nullOrder);
SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode* pGap);
SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr);
SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond);
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding,
SNode* pFill);
SNode* createFillNode(SAstCreateContext* pCxt, EFillMode mode, SNode* pValues);
......
......@@ -964,6 +964,8 @@ twindow_clause_opt(A) ::=
twindow_clause_opt(A) ::=
INTERVAL NK_LP duration_literal(B) NK_COMMA duration_literal(C) NK_RP
sliding_opt(D) fill_opt(E). { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C), D, E); }
twindow_clause_opt(A) ::=
EVENT_WINDOW START WITH search_condition(B) END WITH search_condition(C). { A = createEventWindowNode(pCxt, B, C); }
sliding_opt(A) ::= . { A = NULL; }
sliding_opt(A) ::= SLIDING NK_LP duration_literal(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }
......
......@@ -605,6 +605,20 @@ SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr) {
return (SNode*)state;
}
SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond) {
CHECK_PARSER_STATUS(pCxt);
SEventWindowNode* pEvent = (SEventWindowNode*)nodesMakeNode(QUERY_NODE_EVENT_WINDOW);
CHECK_OUT_OF_MEM(pEvent);
pEvent->pCol = createPrimaryKeyCol(pCxt, NULL);
if (NULL == pEvent->pCol) {
nodesDestroyNode((SNode*)pEvent);
CHECK_OUT_OF_MEM(NULL);
}
pEvent->pStartCond = pStartCond;
pEvent->pEndCond = pEndCond;
return (SNode*)pEvent;
}
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding,
SNode* pFill) {
CHECK_PARSER_STATUS(pCxt);
......
......@@ -3143,6 +3143,15 @@ static int32_t translateSessionWindow(STranslateContext* pCxt, SSelectStmt* pSel
return TSDB_CODE_SUCCESS;
}
static int32_t translateEventWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) &&
!isGlobalTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TIMELINE_QUERY,
"EVENT_WINDOW requires valid time series input");
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
switch (nodeType(pSelect->pWindow)) {
case QUERY_NODE_STATE_WINDOW:
......@@ -3151,6 +3160,8 @@ static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSe
return translateSessionWindow(pCxt, pSelect);
case QUERY_NODE_INTERVAL_WINDOW:
return translateIntervalWindow(pCxt, pSelect);
case QUERY_NODE_EVENT_WINDOW:
return translateEventWindow(pCxt, pSelect);
default:
break;
}
......
此差异已折叠。
......@@ -814,6 +814,35 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
}
static int32_t createWindowLogicNodeByEvent(SLogicPlanContext* pCxt, SEventWindowNode* pEvent, SSelectStmt* pSelect,
SLogicNode** pLogicNode) {
SWindowLogicNode* pWindow = (SWindowLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW);
if (NULL == pWindow) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pWindow->winType = WINDOW_TYPE_EVENT;
pWindow->node.groupAction = getGroupAction(pCxt, pSelect);
pWindow->node.requireDataOrder =
pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_IN_BLOCK : getRequireDataOrder(true, pSelect);
pWindow->node.resultDataOrder =
pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_GLOBAL : pWindow->node.requireDataOrder;
pWindow->pStartCond = nodesCloneNode(pEvent->pStartCond);
pWindow->pEndCond = nodesCloneNode(pEvent->pEndCond);
pWindow->pTspk = nodesCloneNode(pEvent->pCol);
if (NULL == pWindow->pStateExpr || NULL == pWindow->pTspk) {
nodesDestroyNode((SNode*)pWindow);
return TSDB_CODE_OUT_OF_MEMORY;
}
// rewrite the expression in subsequent clauses
int32_t code = rewriteExprForSelect(pWindow->pStateExpr, pSelect, SQL_CLAUSE_WINDOW);
if (TSDB_CODE_SUCCESS == code) {
code = createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
}
return code;
}
static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
if (NULL == pSelect->pWindow) {
return TSDB_CODE_SUCCESS;
......@@ -826,6 +855,8 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
return createWindowLogicNodeBySession(pCxt, (SSessionWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
case QUERY_NODE_INTERVAL_WINDOW:
return createWindowLogicNodeByInterval(pCxt, (SIntervalWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
case QUERY_NODE_EVENT_WINDOW:
return createWindowLogicNodeByEvent(pCxt, (SEventWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
default:
break;
}
......
......@@ -1297,6 +1297,33 @@ static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
return code;
}
static int32_t createEventWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SEventWinodwPhysiNode* pEvent = (SEventWinodwPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pWindowLogicNode,
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT : QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT));
if (NULL == pEvent) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
int32_t code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pStartCond, &pEvent->pStartCond);
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pEndCond, &pEvent->pEndCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pEvent->window, pWindowLogicNode);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pEvent;
} else {
nodesDestroyNode((SNode*)pEvent);
}
return code;
}
static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode,
SPhysiNode** pPhyNode) {
switch (pWindowLogicNode->winType) {
......@@ -1306,6 +1333,8 @@ static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildr
return createSessionWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
case WINDOW_TYPE_STATE:
return createStateWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
case WINDOW_TYPE_EVENT:
return createEventWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
default:
break;
}
......
......@@ -729,6 +729,18 @@ static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
}
}
static int32_t stbSplSplitEventForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static int32_t stbSplSplitEvent(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (pCxt->pPlanCxt->streamQuery) {
return stbSplSplitEventForStream(pCxt, pInfo);
} else {
return stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
}
}
static bool stbSplIsPartTableWinodw(SWindowLogicNode* pWindow) {
return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
}
......@@ -741,6 +753,8 @@ static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitI
return stbSplSplitSession(pCxt, pInfo);
case WINDOW_TYPE_STATE:
return stbSplSplitState(pCxt, pInfo);
case WINDOW_TYPE_EVENT:
return stbSplSplitEvent(pCxt, pInfo);
default:
break;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册