diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4e89beabcb34a1522b2b370c40d964774984ebef..cab8fea374aba1645f89d804724e6724db52d610 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1209,12 +1209,17 @@ typedef struct { int32_t code; } STaskDropRsp; +#define STREAM_TRIGGER_AT_ONCE 1 +#define STREAM_TRIGGER_WINDOW_CLOSE 2 + typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; char outputSTbName[TSDB_TABLE_FNAME_LEN]; int8_t igExists; char* sql; char* ast; + int8_t triggerType; + int64_t watermark; } SCMCreateStreamReq; typedef struct { diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 6810b63f4e17267a3c8b153ff837f4d24fee2f1f..b62555d5492cb5e187aaea4fbc7889e31935ed81 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -272,14 +272,9 @@ typedef struct SKillStmt { int32_t targetId; } SKillStmt; -typedef enum EStreamTriggerType { - STREAM_TRIGGER_AT_ONCE = 1, - STREAM_TRIGGER_WINDOW_CLOSE -} EStreamTriggerType; - typedef struct SStreamOptions { ENodeType type; - EStreamTriggerType triggerType; + int8_t triggerType; SNode* pWatermark; } SStreamOptions; diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 3b5f9abe817c7432ef87ce589ac4d73c7ff7d8d3..9d4f554c2c43e441a8a2d7b2ace2fc5e05b7666f 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -109,6 +109,8 @@ typedef struct SWindowLogicNode { int64_t sessionGap; SNode* pTspk; SNode* pStateExpr; + int8_t triggerType; + int64_t watermark; } SWindowLogicNode; typedef struct SSortLogicNode { @@ -251,6 +253,8 @@ typedef struct SWinodwPhysiNode { SNodeList* pExprs; // these are expression list of parameter expression of function SNodeList* pFuncs; SNode* pTspk; // timestamp primary key + int8_t triggerType; + int64_t watermark; } SWinodwPhysiNode; typedef struct SIntervalPhysiNode { diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 8db78fccf580807270fcb5cddea39dc2405bfb69..0b164bf43f57109fa91adb4547baaed91956452b 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -30,6 +30,8 @@ typedef struct SPlanContext { bool topicQuery; bool streamQuery; bool showRewrite; + int8_t triggerType; + int64_t watermark; } SPlanContext; // Create the physical plan for the query, according to the AST. diff --git a/include/util/taoserror.h b/include/util/taoserror.h index e31eea1b157cf73ac54c7dd42e2c7c803c26d83f..d51217874a28abb96715d3a3311e68630a4daf1f 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -597,6 +597,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_GROUPBY_WINDOW_COEXIST TAOS_DEF_ERROR_CODE(0, 0x2624) #define TSDB_CODE_PAR_INVALID_OPTION_UNIT TAOS_DEF_ERROR_CODE(0, 0x2625) #define TSDB_CODE_PAR_INVALID_KEEP_UNIT TAOS_DEF_ERROR_CODE(0, 0x2626) +#define TSDB_CODE_PAR_AGG_FUNC_NESTING TAOS_DEF_ERROR_CODE(0, 0x2627) //planner #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 3e37c52c26207ee5134c1c41f2d71132dd1c47a2..3adb0102ce1fe674d373ad9a3ca7f240010b18e5 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3381,6 +3381,8 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; if (tEncodeI32(&encoder, sqlLen) < 0) return -1; if (tEncodeI32(&encoder, astLen) < 0) return -1; + if (tEncodeI8(&encoder, pReq->triggerType) < 0) return -1; + if (tEncodeI64(&encoder, pReq->watermark) < 0) return -1; if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1; @@ -3404,6 +3406,8 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; if (tDecodeI32(&decoder, &sqlLen) < 0) return -1; if (tDecodeI32(&decoder, &astLen) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->triggerType) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->watermark) < 0) return -1; if (sqlLen > 0) { pReq->sql = taosMemoryCalloc(1, sqlLen + 1); diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index b5d22cb7a5ba931ccbf5c5986ec41d7cbf5e6e5e..15cd9fa0439a9b7b8a411adf028645caea27aecc 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans); +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, STrans *pTrans); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 1e68e8e9c5a27d8294f8e1f86fdd8f62687690f9..a474ccc5b6ba1defd31c4921ea4f343884a6f8d9 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -429,7 +429,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER; + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a0ee8ff44cad00ff56d5927e98fe0d75bd391d9f..58e5b6c65b87ae037e00b32403581b5e5f57734a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -218,7 +218,7 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { return 0; } -static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { +static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64_t watermark, char **pStr) { if (NULL == ast) { return TSDB_CODE_SUCCESS; } @@ -232,6 +232,8 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { .pAstRoot = pAst, .topicQuery = false, .streamQuery = true, + .triggerType = triggerType, + .watermark = watermark, }; code = qCreateQueryPlan(&cxt, &pPlan, NULL); } @@ -245,7 +247,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { return code; } -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, STrans *pTrans) { SNode *pAst = NULL; if (nodesStringToNode(ast, &pAst) < 0) { @@ -265,7 +267,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast #endif - if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) { + if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, triggerType, watermark, &pStream->physicalPlan)) { mError("topic:%s, failed to get plan since %s", pStream->name, terrstr()); return -1; } @@ -313,7 +315,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe } mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) { + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pCreate->triggerType, pCreate->watermark, pTrans) != 0) { mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index ce74748c0c76b49aaef5ec69c0a828b6d7cb5c8d..2c8a938a5a3538bd9751bdee36fa296e5985d4e5 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1104,6 +1104,8 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) { static const char* jkWindowPhysiPlanExprs = "Exprs"; static const char* jkWindowPhysiPlanFuncs = "Funcs"; static const char* jkWindowPhysiPlanTsPk = "TsPk"; +static const char* jkWindowPhysiPlanTriggerType = "TriggerType"; +static const char* jkWindowPhysiPlanWatermark = "Watermark"; static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) { const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj; @@ -1118,6 +1120,12 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkWindowPhysiPlanTsPk, nodeToJson, pNode->pTspk); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanWatermark, pNode->watermark); + } return code; } @@ -1135,6 +1143,12 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkWindowPhysiPlanTsPk, (SNode**)&pNode->pTspk); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetNumberValue(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetNumberValue(pJson, jkWindowPhysiPlanWatermark, pNode->watermark); + } return code; } diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 91ac6ce2d26d431f0aeec688523db3049cb3e1eb..bd7c5d16b1e39706f688b4e4443af34f43719dc2 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -427,7 +427,7 @@ bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B). /************************************************ create/drop stream **************************************************/ cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) - stream_options(B) into_opt(C) AS query_expression(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, B, C, D); } + stream_options(B) into_opt(C) AS query_expression(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, D); } cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); } into_opt(A) ::= . { A = NULL; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index dc1338be680d2062b00fe5838de3a1dacdf73772..324f9c17d2ace2a4dae2c3b81d9a5c9bbbfdcd32 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -481,6 +481,14 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) { return DEAL_RES_CONTINUE; } +static EDealRes haveAggFunction(SNode* pNode, void* pContext) { + if (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsAggFunc(((SFunctionNode*)pNode)->funcId)) { + *((bool*)pContext) = true; + return DEAL_RES_END; + } + return DEAL_RES_CONTINUE; +} + static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) { if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName); @@ -492,6 +500,11 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) if (fmIsAggFunc(pFunc->funcId) && beforeHaving(pCxt->currClause)) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION); } + bool haveAggFunc = false; + nodesWalkExprs(pFunc->pParameterList, haveAggFunction, &haveAggFunc); + if (haveAggFunc) { + return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_AGG_FUNC_NESTING); + } return DEAL_RES_CONTINUE; } @@ -2173,6 +2186,14 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt* } } + if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pOptions->pWatermark) { + code = (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark)) ? pCxt->errCode : TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS == code) { + createReq.triggerType = pStmt->pOptions->triggerType; + createReq.watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0); + } + if (TSDB_CODE_SUCCESS == code) { code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, &createReq); } diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index efc807850f4c25fb27addd8b7b1f40e76467dc9c..549f448ff4ef9da2c4d0a325b8a363ef403f7512 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -91,6 +91,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "Invalid option %s unit: %c, only m, h, d allowed"; case TSDB_CODE_PAR_INVALID_KEEP_UNIT: return "Invalid option keep unit: %c, %c, %c, only m, h, d allowed"; + case TSDB_CODE_PAR_AGG_FUNC_NESTING: + return "Aggregate functions do not support nesting"; case TSDB_CODE_OUT_OF_MEMORY: return "Out of memory"; default: diff --git a/source/libs/parser/src/sql.c b/source/libs/parser/src/sql.c index 80aad1dc662323660d63d04f104b9fdec12ba34c..e9ec9c987c1be626f7114acc6d7ec33be549eaf7 100644 --- a/source/libs/parser/src/sql.c +++ b/source/libs/parser/src/sql.c @@ -3452,7 +3452,7 @@ static YYACTIONTYPE yy_reduce( { yymsp[-1].minor.yy100 = strtol(yymsp[0].minor.yy0.z, NULL, 10); } break; case 234: /* cmd ::= CREATE STREAM not_exists_opt stream_name stream_options into_opt AS query_expression */ -{ pCxt->pRootNode = createCreateStreamStmt(pCxt, yymsp[-5].minor.yy659, &yymsp[-4].minor.yy479, yymsp[-3].minor.yy452, yymsp[-2].minor.yy452, yymsp[0].minor.yy452); } +{ pCxt->pRootNode = createCreateStreamStmt(pCxt, yymsp[-5].minor.yy659, &yymsp[-4].minor.yy479, yymsp[-2].minor.yy452, yymsp[-3].minor.yy452, yymsp[0].minor.yy452); } break; case 235: /* cmd ::= DROP STREAM exists_opt stream_name */ { pCxt->pRootNode = createDropStreamStmt(pCxt, yymsp[-1].minor.yy659, &yymsp[0].minor.yy479); } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index fd0d2758bf9436700b1ac19933b592d3f9bf93e7..5427749d09d977f339cbdfd85cb156d44694b86e 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -463,6 +463,11 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow, SLogicNode** pLogicNode) { int32_t code = nodesCollectFuncs(pSelect, fmIsWindowClauseFunc, &pWindow->pFuncs); + if (pCxt->pPlanCxt->streamQuery) { + pWindow->triggerType = pCxt->pPlanCxt->triggerType; + pWindow->watermark = pCxt->pPlanCxt->watermark; + } + if (TSDB_CODE_SUCCESS == code) { code = rewriteExpr(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW); } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index a45fabd8288556af8303eef94e41e420eec682ba..f9f6d503a1457dc1c2ebe97046136a75054ff593 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -796,6 +796,9 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* } } + pWindow->triggerType = pWindowLogicNode->triggerType; + pWindow->watermark = pWindowLogicNode->watermark; + if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pWindow; } else { diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index 1171c9d4ac1d9c74bcb17a4eea029b76469a8f46..291b4c9cc70880d1328afbe2fe2197344dd2459c 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -123,6 +123,12 @@ private: tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req); nodesStringToNode(req.ast, &pCxt->pAstRoot); pCxt->streamQuery = true; + } else if (QUERY_NODE_CREATE_STREAM_STMT == nodeType(pQuery->pRoot)) { + SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pQuery->pRoot; + pCxt->pAstRoot = pStmt->pQuery; + pCxt->streamQuery = true; + pCxt->triggerType = pStmt->pOptions->triggerType; + pCxt->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0); } else { pCxt->pAstRoot = pQuery->pRoot; } @@ -353,11 +359,11 @@ TEST_F(PlannerTest, createTopic) { ASSERT_TRUE(run()); } -TEST_F(PlannerTest, stream) { +TEST_F(PlannerTest, createStream) { setDatabase("root", "test"); - bind("SELECT sum(c1) FROM st1"); - ASSERT_TRUE(run(true)); + bind("create stream if not exists s1 trigger window_close watermark 10s into st1 as select count(*) from t1 interval(10s)"); + ASSERT_TRUE(run()); } TEST_F(PlannerTest, createSmaIndex) {