diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 2af551b832e9c28badfa455bf4f7f8cb13f3d288..813da3c436a463902d6d05eb211b76d8f3f9117f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2347,6 +2347,17 @@ void doBuildResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* buildDataBlockFromGroupRes(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo); } +static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, + TSKEY* primaryKeys, int32_t prevPosition) { + int32_t startPos = prevPosition + 1; + if (startPos == pDataBlockInfo->rows) { + startPos = -1; + } else { + *pNext = getFinalTimeWindow(primaryKeys[startPos], pInterval); + } + return startPos; +} + static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId, SHashObj* pUpdatedMap) { SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info; @@ -2457,8 +2468,12 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p } int32_t prevEndPos = (forwardRows - 1) * step + startPos; ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); - startPos = - getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC); + if (IS_FINAL_OP(pInfo)) { + startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos); + } else { + startPos = + getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC); + } if (startPos < 0) { break; } diff --git a/tests/script/tsim/stream/basic3.sim b/tests/script/tsim/stream/basic3.sim index 48fb860a72272077051f16d0df7723a46706e0b3..41e19b19af0e388ff8a736723d9d74df356e2a6a 100644 --- a/tests/script/tsim/stream/basic3.sim +++ b/tests/script/tsim/stream/basic3.sim @@ -1,7 +1,7 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 -system sh/cfg.sh -n dnode1 -c debugflag -v 131 -system sh/exec.sh -n dnode1 -s start -v +system sh/cfg.sh -n dnode1 -c debugflag 131 +system sh/exec.sh -n dnode1 -s start sleep 5000 diff --git a/tests/script/tsim/stream/sliding.sim b/tests/script/tsim/stream/sliding.sim index c9a1ddd922b4c6d1194b87902ef6bdf22826165e..8287274cd2a008f264365ea06d049428ae645d2f 100644 --- a/tests/script/tsim/stream/sliding.sim +++ b/tests/script/tsim/stream/sliding.sim @@ -672,6 +672,123 @@ if $data61 != 1 then goto loop5 endi +print step 8 + +sql drop stream IF EXISTS streams4; +sql drop database IF EXISTS test4; + +sql create database test4 vgroups 6; +sql use test4; +sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams4 trigger at_once into streamt4 as select _wstart as ts, count(*),min(a) c1 from st interval(10s) sliding(5s); + +sql insert into t1 values(1648791213000,1,1,1,1.0); +sql insert into t1 values(1648791243000,2,1,1,1.0); + +sql insert into t2 values(1648791273000,3,1,1,1.0); +sql insert into t2 values(1648791313000,4,1,1,1.0); + +$loop_count = 0 + +loop6: +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt4 order by 1; + +# row 0 +if $rows != 8 then + print ====loop6=rows=$rows + goto loop6 +endi + +if $data01 != 1 then + print ====loop6=data01=$data01 + goto loop6 +endi + +if $data02 != 1 then + print ====loop6=data02=$data02 + return -1 +endi + +if $data11 != 1 then + print ====loop6=data11=$data11 + goto loop6 +endi + +if $data12 != 1 then + print ====loop6=data12=$data12 + return -1 +endi + +if $data21 != 1 then + print ====loop6=data21=$data21 + goto loop6 +endi + +if $data22 != 2 then + print ====loop6=data22=$data22 + return -1 +endi + +if $data31 != 1 then + print ====loop6=data31=$data31 + goto loop6 +endi + +if $data32 != 2 then + print ====loop6=data32=$data32 + return -1 +endi + +if $data41 != 1 then + print ====loop6=data41=$data41 + goto loop6 +endi + +if $data42 != 3 then + print ====loop6=data42=$data42 + return -1 +endi + +if $data51 != 1 then + print ====loop6=data51=$data51 + goto loop6 +endi + +if $data52 != 3 then + print ====loop6=data52=$data52 + return -1 +endi + +if $data61 != 1 then + print ====loop6=data61=$data61 + return -1 +endi + +if $data62 != 4 then + print ====loop6=data62=$data62 + return -1 +endi + +if $data71 != 1 then + print ====loop6=data71=$data71 + return -1 +endi + +if $data72 != 4 then + print ====loop6=data72=$data72 + return -1 +endi + $loop_all = $loop_all + 1 print ============loop_all=$loop_all