diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8528985e05022e1f76c11050974ceb8a615ff763..6ce161560a36d2879250a35dd23ce3329319dcf5 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1175,6 +1175,20 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 } } +static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, + SSessionKey* pKey) { + pKey->win.skey = startTs; + pKey->win.ekey = endTs; + pKey->groupId = groupId; + + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, pKey); + int32_t code = streamStateSessionGetKVByCur(pCur, pKey, NULL, 0); + if (code != TSDB_CODE_SUCCESS) { + SET_SESSION_WIN_KEY_INVALID(pKey); + } + return code; +} + static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) { blockDataCleanup(pDestBlock); if (pSrcBlock->info.rows == 0) { @@ -1210,7 +1224,14 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr } SSessionKey endWin = {0}; getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin); - ASSERT(!IS_INVALID_SESSION_WIN_KEY(endWin)); + if(IS_INVALID_SESSION_WIN_KEY(endWin)) { + getPreSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin); + } + if (IS_INVALID_SESSION_WIN_KEY(startWin)) { + // window has been closed. + qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64 , startData[i], endData[i]); + continue; + } colDataAppend(pDestStartCol, i, (const char*)&startWin.win.skey, false); colDataAppend(pDestEndCol, i, (const char*)&endWin.win.ekey, false); diff --git a/tests/script/tsim/stream/deleteSession.sim b/tests/script/tsim/stream/deleteSession.sim index 541609633b023611815252cde0109cdc01094198..c3c64a597774c490a1491816e0e2175be739828d 100644 --- a/tests/script/tsim/stream/deleteSession.sim +++ b/tests/script/tsim/stream/deleteSession.sim @@ -524,6 +524,112 @@ if $data42 != 14 then goto loop20 endi +sql drop database if exists test4; +sql drop stream if exists streams4; +sql create database test4 vgroups 1; +sql use test4; +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +print create stream streams4 trigger at_once into streamt4 as select _wstart, count(*) c1 from st partition by tbname session(ts, 2s); +sql create stream streams4 trigger at_once into streamt4 as select _wstart, count(*) c1 from st partition by tbname session(ts, 2s); + +sql insert into t1 values(1648791210000,1,2,3); +sql insert into t1 values(1648791220000,2,2,3); +sql insert into t1 values(1648791221000,2,2,3); +sql insert into t1 values(1648791222000,2,2,3); +sql insert into t1 values(1648791223000,2,2,3); +sql insert into t1 values(1648791231000,2,2,3); + +sql insert into t2 values(1648791210000,1,2,3); +sql insert into t2 values(1648791220000,2,2,3); +sql insert into t2 values(1648791221000,2,2,3); +sql insert into t2 values(1648791231000,2,2,3); + +$loop_count = 0 + +loop21: +sleep 200 +sql select * from streamt4 order by c1 desc;; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 6 then + print =====rows=$rows + goto loop21 +endi + +if $data01 != 4 then + print =====data01=$data01 + goto loop21 +endi + +if $data11 != 2 then + print =====data11=$data11 + goto loop21 +endi + +if $data21 != 1 then + print =====data21=$data21 + goto loop21 +endi + +if $data31 != 1 then + print =====data31=$data31 + goto loop21 +endi + +if $data41 != 1 then + print =====data41=$data41 + goto loop21 +endi + +if $data51 != 1 then + print =====data51=$data51 + goto loop21 +endi + +print delete from st where ts >= 1648791220000 and ts <=1648791223000; +sql delete from st where ts >= 1648791220000 and ts <=1648791223000; + +loop22: +sleep 200 +sql select * from streamt4 order by c1 desc;; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 4 then + print =====rows=$rows + goto loop22 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop22 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop22 +endi + +if $data21 != 1 then + print =====data21=$data21 + goto loop22 +endi + +if $data31 != 1 then + print =====data31=$data31 + goto loop22 +endi + $loop_all = $loop_all + 1 print ============loop_all=$loop_all diff --git a/tests/script/tsim/stream/deleteState.sim b/tests/script/tsim/stream/deleteState.sim index ecd9f55340edbc79265255848f5240f0c02fd737..45d9bc3e39e926c459fb210345ba5713a6528867 100644 --- a/tests/script/tsim/stream/deleteState.sim +++ b/tests/script/tsim/stream/deleteState.sim @@ -189,6 +189,112 @@ if $data12 != 4 then goto loop6 endi +sql drop database if exists test4; +sql drop stream if exists streams4; +sql create database test4 vgroups 1; +sql use test4; +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +print create stream streams4 trigger at_once into streamt4 as select _wstart, count(*) c1 from st partition by tbname state_window(c); +sql create stream streams4 trigger at_once into streamt4 as select _wstart, count(*) c1 from st partition by tbname state_window(c); + +sql insert into t1 values(1648791210000,1,2,1); +sql insert into t1 values(1648791220000,2,2,2); +sql insert into t1 values(1648791221000,2,2,2); +sql insert into t1 values(1648791222000,2,2,2); +sql insert into t1 values(1648791223000,2,2,2); +sql insert into t1 values(1648791231000,2,2,3); + +sql insert into t2 values(1648791210000,1,2,1); +sql insert into t2 values(1648791220000,2,2,2); +sql insert into t2 values(1648791221000,2,2,2); +sql insert into t2 values(1648791231000,2,2,3); + +$loop_count = 0 + +loop21: +sleep 200 +sql select * from streamt4 order by c1 desc;; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 6 then + print =====rows=$rows + goto loop21 +endi + +if $data01 != 4 then + print =====data01=$data01 + goto loop21 +endi + +if $data11 != 2 then + print =====data11=$data11 + goto loop21 +endi + +if $data21 != 1 then + print =====data21=$data21 + goto loop21 +endi + +if $data31 != 1 then + print =====data31=$data31 + goto loop21 +endi + +if $data41 != 1 then + print =====data41=$data41 + goto loop21 +endi + +if $data51 != 1 then + print =====data51=$data51 + goto loop21 +endi + +print delete from st where ts >= 1648791220000 and ts <=1648791223000; +sql delete from st where ts >= 1648791220000 and ts <=1648791223000; + +loop22: +sleep 200 +sql select * from streamt4 order by c1 desc;; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 4 then + print =====rows=$rows + goto loop22 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop22 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop22 +endi + +if $data21 != 1 then + print =====data21=$data21 + goto loop22 +endi + +if $data31 != 1 then + print =====data31=$data31 + goto loop22 +endi + $loop_all = $loop_all + 1 print ============loop_all=$loop_all