diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 41e4c990f851a3cbcaf3a5982e16dc024caf0eab..16983cb5072496531d7eb7f03eac9c88bd6d8423 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -832,10 +832,13 @@ static bool checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t group return true; } -static void buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock) { +static bool buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock) { + if (pBlock->info.rows >= pBlock->info.capacity) { + return false; + } uint64_t groupId = pBlock->info.id.groupId; if (pFillSup->hasDelete && !checkResult(pFillSup, ts, groupId)) { - return; + return true; } for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; @@ -853,6 +856,7 @@ static void buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFill } } pBlock->info.rows++; + return true; } static bool hasRemainCalc(SStreamFillInfo* pFillInfo) { @@ -932,7 +936,9 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* } if (pFillInfo->pos == FILL_POS_START) { - buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes); + if (buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes)) { + pFillInfo->pos = FILL_POS_INVALID; + } } if (pFillInfo->type != TSDB_FILL_LINEAR) { doStreamFillNormal(pFillSup, pFillInfo, pRes); @@ -940,7 +946,9 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* doStreamFillLinear(pFillSup, pFillInfo, pRes); if (pFillInfo->pos == FILL_POS_MID) { - buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes); + if (buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes)) { + pFillInfo->pos = FILL_POS_INVALID; + } } if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) { @@ -954,7 +962,9 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* } } if (pFillInfo->pos == FILL_POS_END) { - buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes); + if (buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes)) { + pFillInfo->pos = FILL_POS_INVALID; + } } } @@ -989,10 +999,6 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { uint64_t groupId = pBlock->info.id.groupId; SSDataBlock* pRes = pInfo->pRes; pRes->info.id.groupId = groupId; - if (hasRemainCalc(pFillInfo)) { - doStreamFillRange(pFillInfo, pFillSup, pRes); - } - SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol); TSKEY* tsCol = (TSKEY*)pTsCol->pData; @@ -1204,13 +1210,14 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { return NULL; } blockDataCleanup(pInfo->pRes); - if (pOperator->status == OP_RES_TO_RETURN) { - if (hasRemainCalc(pInfo->pFillInfo)) { - doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes); - if (pInfo->pRes->info.rows > 0) { - return pInfo->pRes; - } + if (hasRemainCalc(pInfo->pFillInfo) || (pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true )) { + doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes); + if (pInfo->pRes->info.rows > 0) { + printDataBlock(pInfo->pRes, "stream fill"); + return pInfo->pRes; } + } + if (pOperator->status == OP_RES_TO_RETURN) { doDeleteFillFinalize(pOperator); if (pInfo->pRes->info.rows > 0) { printDataBlock(pInfo->pRes, "stream fill"); diff --git a/tests/script/tsim/stream/fillIntervalRange.sim b/tests/script/tsim/stream/fillIntervalRange.sim new file mode 100644 index 0000000000000000000000000000000000000000..a0905141f2bed1230abd3f51acc867aed19d1c45 --- /dev/null +++ b/tests/script/tsim/stream/fillIntervalRange.sim @@ -0,0 +1,225 @@ +$loop_all = 0 +looptest: + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start + +sleep 500 +sql connect + +sql drop database if exists test; +sql create database test vgroups 1; +sql use test; + +sql create table t1(ts timestamp, a int, b int , c int, d double, s varchar(20));; +sql create stream streams1 trigger at_once into streamt as select _wstart ts, count(*) c1 from t1 interval(1s) fill(NULL); +sql insert into t1 values(1648791211000,1,2,3,1.0,'aaa'); +sleep 100 +sql insert into t1 values(1648795308000,1,2,3,1.0,'aaa'); + +$loop_count = 0 + +loop0: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 500 +sql select * from streamt where c1 > 0; + +if $rows != 2 then + print =====rows=$rows + goto loop0 +endi + +sql select count(*) from streamt; + +if $data00 != 4098 then + print =====data00=$data00 + goto loop0 +endi + +sql insert into t1 values(1648800308000,1,1,1,1.0,'aaa'); + + +$loop_count = 0 + +loop1: +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 500 +sql select * from streamt where c1 > 0; + +if $rows != 3 then + print =====rows=$rows + goto loop1 +endi + +sql select count(*) from streamt; + +if $data00 != 9098 then + print =====rows=$rows + goto loop1 +endi + +sql insert into t1 values(1648786211000,1,1,1,1.0,'aaa'); + + +$loop_count = 0 + +loop2: +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 500 +sql select * from streamt where c1 > 0; + +if $rows != 4 then + print =====rows=$rows + goto loop2 +endi + +sql select count(*) from streamt; + +if $data00 != 14098 then + print =====rows=$rows + goto loop2 +endi + +sql insert into t1 values(1648801308000,1,1,1,1.0,'aaa') (1648802308000,1,1,1,1.0,'aaa') (1648803308000,1,1,1,1.0,'aaa') (1648804308000,1,1,1,1.0,'aaa') (1648805308000,1,1,1,1.0,'aaa'); + + +$loop_count = 0 + +loop21: +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 500 +sql select * from streamt where c1 > 0; + +if $rows != 9 then + print =====rows=$rows + goto loop21 +endi + +sql select count(*) from streamt; + +if $data00 != 19098 then + print =====rows=$rows + goto loop21 +endi + +sql drop database if exists test; +sql create database test vgroups 1; +sql use test; + +sql create table t1(ts timestamp, a int, b int , c int, d double, s varchar(20)); +print create stream streams1 trigger at_once into streamt as select _wstart ts, max(a) c1 from t1 interval(1s) fill(linear); +sql create stream streams1 trigger at_once into streamt as select _wstart ts, max(a) c1 from t1 interval(1s) fill(linear); + +print create stream streams2 trigger at_once into streamt2 as select _wstart ts, max(a) c1 from t1 interval(1s) fill(prev); +sql create stream streams2 trigger at_once into streamt2 as select _wstart ts, max(a) c1 from t1 interval(1s) fill(prev); + +sql insert into t1 values(1648791211000,1,2,3,1.0,'aaa'); +sleep 100 +sql insert into t1 values(1648795308000,1,2,3,1.0,'aaa'); + +$loop_count = 0 + +loop3: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 500 + +print select count(*) from streamt; +sql select count(*) from streamt; + +if $data00 != 4098 then + print =====data00=$data00 + goto loop3 +endi + +print select count(*) from streamt2; +sql select count(*) from streamt2; + +if $data00 != 4098 then + print =====data00=$data00 + goto loop3 +endi + +sql insert into t1 values(1648800308000,1,1,1,1.0,'aaa'); + + +$loop_count = 0 + +loop4: +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 500 + +print select count(*) from streamt; +sql select count(*) from streamt; + +if $data00 != 9098 then + print =====rows=$rows + goto loop4 +endi + +print select count(*) from streamt2; +sql select count(*) from streamt2; + +if $data00 != 9098 then + print =====rows=$rows + goto loop4 +endi + +sql insert into t1 values(1648786211000,1,1,1,1.0,'aaa'); + + +$loop_count = 0 + +loop5: +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 500 + +print select count(*) from streamt; +sql select count(*) from streamt; + +if $data00 != 14098 then + print =====rows=$rows + goto loop5 +endi + +print select count(*) from streamt2; +sql select count(*) from streamt2; + +if $data00 != 14098 then + print =====rows=$rows + goto loop5 +endi + +system sh/stop_dnodes.sh + +#goto looptest