diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 13ec8505fd6f30ba771f629b9b5e4a7375e68906..e89cd5e5375ba8319555654d765cf56b9de173c0 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4411,6 +4411,11 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream taosArrayPush(pInfo->pChildren, &pChildOp); } } + + if (!IS_FINAL_OP(pInfo) || numOfChild == 0) { + pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; + } + return pOperator; _error: diff --git a/tests/script/tsim/stream/windowClose.sim b/tests/script/tsim/stream/windowClose.sim index 0d435a9fbd700c2cfb0f4696a3886ca20b281299..9fcdcfb9599db5bbd2d4e93a069647a2518abaac 100644 --- a/tests/script/tsim/stream/windowClose.sim +++ b/tests/script/tsim/stream/windowClose.sim @@ -5,7 +5,7 @@ sleep 50 sql connect print =============== create database -sql create database test vgroups 1 +sql create database test vgroups 1; sql select * from information_schema.ins_databases if $rows != 3 then return -1 @@ -29,4 +29,100 @@ if $rows != 0 then return -1 endi + +sql create database test1 vgroups 4; +sql use test1; +sql create stable st(ts timestamp, a int, b int) tags(t int); +sql create table t1 using st tags(1); +sql create table t2 using st tags(2); + +sql create stream stream2 trigger window_close into streamt2 as select _wstart, sum(a) from st interval(10s); +sql create stream stream3 trigger max_delay 1s into streamt3 as select _wstart, sum(a) from st interval(10s); +sql create stream stream4 trigger window_close into streamt4 as select _wstart, sum(a) from t1 interval(10s); +sql create stream stream5 trigger max_delay 1s into streamt5 as select _wstart, sum(a) from t1 interval(10s); +sql create stream stream6 trigger window_close into streamt6 as select _wstart, sum(a) from st session(ts, 10s); +sql create stream stream7 trigger max_delay 1s into streamt7 as select _wstart, sum(a) from st session(ts, 10s); +sql create stream stream8 trigger window_close into streamt8 as select _wstart, sum(a) from t1 session(ts, 10s); +sql create stream stream9 trigger max_delay 1s into streamt9 as select _wstart, sum(a) from t1 session(ts, 10s); +sql create stream stream10 trigger window_close into streamt10 as select _wstart, sum(a) from t1 state_window(b); +sql create stream stream11 trigger max_delay 1s into streamt11 as select _wstart, sum(a) from t1 state_window(b); + +sql insert into t1 values(1648791213000,1,1); +sql insert into t1 values(1648791213001,2,1); +sql insert into t1 values(1648791213002,3,1); + +sql insert into t1 values(1648791233000,4,2); + +$loop_count = 0 + +loop1: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt2; + +if $rows != 1 then + print ======streamt2=$rows + return -1 +endi + +sql select * from streamt3; +if $rows != 2 then + print ======streamt3=$rows + goto loop1 +endi + +sql select * from streamt4; +if $rows != 1 then + print ======streamt4=$rows + return -1 +endi + +sql select * from streamt5; +if $rows != 2 then + print ======streamt5=$rows + goto loop1 +endi + +sql select * from streamt6; +if $rows != 1 then + print ======streamt6=$rows + return -1 +endi + +sql select * from streamt7; +if $rows != 2 then + print ======streamt7=$rows + goto loop1 +endi + +sql select * from streamt8; +if $rows != 1 then + print ======streamt8=$rows + return -1 +endi + +sql select * from streamt9; +if $rows != 2 then + print ======streamt9=$rows + goto loop1 +endi + +sql select * from streamt10; +if $rows != 1 then + print ======streamt10=$rows + return -1 +endi + +sql select * from streamt11; +if $rows != 2 then + print ======streamt11=$rows + goto loop1 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT