提交 c4414ec6 编写于 作者: 5 54liuyao

feat(stream):exists stable bug&ci

上级 637195ce
......@@ -488,9 +488,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
};
void* pData = colDataGetData(pTagData, rowId);
if (colDataIsNull_s(pTagData, rowId)) {
tagVal.type = TSDB_DATA_TYPE_NULL;
tagVal.pData = NULL;
tagVal.nData = 0;
continue;
} else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {
tagVal.nData = varDataLen(pData);
tagVal.pData = varDataVal(pData);
......
......@@ -971,7 +971,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId,
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock) {
void* pValue = NULL;
if (groupId != 0 && streamStateGetParName(pState, groupId, &pValue) != 0) {
if (streamStateGetParName(pState, groupId, &pValue) != 0) {
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
if (pTableSup->numOfExprs > 0) {
projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL);
......
......@@ -1786,7 +1786,7 @@ FETCH_NEXT_BLOCK:
int32_t current = pInfo->validBlockIndex++;
SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
SSDataBlock* pBlock = pPacked->pDataBlock;
if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) {
if (pBlock->info.parTbName[0]) {
streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
}
// TODO move into scan
......
......@@ -1614,7 +1614,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
}
nodesDestroyNode((SNode*)pInfo->pPhyNode);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupGroupResInfo(&pInfo->groupResInfo);
pInfo->groupResInfo.pRows = taosArrayDestroy(pInfo->groupResInfo.pRows);
cleanupExprSupp(&pInfo->scalarSupp);
taosMemoryFreeClear(param);
......
......@@ -19,9 +19,9 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stable result.streamt0(ts timestamp,a int,b int) tags(ta int,tb int,tc int);
sql create stable result.streamt0(ts timestamp,a int,b int) tags(ta int,tb varchar(100),tc int);
sql create stream streams0 trigger at_once into result.streamt0 as select _wstart, count(*) c1, max(a) c2 from st partition by tbname interval(10s);
sql create stream streams0 trigger at_once into result.streamt0 tags(tb) as select _wstart, count(*) c1, max(a) c2 from st partition by tbname tb interval(10s);
sql insert into t1 values(1648791213000,1,2,3);
sql insert into t2 values(1648791213000,2,2,3);
......@@ -61,6 +61,16 @@ if $data02 != 1 then
goto loop0
endi
if $data03 != NULL then
print =====data03=$data03
goto loop0
endi
if $data04 != t1 then
print =====data04=$data04
goto loop0
endi
if $data11 != 1 then
print =====data11=$data11
goto loop0
......@@ -71,6 +81,16 @@ if $data12 != 2 then
goto loop0
endi
if $data13 != NULL then
print =====data13=$data13
goto loop0
endi
if $data14 != t2 then
print =====data14=$data14
goto loop0
endi
print ===== step3
sql create database result1 vgroups 1;
......@@ -83,9 +103,9 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stable result1.streamt1(ts timestamp,a int,b int,c int) tags(ta bigint unsigned,tb int,tc int);
sql create stable result1.streamt1(ts timestamp,a int,b int,c int) tags(ta varchar(100),tb int,tc int);
sql create stream streams1 trigger at_once into result1.streamt1(ts,c,a,b) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname interval(10s);
sql create stream streams1 trigger at_once into result1.streamt1(ts,c,a,b) tags(ta) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname as ta interval(10s);
sql insert into t1 values(1648791213000,10,20,30);
sql insert into t2 values(1648791213000,40,50,60);
......@@ -161,7 +181,7 @@ sql create table t2 using st tags(2,2,2);
sql create stable result2.streamt2(ts timestamp, a int , b int) tags(ta varchar(20));
# tag dest 1, source 2
##sql_error create stream streams2 trigger at_once into result2.streamt2 TAGS(aa varchar(100), ta int) as select _wstart, count(*) c1, max(a) from st partition by tbname as aa, ta interval(10s);
sql_error create stream streams2 trigger at_once into result2.streamt2 TAGS(aa varchar(100), ta int) as select _wstart, count(*) c1, max(a) from st partition by tbname as aa, ta interval(10s);
# column dest 3, source 4
sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1, max(a), max(b) from st partition by tbname interval(10s);
......@@ -173,7 +193,7 @@ sql_error create stream streams2 trigger at_once into result2.streamt2(ts, a, b
sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1 from st partition by tbname interval(10s);
# column dest 3, source 2
sql create stream streams2 trigger at_once into result2.streamt2(ts, a) as select _wstart, count(*) c1 from st partition by tbname interval(10s);
sql create stream streams2 trigger at_once into result2.streamt2(ts, a) tags(ta) as select _wstart, count(*) c1 from st partition by tbname as ta interval(10s);
print ===== step5
......@@ -252,16 +272,16 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,2,3);
sql create table t2 using st tags(4,5,6);
sql create stable result4.streamt4(ts timestamp,a int,b int,c int, d int) tags(ta int,tb int,tc int);
sql create stable result4.streamt4(ts timestamp,a int,b int,c int, d int) tags(tg1 int,tg2 int,tg3 int);
sql create stream streams4 trigger at_once into result4.streamt4(ts,c,a,b) tags(tg2 int, tg3 varchar(100), tg1 bigint) subtable(concat("tbl-", tg1)) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s);
sql create stream streams4 trigger at_once into result4.streamt4(ts,c,a,b) tags(tg2, tg3, tg1) subtable( concat("tbl-", cast(tg1 as varchar(10)) ) ) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s);
sql insert into t1 values(1648791213000,10,20,30);
sql insert into t2 values(1648791213000,40,50,60);
$loop_count = 0
sql select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s);
sql select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s);
print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23
......@@ -275,7 +295,7 @@ if $loop_count == 10 then
return -1
endi
sql select * from result4.streamt4;
sql select * from result4.streamt4 order by tg1;
if $rows != 2 then
print =====rows=$rows
......@@ -285,7 +305,7 @@ if $rows != 2 then
goto loop2
endi
if $data01 != 40 then
if $data01 != 10 then
print =====data01=$data01
goto loop2
endi
......@@ -295,7 +315,7 @@ if $data02 != 20 then
goto loop2
endi
if $data03 != 2 then
if $data03 != 1 then
print =====data03=$data03
goto loop2
endi
......@@ -305,6 +325,26 @@ if $data04 != NULL then
goto loop2
endi
if $data11 != 40 then
print =====data11=$data11
goto loop2
endi
if $data12 != 50 then
print =====data12=$data12
goto loop2
endi
if $data13 != 1 then
print =====data13=$data13
goto loop2
endi
if $data14 != NULL then
print =====data14=$data14
goto loop2
endi
print ======over
system sh/stop_dnodes.sh
......@@ -367,6 +367,77 @@ if $data22 != tag-t3 then
goto loop8
endi
print ===== step6
print ===== transform tag value
sql drop stream if exists streams1;
sql drop stream if exists streams2;
sql drop stream if exists streams3;
sql drop stream if exists streams4;
sql drop stream if exists streams5;
sql drop database if exists test1;
sql drop database if exists test2;
sql drop database if exists test3;
sql drop database if exists test4;
sql drop database if exists test5;
sql drop database if exists result1;
sql drop database if exists result2;
sql drop database if exists result3;
sql drop database if exists result4;
sql drop database if exists result5;
sql create database result6 vgroups 1;
sql create database test6 vgroups 4;
sql use test6;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta varchar(20), tb int, tc int);
sql create table t1 using st tags("1",1,1);
sql create table t2 using st tags("2",2,2);
sql create table t3 using st tags("3",3,3);
sql create stream streams6 trigger at_once into result6.streamt6 TAGS(dd int) as select _wstart, count(*) c1 from st partition by concat(ta, "0") as dd, tbname interval(10s);
sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3);
$loop_count = 0
loop9:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result6.streamt6 order by 3;
if $rows != 3 then
print =====rows=$rows
print $data00 $data10
goto loop9
endi
if $data02 != 10 then
print =====data02=$data02
goto loop9
endi
if $data12 != 20 then
print =====data12=$data12
goto loop9
endi
if $data22 != 30 then
print =====data22=$data22
goto loop8
endi
print ======over
system sh/stop_dnodes.sh
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
print ===== step1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print ===== step2
print ===== table name
sql create database result vgroups 1;
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s);
print ===== insert into 1
sql insert into t1 values(1648791213000,1,2,3);
sql insert into t2 values(1648791213000,2,2,3);
$loop_count = 0
loop0:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select table_name from information_schema.ins_tables where db_name="result" order by 1;
if $rows != 1 then
print =====rows=$rows
print $data00
print $data10
goto loop0
endi
if $data00 != aaa then
print =====data00=$data00
goto loop0
endi
$loop_count = 0
loop1:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result.streamt;
if $rows != 1 then
print =====rows=$rows
print $data00 $data10
goto loop1
endi
print ===== step3
print ===== column name
sql create database result2 vgroups 1;
sql create database test2 vgroups 4;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams2 trigger at_once into result2.streamt2 TAGS(cc varchar(100)) as select _wstart, count(*) c1 from st interval(10s);
print ===== insert into 2
sql insert into t1 values(1648791213000,1,2,3);
sql insert into t2 values(1648791213000,2,2,3);
$loop_count = 0
loop2:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print select tag_name from information_schema.ins_tags where db_name="result2" and stable_name = "streamt2" order by 1;
sql select tag_name from information_schema.ins_tags where db_name="result2" and stable_name = "streamt2" order by 1;
if $rows != 1 then
print =====rows=$rows
print $data00
print $data10
goto loop2
endi
if $data00 != cc then
print =====data00=$data00
goto loop2
endi
print sql select cc from result2.streamt2 order by 1;
$loop_count = 0
loop21:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select cc from result2.streamt2 order by 1;
if $rows != 1 then
print =====rows=$rows
print $data00
print $data10
goto loop21
endi
if $data00 != NULL then
print =====data00=$data00
goto loop21
endi
$loop_count = 0
loop3:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result2.streamt2;
if $rows != 1 then
print =====rows=$rows
print $data00 $data10
goto loop3
endi
print ===== step4
print ===== column name + table name
sql create database result3 vgroups 1;
sql create database test3 vgroups 4;
sql use test3;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams3 trigger at_once into result3.streamt3 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", 1 ) ) as select _wstart, count(*) c1 from st interval(10s);
print ===== insert into 3
sql insert into t1 values(1648791213000,1,2,3);
sql insert into t2 values(1648791213000,2,2,3);
$loop_count = 0
loop4:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select tag_name from information_schema.ins_tags where db_name="result3" and stable_name = "streamt3" order by 1;
if $rows != 1 then
print =====rows=$rows
print $data00
print $data10
goto loop4
endi
if $data00 != NULL then
print =====data00=$data00
goto loop4
endi
sql select dd from result3.streamt3 order by 1;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop4
endi
if $data00 != col-1 then
print =====data00=$data00
goto loop4
endi
if $data10 != col-2 then
print =====data10=$data10
goto loop4
endi
$loop_count = 0
loop5:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result3.streamt3;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop5
endi
$loop_count = 0
loop6:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select table_name from information_schema.ins_tables where db_name="result3" order by 1;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop6
endi
if $data00 != tbn-1 then
print =====data00=$data00
goto loop6
endi
if $data10 != tbn-2 then
print =====data10=$data10
goto loop6
endi
print ===== step5
print ===== tag name + table name
sql create database result4 vgroups 1;
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(3,3,3);
sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", dd)) as select _wstart, count(*) c1 from st interval(10s);
sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3);
$loop_count = 0
loop7:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select table_name from information_schema.ins_tables where db_name="result4" order by 1;
if $rows != 3 then
print =====rows=$rows
print $data00 $data10
goto loop7
endi
if $data00 != tbn-t1 then
print =====data00=$data00
goto loop7
endi
if $data10 != tbn-t2 then
print =====data10=$data10
goto loop7
endi
if $data20 != tbn-t3 then
print =====data20=$data20
goto loop7
endi
$loop_count = 0
loop8:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result4.streamt4 order by 3;
if $rows != 3 then
print =====rows=$rows
print $data00 $data10
goto loop8
endi
if $data01 != 1 then
print =====data01=$data01
goto loop8
endi
if $data02 != t1 then
print =====data02=$data02
goto loop8
endi
if $data11 != 1 then
print =====data11=$data11
goto loop8
endi
if $data12 != t2 then
print =====data12=$data12
goto loop8
endi
if $data21 != 1 then
print =====data21=$data21
goto loop8
endi
if $data22 != t3 then
print =====data22=$data22
goto loop8
endi
print ======over
system sh/stop_dnodes.sh
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册