#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 #basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics #basic2Of2Cons.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics #basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics #basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics # notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN # The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; # # notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). # run tsim/tmq/prepareBasicEnv-4vgrp.sim #---- global parameters start ----# $dbName = db $vgroups = 4 $stbPrefix = stb $ctbPrefix = ctb $ntbPrefix = ntb $stbNum = 1 $ctbNum = 10 $ntbNum = 10 $rowsPerCtb = 10 $tstart = 1640966400000 # 2022-01-01 00:00:00.000 #---- global parameters end ----# $pullDelay = 5 $ifcheckdata = 1 $ifmanualcommit = 1 $showMsg = 1 $showRow = 0 sql connect sql use $dbName print == alter database sql alter database $dbName wal_retention_period 3600 print == create topics from super table sql create topic topic_stb_column as select ts, c3 from stb sql create topic topic_stb_all as select ts, c1, c2, c3 from stb sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb print == create topics from child table sql create topic topic_ctb_column as select ts, c3 from ctb0 sql create topic topic_ctb_all as select * from ctb0 sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0 print == create topics from normal table sql create topic topic_ntb_column as select ts, c3 from ntb0 sql create topic topic_ntb_all as select * from ntb0 sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 #sql show topics #if $rows != 9 then # return -1 #endi #'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest' $keyList = ' . group.id:cgrp1 $keyList = $keyList . , $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , #$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList $cdb_index = 0 #=============================== start consume =============================# print ================ test consume from stb $loop_cnt = 0 loop_consume_diff_topic_from_stb: ####################################################################################### # clear consume info and consume result #run tsim/tmq/clearConsume.sim # because drop table function no stable, so by create new db for consume info and result. Modify it later $cdb_index = $cdb_index + 1 $cdbName = cdb . $cdb_index sql create database $cdbName vgroups 1 sleep 500 sql use $cdbName print == alter database sql alter database $cdbName wal_retention_period 3600 print == create consume info table and consume result table sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql show tables if $rows != 2 then return -1 endi ####################################################################################### if $loop_cnt == 0 then print == scenario 1: topic_stb_column $topicList = ' . topic_stb_column $topicList = $topicList . ' elif $loop_cnt == 1 then print == scenario 2: topic_stb_all $topicList = ' . topic_stb_all $topicList = $topicList . ' elif $loop_cnt == 2 then print == scenario 3: topic_stb_function $topicList = ' . topic_stb_function $topicList = $topicList . ' else goto loop_consume_diff_topic_from_stb_end endi $consumerId = 0 $totalMsgOfStb = $ctbNum * $rowsPerCtb $expectmsgcnt = $totalMsgOfStb sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) $consumerId = 1 sql insert into consumeinfo values (now+1s , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) print == start consumer to pull msgs from stb print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start print == check consume result wait_consumer_end_from_stb: sql select * from consumeresult print ==> rows: $rows print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] if $rows != 2 then sleep 1000 goto wait_consumer_end_from_stb endi if $data[0][1] == 0 then if $data[1][1] != 1 then return -1 endi endi if $data[0][1] == 1 then if $data[1][1] != 0 then return -1 endi endi if $data[0][2] <= 0 then return -1 endi if $data[0][2] >= $expectmsgcnt then return -1 endi if $data[1][2] <= 0 then return -1 endi if $data[1][2] >= $expectmsgcnt then return -1 endi $sumOfMsgCnt = $data[0][2] + $data[1][2] if $sumOfMsgCnt != $expectmsgcnt then return -1 endi if $data[0][3] <= 0 then return -1 endi if $data[0][3] >= $expectmsgcnt then return -1 endi if $data[1][3] <= 0 then return -1 endi if $data[1][3] >= $expectmsgcnt then return -1 endi $sumOfMsgRows = $data[0][3] + $data[1][3] if $sumOfMsgRows != $expectmsgcnt then return -1 endi $loop_cnt = $loop_cnt + 1 goto loop_consume_diff_topic_from_stb loop_consume_diff_topic_from_stb_end: print ================ test consume from ctb $loop_cnt = 0 loop_consume_diff_topic_from_ctb: ####################################################################################### # clear consume info and consume result #run tsim/tmq/clearConsume.sim # because drop table function no stable, so by create new db for consume info and result. Modify it later $cdb_index = $cdb_index + 1 $cdbName = cdb . $cdb_index sql create database $cdbName vgroups 1 sleep 500 sql use $cdbName print == alter database sql alter database $cdbName wal_retention_period 3600 print == create consume info table and consume result table sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql show tables if $rows != 2 then return -1 endi ####################################################################################### if $loop_cnt == 0 then print == scenario 1: topic_ctb_column $topicList = ' . topic_ctb_column $topicList = $topicList . ' elif $loop_cnt == 1 then print == scenario 2: topic_ctb_all $topicList = ' . topic_ctb_all $topicList = $topicList . ' elif $loop_cnt == 2 then print == scenario 3: topic_ctb_function $topicList = ' . topic_ctb_function $topicList = $topicList . ' else goto loop_consume_diff_topic_from_ctb_end endi $consumerId = 0 $totalMsgOfCtb = $rowsPerCtb $expectmsgcnt = $totalMsgOfCtb sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) $consumerId = 1 sql insert into consumeinfo values (now+1s , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) print == start consumer to pull msgs from ctb print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start print == check consume result wait_consumer_end_from_ctb: sql select * from consumeresult print ==> rows: $rows print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] if $rows != 2 then sleep 1000 goto wait_consumer_end_from_ctb endi if $data[0][1] == 0 then if $data[1][1] != 1 then return -1 endi endi if $data[0][1] == 1 then if $data[1][1] != 0 then return -1 endi endi # either $data[0][2] == $totalMsgOfCtb and $data[1][2] == 0 # or $data[0][2] == 0 and $data[1][2] == $totalMsgOfCtb if $data[0][2] == $totalMsgOfCtb then if $data[1][2] == 0 then goto check_ok_0 endi elif $data[1][2] == $totalMsgOfCtb then if $data[0][2] == 0 then goto check_ok_0 endi endi return -1 check_ok_0: if $data[0][3] == $totalMsgOfCtb then if $data[1][3] == 0 then goto check_ok_1 endi elif $data[1][3] == $totalMsgOfCtb then if $data[0][3] == 0 then goto check_ok_1 endi endi return -1 check_ok_1: $loop_cnt = $loop_cnt + 1 goto loop_consume_diff_topic_from_ctb loop_consume_diff_topic_from_ctb_end: print ================ test consume from ntb $loop_cnt = 0 loop_consume_diff_topic_from_ntb: ####################################################################################### # clear consume info and consume result #run tsim/tmq/clearConsume.sim # because drop table function no stable, so by create new db for consume info and result. Modify it later $cdb_index = $cdb_index + 1 $cdbName = cdb . $cdb_index sql create database $cdbName vgroups 1 sleep 500 sql use $cdbName print == alter database sql alter database $cdbName wal_retention_period 3600 print == create consume info table and consume result table sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql show tables if $rows != 2 then return -1 endi ####################################################################################### if $loop_cnt == 0 then print == scenario 1: topic_ntb_column $topicList = ' . topic_ntb_column $topicList = $topicList . ' elif $loop_cnt == 1 then print == scenario 2: topic_ntb_all $topicList = ' . topic_ntb_all $topicList = $topicList . ' elif $loop_cnt == 2 then print == scenario 3: topic_ntb_function $topicList = ' . topic_ntb_function $topicList = $topicList . ' else goto loop_consume_diff_topic_from_ntb_end endi $consumerId = 0 $totalMsgOfNtb = $rowsPerCtb $expectmsgcnt = $totalMsgOfNtb sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) $consumerId = 1 sql insert into consumeinfo values (now+1s , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) print == start consumer to pull msgs from ntb print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start print == check consume result from ntb wait_consumer_end_from_ntb: sql select * from consumeresult print ==> rows: $rows print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] if $rows != 2 then sleep 1000 goto wait_consumer_end_from_ntb endi if $data[0][1] == 0 then if $data[1][1] != 1 then return -1 endi endi if $data[1][1] == 0 then if $data[0][1] != 1 then return -1 endi endi # either $data[0][2] == $totalMsgOfNtb and $data[1][2] == 0 # or $data[0][2] == 0 and $data[1][2] == $totalMsgOfNtb if $data[0][2] == $totalMsgOfNtb then if $data[1][2] == 0 then goto check_ok_2 endi elif $data[1][2] == $totalMsgOfNtb then if $data[0][2] == 0 then goto check_ok_2 endi endi return -1 check_ok_2: if $data[0][3] == $totalMsgOfNtb then if $data[1][3] == 0 then goto check_ok_3 endi elif $data[1][3] == $totalMsgOfNtb then if $data[0][3] == 0 then goto check_ok_3 endi endi return -1 check_ok_3: $loop_cnt = $loop_cnt + 1 goto loop_consume_diff_topic_from_ntb loop_consume_diff_topic_from_ntb_end: #------ not need stop consumer, because it exit after pull msg overthan expect msg #system tsim/tmq/consume.sh -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT