#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
#basic2Of2Cons.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics

# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: aggregate functions (such as sum/count/min/max) and time-windows (interval) are not supported.
#
# What this script does:
#   1. Runs the shared environment script, then creates nine topics (column / all / function
#      variants over a super table, a child table and a normal table).
#   2. For each table kind (stb, ctb, ntb) registers two consumer rows (ids 0 and 1) that
#      subscribe to the three topics of that kind with group.id cgrp1, launches
#      tsim/tmq/consume.sh, and polls the consumeresult table until two result rows exist.
#   3. Validates consumer ids, consumed message counts and consumed row counts.
# Any failed check ends the script with "return -1".

# Prepares the database and tables used below (contents not visible from this script).
run tsim/tmq/prepareBasicEnv-4vgrp.sim

#---- global parameters start ----#
$dbName     = db
$vgroups    = 4
$stbPrefix  = stb
$ctbPrefix  = ctb
$ntbPrefix  = ntb
$stbNum     = 1
$ctbNum     = 10
$ntbNum     = 10
$rowsPerCtb = 10
$tstart     = 1640966400000  # 2022-01-01 00:00:00.000
#---- global parameters end ----#

# Options forwarded to tsim/tmq/consume.sh (-y, -g, -r) and values stored per consumer
# in the consumeinfo table (ifcheckdata, ifmanualcommit).
$pullDelay      = 5
$ifcheckdata    = 1
$ifmanualcommit = 1
$showMsg        = 1
$showRow        = 0

sql connect
sql use $dbName

# NOTE(review): no ALTER DATABASE statement follows this message; it is only a log line.
print == alter database

print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb

print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0

print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0

#sql show topics
#if $rows != 9 then
#  return -1
#endi

# Build the quoted consumer key list. Full form with every optional key enabled:
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

# Number of topics each consumer subscribes to in every section below.
$topicNum = 3

#######################################################################################
print ================ test consume from stb
print == multi toipcs: topic_stb_column + topic_stb_all + topic_stb_function
$topicList = ' . topic_stb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_function
$topicList = $topicList . '

# Expected total rows = child tables * rows per child table * topics.
$totalMsgOfStb = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfStb * $topicNum
# Expected total number of messages summed over both consumers.
$expectmsgcnt = 9

# Register both consumers; timestamps differ by 1s so the two rows do not collide.
$consumerId = 0
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $totalMsgOfStb , $ifcheckdata , $ifmanualcommit )
$consumerId = 1
sql insert into consumeinfo values (now+1s , $consumerId , $topicList , $keyList , $totalMsgOfStb , $ifcheckdata , $ifmanualcommit )

print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start

print == check consume result
# Poll once per second until both consumers have written a result row.
# NOTE(review): there is no upper bound on this loop; the script waits forever if a
# consumer never reports.
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
  sleep 1000
  goto wait_consumer_end_from_stb
endi

# Column 1: the two result rows must carry the two distinct consumer ids 0 and 1.
# (presumably column 1 is consumerid here, as in the consumeresult tables created below)
if $data[0][1] == 0 then
  if $data[1][1] != 1 then
    return -1
  endi
endi
if $data[0][1] == 1 then
  if $data[1][1] != 0 then
    return -1
  endi
endi

# Column 2 (message count): each consumer must have received some, but not all,
# of the messages, and together they must have received exactly $expectmsgcnt.
if $data[0][2] <= 0 then
  return -1
endi
if $data[0][2] >= $expectmsgcnt then
  return -1
endi
if $data[1][2] <= 0 then
  return -1
endi
if $data[1][2] >= $expectmsgcnt then
  return -1
endi
$sumOfConsMsg = $data[0][2] + $data[1][2]
if $sumOfConsMsg != $expectmsgcnt then
  return -1
endi

# Column 3 (row count): same split rule, summing to $totalMsgOfStb.
if $data[0][3] <= 0 then
  return -1
endi
if $data[0][3] >= $totalMsgOfStb then
  return -1
endi
if $data[1][3] <= 0 then
  return -1
endi
if $data[1][3] >= $totalMsgOfStb then
  return -1
endi
$sumOfConsRow = $data[0][3] + $data[1][3]
if $sumOfConsRow != $totalMsgOfStb then
  return -1
endi

#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# Dropping tables is not stable yet, so a new database is created to hold the
# consume info and consume result tables instead. Modify it later.
$cdbName = cdb1
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName

# NOTE(review): no ALTER DATABASE statement follows this message; it is only a log line.
print == alter database

print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)

sql show tables
if $rows != 2 then
  return -1
endi
#######################################################################################

print ================ test consume from ctb
print == multi toipcs: topic_ctb_column + topic_ctb_all + topic_ctb_function
$topicList = ' . topic_ctb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_function
$topicList = $topicList . '

# A single child table is subscribed: expected rows = rows per table * topics,
# expected messages = one per topic.
$totalMsgOfCtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $topicNum

$consumerId = 0
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $totalMsgOfCtb , $ifcheckdata , $ifmanualcommit )
$consumerId = 1
sql insert into consumeinfo values (now+1s , $consumerId , $topicList , $keyList , $totalMsgOfCtb , $ifcheckdata , $ifmanualcommit )

print == start consumer to pull msgs from ctb
# The echoed command includes -w $cdbName so the log matches the command actually run.
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start

print == check consume result
# Poll once per second until both consumers have written a result row (no upper bound).
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
  sleep 1000
  goto wait_consumer_end_from_ctb
endi

# consumerid: the two result rows must carry the two distinct consumer ids 0 and 1.
if $data[0][1] == 0 then
  if $data[1][1] != 1 then
    return -1
  endi
endi
if $data[0][1] == 1 then
  if $data[1][1] != 0 then
    return -1
  endi
endi

# consummsgcnt: exactly one consumer must have received every message.
# either $data[0][2] == $expectmsgcnt and $data[1][2] == 0
# or     $data[0][2] == 0 and $data[1][2] == $expectmsgcnt
if $data[0][2] == $expectmsgcnt then
  if $data[1][2] == 0 then
    goto check_ok_0
  endi
elif $data[0][2] == 0 then
  if $data[1][2] == $expectmsgcnt then
    goto check_ok_0
  endi
endi
return -1
check_ok_0:

# consumrowcnt: exactly one consumer must have received every row.
if $data[0][3] == $totalMsgOfCtb then
  if $data[1][3] == 0 then
    goto check_ok_1
  endi
elif $data[0][3] == 0 then
  if $data[1][3] == $totalMsgOfCtb then
    goto check_ok_1
  endi
endi
return -1
check_ok_1:

#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# Dropping tables is not stable yet, so a new database is created to hold the
# consume info and consume result tables instead. Modify it later.
$cdbName = cdb2
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName

# NOTE(review): no ALTER DATABASE statement follows this message; it is only a log line.
print == alter database

print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)

sql show tables
if $rows != 2 then
  return -1
endi
#######################################################################################

print ================ test consume from ntb
print == multi toipcs: topic_ntb_column + topic_ntb_all + topic_ntb_function
$topicList = ' . topic_ntb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_function
$topicList = $topicList . '

# A single normal table is subscribed: expected rows = $rowsPerCtb * topics
# (the script reuses $rowsPerCtb for the normal table), expected messages = one per topic.
$totalMsgOfNtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $topicNum

$consumerId = 0
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $totalMsgOfNtb , $ifcheckdata , $ifmanualcommit )
$consumerId = 1
sql insert into consumeinfo values (now+1s , $consumerId , $topicList , $keyList , $totalMsgOfNtb , $ifcheckdata , $ifmanualcommit )

print == start consumer to pull msgs from ntb
# The echoed command includes -w $cdbName so the log matches the command actually run.
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start

print == check consume result from ntb
# Poll once per second until both consumers have written a result row (no upper bound).
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
  sleep 1000
  goto wait_consumer_end_from_ntb
endi

# consumerid: the two result rows must carry the two distinct consumer ids 0 and 1.
# Written in the same form as the stb and ctb sections.
if $data[0][1] == 0 then
  if $data[1][1] != 1 then
    return -1
  endi
endi
if $data[0][1] == 1 then
  if $data[1][1] != 0 then
    return -1
  endi
endi

# consummsgcnt: exactly one consumer must have received every message.
# either $data[0][2] == $expectmsgcnt and $data[1][2] == 0
# or     $data[0][2] == 0 and $data[1][2] == $expectmsgcnt
if $data[0][2] == $expectmsgcnt then
  if $data[1][2] == 0 then
    goto check_ok_2
  endi
elif $data[0][2] == 0 then
  if $data[1][2] == $expectmsgcnt then
    goto check_ok_2
  endi
endi
return -1
check_ok_2:

# consumrowcnt: exactly one consumer must have received every row.
if $data[0][3] == $totalMsgOfNtb then
  if $data[1][3] == 0 then
    goto check_ok_3
  endi
elif $data[0][3] == 0 then
  if $data[1][3] == $totalMsgOfNtb then
    goto check_ok_3
  endi
endi
return -1
check_ok_3:

#------ no need to stop the consumer: it exits once it has pulled more than the expected number of msgs
#system tsim/tmq/consume.sh -s stop -x SIGINT

# Shut down the dnode started for this test.
system sh/exec.sh -n dnode1 -s stop -x SIGINT