diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d04e965c6f5f3828611548aa235426a5d6f1235b..cdcf54bc61fb80dabcbef14da5cae403acf8cd9a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -586,7 +586,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); -int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); +// int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); int32_t streamTryExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 60729c4d0e5a292990eae57bccbdd3cc4c783ab9..a4a02b7d65b77a6b27539a9b9df513264c2ebcaf 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -275,7 +275,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S return 0; } -int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp) { - // - return 0; -} +// int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp) { +// // +// return 0; +// } diff --git a/tests/script/tsim/stream/distributeIntervalRetrive0.sim b/tests/script/tsim/stream/distributeIntervalRetrive0.sim index 529a2a1b309b1351bd0515c2597e327a58d8574f..a2d9c4ab458219beb81d53a4ad1148806e068e71 100644 --- a/tests/script/tsim/stream/distributeIntervalRetrive0.sim +++ b/tests/script/tsim/stream/distributeIntervalRetrive0.sim @@ -3,7 +3,6 @@ system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode2 -i 2 system sh/exec.sh -n dnode1 -s start -#==system sh/exec.sh -n dnode1 -s start -v sleep 50 sql connect @@ -11,7 +10,6 @@ sql connect sql create dnode $hostname2 port 7200 system sh/exec.sh -n dnode2 -s start -#==system sh/exec.sh -n dnode2 -s start -v print ===== step1 $x = 0 @@ -37,14 +35,14 @@ endi print ===== step2 -sql create database test vgroups 4; +sql create database test vgroups 10; sql use test; sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); sql create table ts1 using st tags(1,1,1); sql create table ts2 using st tags(2,2,2); sql create table ts3 using st tags(3,2,2); sql create table ts4 using st tags(4,2,2); -sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 into streamtST1 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s); +sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 delete_mark 10s into streamtST1 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s); sleep 1000 @@ -235,6 +233,29 @@ endi print loop3 over +sql insert into ts1 values(1648791200001,1,12,3,1.0); +sql insert into ts2 values(1648791200001,1,12,3,1.0); +sql insert into ts3 values(1648791200001,1,12,3,1.0); +sql insert into ts4 values(1648791200001,1,12,3,1.0); + +$loop_count = 0 +loop31: +sleep 1000 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamtST1; + +if $rows <= 4 then + print =====rows=$rows + goto loop31 +endi + +print loop31 over + sql drop stream if exists streams1; sql drop database if exists test1; @@ -243,7 +264,7 @@ sql use test1; sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); -sql create stream streams1 trigger at_once IGNORE EXPIRED 0 into streamt1 as select _wstart as c0, count(*) c1, count(a) c2 from st interval(10s) ; +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 delete_mark 20s into streamt1 as select _wstart as c0, count(*) c1, count(a) c2 from st interval(10s) ; sql insert into t1 values(1648791211000,1,2,3);