未验证 提交 3b598ad0 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #12544 from taosdata/test3.0/lihui

Test3.0/lihui
...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000 ...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
$pullDelay = 3 $pullDelay = 3
$ifcheckdata = 1 $ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1 $showMsg = 1
$showRow = 0 $showRow = 0
...@@ -53,8 +54,17 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 ...@@ -53,8 +54,17 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
# return -1 # return -1
#endi #endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1 $keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ' $keyList = $keyList . '
print ========== key list: $keyList
$cdb_index = 0 $cdb_index = 0
#=============================== start consume =============================# #=============================== start consume =============================#
...@@ -74,7 +84,7 @@ sleep 500 ...@@ -74,7 +84,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -102,7 +112,7 @@ endi ...@@ -102,7 +112,7 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb $totalMsgOfStb = $ctbNum * $rowsPerCtb
$expectmsgcnt = $totalMsgOfStb $expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
...@@ -145,7 +155,7 @@ sleep 500 ...@@ -145,7 +155,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -173,7 +183,7 @@ endi ...@@ -173,7 +183,7 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfCtb = $rowsPerCtb $totalMsgOfCtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfCtb $expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
...@@ -216,7 +226,7 @@ sleep 500 ...@@ -216,7 +226,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -244,7 +254,7 @@ endi ...@@ -244,7 +254,7 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfNtb = $rowsPerCtb $totalMsgOfNtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfNtb $expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
......
...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000 ...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
$pullDelay = 5 $pullDelay = 5
$ifcheckdata = 1 $ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1 $showMsg = 1
$showRow = 0 $showRow = 0
...@@ -53,10 +54,19 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 ...@@ -53,10 +54,19 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
# return -1 # return -1
#endi #endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1 $keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ' $keyList = $keyList . '
print ========== key list: $keyList
$cdb_index = 0 $cdb_index = 0
#=============================== start consume =============================# #=============================== start consume =============================#
print ================ test consume from stb print ================ test consume from stb
...@@ -74,7 +84,7 @@ sleep 500 ...@@ -74,7 +84,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table for stb print == create consume info table and consume result table for stb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -102,10 +112,10 @@ endi ...@@ -102,10 +112,10 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb $totalMsgOfStb = $ctbNum * $rowsPerCtb
$expectmsgcnt = $totalMsgOfStb $expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
...@@ -177,7 +187,7 @@ sleep 500 ...@@ -177,7 +187,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table for ctb print == create consume info table and consume result table for ctb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -205,9 +215,9 @@ endi ...@@ -205,9 +215,9 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfCtb = $rowsPerCtb $totalMsgOfCtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfCtb $expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
...@@ -279,7 +289,7 @@ sleep 500 ...@@ -279,7 +289,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table for ntb print == create consume info table and consume result table for ntb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -307,9 +317,9 @@ endi ...@@ -307,9 +317,9 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfNtb = $rowsPerCtb $totalMsgOfNtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfNtb $expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
......
...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000 ...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
$pullDelay = 3 $pullDelay = 3
$ifcheckdata = 1 $ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1 $showMsg = 1
$showRow = 0 $showRow = 0
...@@ -53,8 +54,17 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 ...@@ -53,8 +54,17 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
# return -1 # return -1
#endi #endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1 $keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ' $keyList = $keyList . '
print ========== key list: $keyList
$topicNum = 3 $topicNum = 3
...@@ -74,7 +84,7 @@ $consumerId = 0 ...@@ -74,7 +84,7 @@ $consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb $totalMsgOfStb = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfStb * $topicNum $totalMsgOfStb = $totalMsgOfStb * $topicNum
$expectmsgcnt = $totalMsgOfStb $expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
...@@ -109,7 +119,7 @@ sleep 500 ...@@ -109,7 +119,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -131,7 +141,7 @@ $topicList = $topicList . ' ...@@ -131,7 +141,7 @@ $topicList = $topicList . '
$consumerId = 0 $consumerId = 0
$totalMsgOfCtb = $rowsPerCtb * $topicNum $totalMsgOfCtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfCtb $expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
...@@ -166,7 +176,7 @@ sleep 500 ...@@ -166,7 +176,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -188,7 +198,7 @@ $topicList = $topicList . ' ...@@ -188,7 +198,7 @@ $topicList = $topicList . '
$consumerId = 0 $consumerId = 0
$totalMsgOfNtb = $rowsPerCtb * $topicNum $totalMsgOfNtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfNtb $expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
......
...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000 ...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
$pullDelay = 5 $pullDelay = 5
$ifcheckdata = 1 $ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1 $showMsg = 1
$showRow = 0 $showRow = 0
...@@ -53,8 +54,16 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 ...@@ -53,8 +54,16 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
# return -1 # return -1
#endi #endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1 $keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ' $keyList = $keyList . '
print ========== key list: $keyList
$topicNum = 3 $topicNum = 3
...@@ -74,9 +83,9 @@ $consumerId = 0 ...@@ -74,9 +83,9 @@ $consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb $totalMsgOfStb = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfStb * $topicNum $totalMsgOfStb = $totalMsgOfStb * $topicNum
$expectmsgcnt = $totalMsgOfStb $expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
...@@ -139,7 +148,7 @@ sleep 500 ...@@ -139,7 +148,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table for ctb print == create consume info table and consume result table for ctb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -161,9 +170,9 @@ $topicList = $topicList . ' ...@@ -161,9 +170,9 @@ $topicList = $topicList . '
$consumerId = 0 $consumerId = 0
$totalMsgOfCtb = $rowsPerCtb * $topicNum $totalMsgOfCtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfCtb $expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
...@@ -226,7 +235,7 @@ sleep 500 ...@@ -226,7 +235,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table for ntb print == create consume info table and consume result table for ntb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -248,9 +257,9 @@ $topicList = $topicList . ' ...@@ -248,9 +257,9 @@ $topicList = $topicList . '
$consumerId = 0 $consumerId = 0
$totalMsgOfNtb = $rowsPerCtb * $topicNum $totalMsgOfNtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfNtb $expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
......
...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000 ...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
$pullDelay = 5 $pullDelay = 5
$ifcheckdata = 1 $ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1 $showMsg = 1
$showRow = 0 $showRow = 0
...@@ -53,8 +54,16 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 ...@@ -53,8 +54,16 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
# return -1 # return -1
#endi #endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1 $keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ' $keyList = $keyList . '
print ========== key list: $keyList
$topicNum = 2 $topicNum = 2
...@@ -72,7 +81,7 @@ $consumerId = 0 ...@@ -72,7 +81,7 @@ $consumerId = 0
$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb $totalMsgOfOneTopic = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum $totalMsgOfStb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfStb $expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$topicList = ' . topic_stb_all $topicList = ' . topic_stb_all
...@@ -80,7 +89,7 @@ $topicList = $topicList . , ...@@ -80,7 +89,7 @@ $topicList = $topicList . ,
$topicList = $topicList . topic_stb_function $topicList = $topicList . topic_stb_function
$topicList = $topicList . ' $topicList = $topicList . '
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
...@@ -158,7 +167,7 @@ sleep 500 ...@@ -158,7 +167,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table for ctb print == create consume info table and consume result table for ctb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -179,14 +188,14 @@ $consumerId = 0 ...@@ -179,14 +188,14 @@ $consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb $totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum $totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfCtb $expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$topicList = ' . topic_ctb_function $topicList = ' . topic_ctb_function
$topicList = $topicList . , $topicList = $topicList . ,
$topicList = $topicList . topic_ctb_all $topicList = $topicList . topic_ctb_all
$topicList = $topicList . ' $topicList = $topicList . '
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
...@@ -249,7 +258,7 @@ sleep 500 ...@@ -249,7 +258,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table for ntb print == create consume info table and consume result table for ntb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -270,7 +279,7 @@ $consumerId = 0 ...@@ -270,7 +279,7 @@ $consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb $totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum $totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfNtb $expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$topicList = ' . topic_ntb_function $topicList = ' . topic_ntb_function
...@@ -278,7 +287,7 @@ $topicList = $topicList . , ...@@ -278,7 +287,7 @@ $topicList = $topicList . ,
$topicList = $topicList . topic_ntb_all $topicList = $topicList . topic_ntb_all
$topicList = $topicList . ' $topicList = $topicList . '
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
......
...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000 ...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
$pullDelay = 3 $pullDelay = 3
$ifcheckdata = 1 $ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1 $showMsg = 1
$showRow = 0 $showRow = 0
...@@ -53,8 +54,17 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 ...@@ -53,8 +54,17 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
# return -1 # return -1
#endi #endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1 $keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ' $keyList = $keyList . '
print ========== key list: $keyList
$cdb_index = 0 $cdb_index = 0
#=============================== start consume =============================# #=============================== start consume =============================#
...@@ -74,7 +84,7 @@ sleep 500 ...@@ -74,7 +84,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -102,7 +112,7 @@ endi ...@@ -102,7 +112,7 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb $totalMsgOfStb = $ctbNum * $rowsPerCtb
$expectmsgcnt = $totalMsgOfStb $expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
...@@ -145,7 +155,7 @@ sleep 500 ...@@ -145,7 +155,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -173,7 +183,7 @@ endi ...@@ -173,7 +183,7 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfCtb = $rowsPerCtb $totalMsgOfCtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfCtb $expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
...@@ -216,7 +226,7 @@ sleep 500 ...@@ -216,7 +226,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -244,7 +254,7 @@ endi ...@@ -244,7 +254,7 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfNtb = $rowsPerCtb $totalMsgOfNtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfNtb $expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
......
...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000 ...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
$pullDelay = 5 $pullDelay = 5
$ifcheckdata = 1 $ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1 $showMsg = 1
$showRow = 0 $showRow = 0
...@@ -53,8 +54,16 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 ...@@ -53,8 +54,16 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
# return -1 # return -1
#endi #endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1 $keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ' $keyList = $keyList . '
print ========== key list: $keyList
$cdb_index = 0 $cdb_index = 0
#=============================== start consume =============================# #=============================== start consume =============================#
...@@ -74,7 +83,7 @@ sleep 500 ...@@ -74,7 +83,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -102,9 +111,9 @@ endi ...@@ -102,9 +111,9 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb $totalMsgOfStb = $ctbNum * $rowsPerCtb
$expectmsgcnt = $totalMsgOfStb $expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
...@@ -189,7 +198,7 @@ sleep 500 ...@@ -189,7 +198,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -217,9 +226,9 @@ endi ...@@ -217,9 +226,9 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfCtb = $rowsPerCtb $totalMsgOfCtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfCtb $expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
...@@ -291,7 +300,7 @@ sleep 500 ...@@ -291,7 +300,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -319,9 +328,9 @@ endi ...@@ -319,9 +328,9 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfNtb = $rowsPerCtb $totalMsgOfNtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfNtb $expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
......
...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000 ...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
$pullDelay = 3 $pullDelay = 3
$ifcheckdata = 1 $ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1 $showMsg = 1
$showRow = 0 $showRow = 0
...@@ -53,8 +54,17 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 ...@@ -53,8 +54,17 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
# return -1 # return -1
#endi #endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1 $keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ' $keyList = $keyList . '
print ========== key list: $keyList
$topicNum = 3 $topicNum = 3
...@@ -71,7 +81,7 @@ $consumerId = 0 ...@@ -71,7 +81,7 @@ $consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb $totalMsgOfStb = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfStb * $topicNum $totalMsgOfStb = $totalMsgOfStb * $topicNum
$expectmsgcnt = $totalMsgOfStb $expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
...@@ -106,7 +116,7 @@ sleep 500 ...@@ -106,7 +116,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -128,7 +138,7 @@ $topicList = $topicList . ' ...@@ -128,7 +138,7 @@ $topicList = $topicList . '
$consumerId = 0 $consumerId = 0
$totalMsgOfCtb = $rowsPerCtb * $topicNum $totalMsgOfCtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfCtb $expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
...@@ -163,7 +173,7 @@ sleep 500 ...@@ -163,7 +173,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -185,7 +195,7 @@ $topicList = $topicList . ' ...@@ -185,7 +195,7 @@ $topicList = $topicList . '
$consumerId = 0 $consumerId = 0
$totalMsgOfNtb = $rowsPerCtb * $topicNum $totalMsgOfNtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfNtb $expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
......
...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000 ...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
$pullDelay = 5 $pullDelay = 5
$ifcheckdata = 1 $ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1 $showMsg = 1
$showRow = 0 $showRow = 0
...@@ -53,8 +54,16 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 ...@@ -53,8 +54,16 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
# return -1 # return -1
#endi #endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1 $keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ' $keyList = $keyList . '
print ========== key list: $keyList
$topicNum = 3 $topicNum = 3
...@@ -71,9 +80,9 @@ $consumerId = 0 ...@@ -71,9 +80,9 @@ $consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb $totalMsgOfStb = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfStb * $topicNum $totalMsgOfStb = $totalMsgOfStb * $topicNum
$expectmsgcnt = $totalMsgOfStb $expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
...@@ -148,7 +157,7 @@ sleep 500 ...@@ -148,7 +157,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -170,9 +179,9 @@ $topicList = $topicList . ' ...@@ -170,9 +179,9 @@ $topicList = $topicList . '
$consumerId = 0 $consumerId = 0
$totalMsgOfCtb = $rowsPerCtb * $topicNum $totalMsgOfCtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfCtb $expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
...@@ -236,7 +245,7 @@ sleep 500 ...@@ -236,7 +245,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -258,9 +267,9 @@ $topicList = $topicList . ' ...@@ -258,9 +267,9 @@ $topicList = $topicList . '
$consumerId = 0 $consumerId = 0
$totalMsgOfNtb = $rowsPerCtb * $topicNum $totalMsgOfNtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfNtb $expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
......
...@@ -48,7 +48,7 @@ endi ...@@ -48,7 +48,7 @@ endi
sql use $dbName sql use $dbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
......
...@@ -48,7 +48,7 @@ endi ...@@ -48,7 +48,7 @@ endi
sql use $dbName sql use $dbName
print == create consume info table and consume result table print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
......
...@@ -52,7 +52,7 @@ class TDTestCase: ...@@ -52,7 +52,7 @@ class TDTestCase:
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
tsql.execute("use %s" %dbName) tsql.execute("use %s" %dbName)
tsql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
pre_create = "create table" pre_create = "create table"
sql = pre_create sql = pre_create
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
...@@ -345,11 +345,11 @@ class TDTestCase: ...@@ -345,11 +345,11 @@ class TDTestCase:
after starting consumer, create ctables ") after starting consumer, create ctables ")
# create and start thread # create and start thread
parameterDict = {'cfg': '', \ parameterDict = {'cfg': '', \
'dbName': 'db2', \ 'dbName': 'db3', \
'vgroups': 1, \ 'vgroups': 1, \
'stbName': 'stb', \ 'stbName': 'stb', \
'ctbNum': 10, \ 'ctbNum': 10, \
'rowsPerTbl': 10000, \ 'rowsPerTbl': 30000, \
'batchNum': 100, \ 'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000 'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath parameterDict['cfg'] = cfgPath
...@@ -375,21 +375,32 @@ class TDTestCase: ...@@ -375,21 +375,32 @@ class TDTestCase:
else: else:
time.sleep(1) time.sleep(1)
tdLog.info("create stable2 for the seconde topic")
parameterDict2 = {'cfg': '', \
'dbName': 'db3', \
'vgroups': 1, \
'stbName': 'stb2', \
'ctbNum': 10, \
'rowsPerTbl': 30000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict2['cfg'] = cfgPath
tdSql.execute("create stable if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict2['dbName'], parameterDict2['stbName']))
tdLog.info("create topics from super table") tdLog.info("create topics from super table")
topicFromStb = 'topic_stb_column2' topicFromStb = 'topic_stb_column3'
topicFromCtb = 'topic_ctb_column2' topicFromStb2 = 'topic_stb_column32'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb2, parameterDict2['dbName'], parameterDict2['stbName']))
time.sleep(1)
tdSql.query("show topics") tdSql.query("show topics")
topic1 = tdSql.getData(0 , 0) topic1 = tdSql.getData(0 , 0)
topic2 = tdSql.getData(1 , 0) topic2 = tdSql.getData(1 , 0)
tdLog.info("show topics: %s, %s"%(topic1, topic2)) tdLog.info("show topics: %s, %s"%(topic1, topic2))
if topic1 != topicFromStb and topic1 != topicFromCtb: if topic1 != topicFromStb and topic1 != topicFromStb2:
tdLog.exit("topic error1") tdLog.exit("topic error1")
if topic2 != topicFromStb and topic2 != topicFromCtb: if topic2 != topicFromStb and topic2 != topicFromStb2:
tdLog.exit("topic error2") tdLog.exit("topic error2")
tdLog.info("create consume info table and consume result table") tdLog.info("create consume info table and consume result table")
...@@ -397,10 +408,9 @@ class TDTestCase: ...@@ -397,10 +408,9 @@ class TDTestCase:
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
rowsOfNewCtb = 1000
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
topicList = topicFromStb topicList = topicFromStb + ',' + topicFromStb2
ifcheckdata = 0 ifcheckdata = 0
keyList = 'group.id:cgrp1,\ keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\ enable.auto.commit:false,\
...@@ -432,17 +442,13 @@ class TDTestCase: ...@@ -432,17 +442,13 @@ class TDTestCase:
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
# create new child table and insert data # start the second thread to create new child table and insert data
newCtbName = 'newctb' prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2)
tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"])) prepareEnvThread2.start()
startTs = parameterDict["startTs"]
for j in range(rowsOfNewCtb):
sql = "insert into %s.%s values (%d, %d, 'tmqrow_%d') "%(parameterDict["dbName"], newCtbName, startTs + j, j, j)
tdSql.execute(sql)
tdLog.debug("insert data into new child table ............ [OK]")
# wait for data ready # wait for data ready
prepareEnvThread.join() prepareEnvThread.join()
prepareEnvThread2.join()
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
while 1: while 1:
...@@ -457,7 +463,7 @@ class TDTestCase: ...@@ -457,7 +463,7 @@ class TDTestCase:
tdSql.checkData(0 , 3, expectrowcnt) tdSql.checkData(0 , 3, expectrowcnt)
tdSql.query("drop topic %s"%topicFromStb) tdSql.query("drop topic %s"%topicFromStb)
tdSql.query("drop topic %s"%topicFromCtb) tdSql.query("drop topic %s"%topicFromStb2)
tdLog.printNoPrefix("======== test case 3 end ...... ") tdLog.printNoPrefix("======== test case 3 end ...... ")
...@@ -474,7 +480,7 @@ class TDTestCase: ...@@ -474,7 +480,7 @@ class TDTestCase:
self.tmqCase1(cfgPath, buildPath) self.tmqCase1(cfgPath, buildPath)
self.tmqCase2(cfgPath, buildPath) self.tmqCase2(cfgPath, buildPath)
#self.tmqCase3(cfgPath, buildPath) self.tmqCase3(cfgPath, buildPath)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -37,9 +37,10 @@ typedef struct { ...@@ -37,9 +37,10 @@ typedef struct {
TdThread thread; TdThread thread;
int32_t consumerId; int32_t consumerId;
int32_t autoCommitIntervalMs; // 1000 ms int32_t ifManualCommit;
char autoCommit[8]; // true, false //int32_t autoCommitIntervalMs; // 1000 ms
char autoOffsetRest[16]; // none, earliest, latest //char autoCommit[8]; // true, false
//char autoOffsetRest[16]; // none, earliest, latest
int32_t ifCheckData; int32_t ifCheckData;
int64_t expectMsgCnt; int64_t expectMsgCnt;
...@@ -136,9 +137,9 @@ void saveConfigToLogFile() { ...@@ -136,9 +137,9 @@ void saveConfigToLogFile() {
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit); //taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit);
taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs); //taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest); //taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
taosFprintfFile(g_fp, " Topics: "); taosFprintfFile(g_fp, " Topics: ");
for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) { for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]); taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
...@@ -232,13 +233,18 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) ...@@ -232,13 +233,18 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable)
while (1) { while (1) {
TAOS_ROW row = taos_fetch_row(msg); TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break; if (row == NULL) break;
if (0 != g_stConfInfo.showRowFlag) {
TAOS_FIELD* fields = taos_fetch_fields(msg); TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg); int32_t numOfFields = taos_field_count(msg);
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
if (0 != g_stConfInfo.showRowFlag) {
taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf); taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf);
} }
totalRows++; totalRows++;
} }
...@@ -316,6 +322,8 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { ...@@ -316,6 +322,8 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName, sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName,
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult); pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
taosFprintfFile(g_fp, "== save result sql: %s \n", sqlStr);
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
...@@ -384,7 +392,11 @@ void* consumeThreadFunc(void* param) { ...@@ -384,7 +392,11 @@ void* consumeThreadFunc(void* param) {
loop_consume(pInfo); loop_consume(pInfo);
if (pInfo->ifManualCommit) {
taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n");
pPrint("tmq_commit() manual commit when consume end.\n");
tmq_commit(pInfo->tmq, NULL, 0); tmq_commit(pInfo->tmq, NULL, 0);
}
err = tmq_unsubscribe(pInfo->tmq); err = tmq_unsubscribe(pInfo->tmq);
if (err) { if (err) {
...@@ -470,9 +482,9 @@ int32_t getConsumeInfo() { ...@@ -470,9 +482,9 @@ int32_t getConsumeInfo() {
int32_t* lengths = taos_fetch_lengths(pRes); int32_t* lengths = taos_fetch_lengths(pRes);
// set default value // set default value
g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000; //g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true")); //memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast")); //memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
for (int i = 0; i < num_fields; ++i) { for (int i = 0; i < num_fields; ++i) {
if (row[i] == NULL || 0 == i) { if (row[i] == NULL || 0 == i) {
...@@ -489,12 +501,8 @@ int32_t getConsumeInfo() { ...@@ -489,12 +501,8 @@ int32_t getConsumeInfo() {
g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]); g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
} else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]); g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
} else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { } else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, row[i], lengths[i]); g_stConfInfo.stThreads[numOfThread].ifManualCommit = *((int32_t*)row[i]);
} else if ((7 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = *((int32_t*)row[i]);
} else if ((8 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, row[i], lengths[i]);
} }
} }
numOfThread++; numOfThread++;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册