diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 84317c825b0e1adbc8aa62103ef3e3780c083848..1e5c6c2168646e877fbfe33a209a661d8b4f07da 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1829,6 +1829,15 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { printDataBlock(pInfo->pUpdateRes, "recover update"); return pInfo->pUpdateRes; } break; + case STREAM_SCAN_FROM_DELETE_DATA: { + generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); + prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); + pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; + printDataBlock(pInfo->pDeleteDataRes, "recover delete"); + return pInfo->pDeleteDataRes; + } break; case STREAM_SCAN_FROM_DATAREADER_RANGE: { SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); if (pSDB) { @@ -2021,6 +2030,7 @@ FETCH_NEXT_BLOCK: copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock); blockDataCleanup(pSup->pScanBlock); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA; return pInfo->pUpdateRes; } diff --git a/tests/script/tsim/stream/state1.sim b/tests/script/tsim/stream/state1.sim index 2ae5739642b2aed7e2114f2176e4e00c429bb787..67e02c0890821dc2a57776e64faa95ee9a89c997 100644 --- a/tests/script/tsim/stream/state1.sim +++ b/tests/script/tsim/stream/state1.sim @@ -4,6 +4,7 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +print step 1 print =============== create database sql create database test vgroups 4; sql select * from information_schema.ins_databases; @@ -33,8 +34,8 @@ if $loop_count == 10 then endi sql select * from streamt1; -print data00 data01 -print data10 data11 +print $data00 $data01 +print $data10 $data11 if $rows != 0 then print =====rows=$rows @@ -52,8 +53,8 @@ if $loop_count == 10 then endi sql select * from streamt1; -print data00 data01 -print data10 data11 +print $data00 $data01 +print $data10 $data11 if $rows != 1 then print =====rows=$rows @@ -92,9 +93,64 @@ endi sql select * from streamt1; if $rows != 2 then print =====rows=$rows - goto loop2 + goto loop3 +endi + +print step 1 over +print step 2 + +sql create database test2 vgroups 1; +sql use test2; +sql create table t1(ts timestamp, a int, b int , c int, d double); +print create stream streams2 trigger at_once watermark 1000s into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a) +sql create stream streams2 trigger at_once watermark 1000s into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a); + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791213010,1,2,3,1.1); + +$loop_count = 0 +loop4: + +sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt2; +print $data00 $data01 +print $data10 $data11 + +if $rows != 1 then + print =====rows=$rows + goto loop4 +endi + +print insert into t1 values(1648791213005,2,2,3,1.1) +sql insert into t1 values(1648791213005,2,2,3,1.1); + +$loop_count = 0 +loop5: + +sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print select * from streamt2 +sql select * from streamt2; +print $data00 $data01 +print $data10 $data11 +print $data20 $data21 +print $data30 $data31 + +if $rows != 3 then + print =====rows=$rows + goto loop5 endi +print step 2 over print state1 end