diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 1c44af49b7196a74edf4dfe1bf7f693d346ff27c..ea429a76b0357d85b295fb1fbcc24c685290464f 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -650,10 +650,10 @@ int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, S if (code == 0 && sessionKeyCmpr(key, &tmpKey) == 0) { res = 0; resKey = tmpKey; + streamStateCurPrev(pState, pCur); } else { break; } - streamStateCurPrev(pState, pCur); } *curKey = resKey; streamStateFreeCur(pCur); @@ -700,9 +700,14 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch memcpy(tmp, *pVal, valSize); goto _end; } + + streamStateCurNext(pState, pCur); + } else { + *key = tmpKey; + streamStateFreeCur(pCur); + pCur = streamStateSessionSeekKeyNext(pState, key); } - streamStateCurNext(pState, pCur); code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen); if (code == 0) { void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); diff --git a/tests/script/tsim/stream/state0.sim b/tests/script/tsim/stream/state0.sim index 60a50f95607129efcdcb406c01ca1eadf196c869..dc7d9bc40746b76ad3b34430981c86c2b75c9010 100644 --- a/tests/script/tsim/stream/state0.sim +++ b/tests/script/tsim/stream/state0.sim @@ -449,12 +449,12 @@ if $data26 != 14 then return -1 endi -sql create database test1 vgroups 1 -sql select * from information_schema.ins_databases +sql create database test1 vgroups 1; +sql select * from information_schema.ins_databases; print $data00 $data01 $data02 -sql use test1 +sql use test1; sql create table t1(ts timestamp, a int, b int , c int, d double, id int); sql create stream streams2 trigger at_once into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a); @@ -498,7 +498,50 @@ if $data15 != 3 then goto loop5 endi -sql drop database test; -sql drop database test1; +sql create database test3 vgroups 1; +sql use test3; + +sql create table t1(ts timestamp, a int, b int , c int, d double, id int); +sql create stream streams3 trigger at_once into streamt3 as select _wstart, count(*) c1, sum(b) c3 from t1 state_window(a); +sql insert into t1 values(1648791212000,1,2,3,1.0,1); +sql insert into t1 values(1648791213000,2,2,3,1.0,1); +sql insert into t1 values(1648791214000,3,2,4,1.0,2); +sql insert into t1 values(1648791215000,4,2,3,1.0,1); +sql insert into t1 values(1648791211000,5,2,3,1.0,1); +sql insert into t1 values(1648791210000,6,2,4,1.0,2); +sql insert into t1 values(1648791217000,7,2,3,1.0,1); +sql insert into t1 values(1648791219000,8,2,3,1.0,1); +sql insert into t1 values(1648791209000,9,2,4,1.0,2); +sql insert into t1 values(1648791220000,10,2,4,1.0,2); + +sql insert into t1 values(1648791212000,1,2,3,1.0,1); +sql insert into t1 values(1648791213000,2,2,3,1.0,1); +sql insert into t1 values(1648791214000,3,2,4,1.0,2); +sql insert into t1 values(1648791215000,4,2,3,1.0,1); +sql insert into t1 values(1648791211000,5,2,3,1.0,1); +sql insert into t1 values(1648791210000,6,2,4,1.0,2); +sql insert into t1 values(1648791217000,7,2,3,1.0,1); +sql insert into t1 values(1648791219000,8,2,3,1.0,1); +sql insert into t1 values(1648791209000,9,2,4,1.0,2); +sql insert into t1 values(1648791220000,10,2,4,1.0,2); + + +$loop_count = 0 +loop6: + +sleep 300 + +sql select * from streamt3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 10 then + print =====rows=$rows + goto loop6 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT