From 212144a3e6312e74689f66314a98897706fda087 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 20 Jun 2022 21:50:13 +0800 Subject: [PATCH] test(stream): window close not work for stb --- examples/c/stream_demo.c | 10 ++++--- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mnode/impl/src/mndScheduler.c | 3 ++ source/dnode/mnode/impl/src/mndSnode.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 20 ++++++------- tests/script/tsim/stream/windowClose.sim | 32 +++++++++++++++++++++ 6 files changed, 51 insertions(+), 16 deletions(-) create mode 100644 tests/script/tsim/stream/windowClose.sim diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index 6d341c61c7..5f6e3b2aeb 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -25,19 +25,21 @@ int32_t init_env() { return -1; } - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); +#if 0 pRes = taos_query(pConn, "create database if not exists abc2 vgroups 20"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); +#endif pRes = taos_query(pConn, "use abc1"); if (taos_errno(pRes) != 0) { @@ -88,9 +90,9 @@ int32_t create_stream() { /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ - pRes = taos_query( - pConn, - "create stream stream1 trigger at_once into abc1.outstb as select _wstartts, sum(k) from st1 interval(10m) "); + pRes = taos_query(pConn, + "create stream stream1 trigger window_close into outstb as select _wstartts, sum(k) from st1 " + "interval(10s) "); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index a845ae7b39..0112feedd2 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -183,6 +183,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 645634d7f3..2b6258b10a 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -496,6 +496,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // input pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; + // trigger + pTask->triggerParam = pStream->triggerParam; + // sink or dispatch if (hasExtraSink) { mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pTask); diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index 0a99f356b1..12188a3b3a 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -408,6 +408,7 @@ static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) { goto _OVER; } + // check deletable code = mndDropSnode(pMnode, pReq, pObj); if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d432256f15..fb92efecf6 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -518,7 +518,6 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq // TODO streamObj.fixedSinkVgId = 0; streamObj.smaId = 0; - /*streamObj.physicalPlan = "";*/ streamObj.trigger = pCreate->triggerType; streamObj.watermark = pCreate->watermark; streamObj.triggerParam = pCreate->maxDelay; @@ -607,17 +606,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } #endif - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq); - if (pTrans == NULL) { - mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); - goto _OVER; - } - - mndTransSetDbName(pTrans, createStreamReq.sourceDB, NULL); - // TODO - /*mndTransSetDbName(pTrans, streamObj.targetDb, NULL);*/ - mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); - // build stream obj from request SStreamObj streamObj = {0}; if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) { @@ -626,6 +614,14 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq); + if (pTrans == NULL) { + mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); + goto _OVER; + } + mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); + mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); + // create stb for stream if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) { mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr()); diff --git a/tests/script/tsim/stream/windowClose.sim b/tests/script/tsim/stream/windowClose.sim new file mode 100644 index 0000000000..07d7fb794e --- /dev/null +++ b/tests/script/tsim/stream/windowClose.sim @@ -0,0 +1,32 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print =============== create database +sql create database test vgroups 1 +sql show databases +if $rows != 3 then + return -1 +endi + +print $data00 $data01 $data02 + +sql use test +sql create stable st(ts timestamp, a int) tags(t int); +sql create table tu1 using st tags(1); +sql create table tu2 using st tags(2); + +sql create stream stream1 trigger window_close into streamt as select _wstartts, sum(a) from st interval(10s); + +sql insert into tu1 values(now, 1); + +sleep 300 +sql select * from streamt; +if $rows != 0 then + print ======$rows + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT -- GitLab