未验证 提交 451b3628 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #19965 from taosdata/fix/TD-22354

fix:number of ssdatablock rows exceeds the capacity
...@@ -832,10 +832,13 @@ static bool checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t group ...@@ -832,10 +832,13 @@ static bool checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t group
return true; return true;
} }
static void buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock) { static bool buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock) {
if (pBlock->info.rows >= pBlock->info.capacity) {
return false;
}
uint64_t groupId = pBlock->info.id.groupId; uint64_t groupId = pBlock->info.id.groupId;
if (pFillSup->hasDelete && !checkResult(pFillSup, ts, groupId)) { if (pFillSup->hasDelete && !checkResult(pFillSup, ts, groupId)) {
return; return true;
} }
for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
...@@ -853,6 +856,7 @@ static void buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFill ...@@ -853,6 +856,7 @@ static void buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFill
} }
} }
pBlock->info.rows++; pBlock->info.rows++;
return true;
} }
static bool hasRemainCalc(SStreamFillInfo* pFillInfo) { static bool hasRemainCalc(SStreamFillInfo* pFillInfo) {
...@@ -932,7 +936,9 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* ...@@ -932,7 +936,9 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter*
} }
if (pFillInfo->pos == FILL_POS_START) { if (pFillInfo->pos == FILL_POS_START) {
buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes); if (buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes)) {
pFillInfo->pos = FILL_POS_INVALID;
}
} }
if (pFillInfo->type != TSDB_FILL_LINEAR) { if (pFillInfo->type != TSDB_FILL_LINEAR) {
doStreamFillNormal(pFillSup, pFillInfo, pRes); doStreamFillNormal(pFillSup, pFillInfo, pRes);
...@@ -940,7 +946,9 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* ...@@ -940,7 +946,9 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter*
doStreamFillLinear(pFillSup, pFillInfo, pRes); doStreamFillLinear(pFillSup, pFillInfo, pRes);
if (pFillInfo->pos == FILL_POS_MID) { if (pFillInfo->pos == FILL_POS_MID) {
buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes); if (buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes)) {
pFillInfo->pos = FILL_POS_INVALID;
}
} }
if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) { if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) {
...@@ -954,7 +962,9 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* ...@@ -954,7 +962,9 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter*
} }
} }
if (pFillInfo->pos == FILL_POS_END) { if (pFillInfo->pos == FILL_POS_END) {
buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes); if (buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes)) {
pFillInfo->pos = FILL_POS_INVALID;
}
} }
} }
...@@ -989,10 +999,6 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { ...@@ -989,10 +999,6 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
uint64_t groupId = pBlock->info.id.groupId; uint64_t groupId = pBlock->info.id.groupId;
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
pRes->info.id.groupId = groupId; pRes->info.id.groupId = groupId;
if (hasRemainCalc(pFillInfo)) {
doStreamFillRange(pFillInfo, pFillSup, pRes);
}
SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol); SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
TSKEY* tsCol = (TSKEY*)pTsCol->pData; TSKEY* tsCol = (TSKEY*)pTsCol->pData;
...@@ -1204,13 +1210,14 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { ...@@ -1204,13 +1210,14 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
if (pOperator->status == OP_RES_TO_RETURN) { if (hasRemainCalc(pInfo->pFillInfo) || (pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true )) {
if (hasRemainCalc(pInfo->pFillInfo)) { doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes); if (pInfo->pRes->info.rows > 0) {
if (pInfo->pRes->info.rows > 0) { printDataBlock(pInfo->pRes, "stream fill");
return pInfo->pRes; return pInfo->pRes;
}
} }
}
if (pOperator->status == OP_RES_TO_RETURN) {
doDeleteFillFinalize(pOperator); doDeleteFillFinalize(pOperator);
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill"); printDataBlock(pInfo->pRes, "stream fill");
......
$loop_all = 0
looptest:
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 500
sql connect
sql drop database if exists test;
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double, s varchar(20));;
sql create stream streams1 trigger at_once into streamt as select _wstart ts, count(*) c1 from t1 interval(1s) fill(NULL);
sql insert into t1 values(1648791211000,1,2,3,1.0,'aaa');
sleep 100
sql insert into t1 values(1648795308000,1,2,3,1.0,'aaa');
$loop_count = 0
loop0:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 500
sql select * from streamt where c1 > 0;
if $rows != 2 then
print =====rows=$rows
goto loop0
endi
sql select count(*) from streamt;
if $data00 != 4098 then
print =====data00=$data00
goto loop0
endi
sql insert into t1 values(1648800308000,1,1,1,1.0,'aaa');
$loop_count = 0
loop1:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 500
sql select * from streamt where c1 > 0;
if $rows != 3 then
print =====rows=$rows
goto loop1
endi
sql select count(*) from streamt;
if $data00 != 9098 then
print =====rows=$rows
goto loop1
endi
sql insert into t1 values(1648786211000,1,1,1,1.0,'aaa');
$loop_count = 0
loop2:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 500
sql select * from streamt where c1 > 0;
if $rows != 4 then
print =====rows=$rows
goto loop2
endi
sql select count(*) from streamt;
if $data00 != 14098 then
print =====rows=$rows
goto loop2
endi
sql insert into t1 values(1648801308000,1,1,1,1.0,'aaa') (1648802308000,1,1,1,1.0,'aaa') (1648803308000,1,1,1,1.0,'aaa') (1648804308000,1,1,1,1.0,'aaa') (1648805308000,1,1,1,1.0,'aaa');
$loop_count = 0
loop21:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 500
sql select * from streamt where c1 > 0;
if $rows != 9 then
print =====rows=$rows
goto loop21
endi
sql select count(*) from streamt;
if $data00 != 19098 then
print =====rows=$rows
goto loop21
endi
sql drop database if exists test;
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double, s varchar(20));
print create stream streams1 trigger at_once into streamt as select _wstart ts, max(a) c1 from t1 interval(1s) fill(linear);
sql create stream streams1 trigger at_once into streamt as select _wstart ts, max(a) c1 from t1 interval(1s) fill(linear);
print create stream streams2 trigger at_once into streamt2 as select _wstart ts, max(a) c1 from t1 interval(1s) fill(prev);
sql create stream streams2 trigger at_once into streamt2 as select _wstart ts, max(a) c1 from t1 interval(1s) fill(prev);
sql insert into t1 values(1648791211000,1,2,3,1.0,'aaa');
sleep 100
sql insert into t1 values(1648795308000,1,2,3,1.0,'aaa');
$loop_count = 0
loop3:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 500
print select count(*) from streamt;
sql select count(*) from streamt;
if $data00 != 4098 then
print =====data00=$data00
goto loop3
endi
print select count(*) from streamt2;
sql select count(*) from streamt2;
if $data00 != 4098 then
print =====data00=$data00
goto loop3
endi
sql insert into t1 values(1648800308000,1,1,1,1.0,'aaa');
$loop_count = 0
loop4:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 500
print select count(*) from streamt;
sql select count(*) from streamt;
if $data00 != 9098 then
print =====rows=$rows
goto loop4
endi
print select count(*) from streamt2;
sql select count(*) from streamt2;
if $data00 != 9098 then
print =====rows=$rows
goto loop4
endi
sql insert into t1 values(1648786211000,1,1,1,1.0,'aaa');
$loop_count = 0
loop5:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 500
print select count(*) from streamt;
sql select count(*) from streamt;
if $data00 != 14098 then
print =====rows=$rows
goto loop5
endi
print select count(*) from streamt2;
sql select count(*) from streamt2;
if $data00 != 14098 then
print =====rows=$rows
goto loop5
endi
system sh/stop_dnodes.sh
#goto looptest
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册