diff --git a/tests/system-test/7-tmq/subscribeDb.py b/tests/system-test/7-tmq/subscribeDb.py index b8d3abca5c1cb8828777ff434c82ec8f3b659e30..8f5a7937149010e8862637e9396a285f1ccedc66 100644 --- a/tests/system-test/7-tmq/subscribeDb.py +++ b/tests/system-test/7-tmq/subscribeDb.py @@ -49,7 +49,38 @@ class TDTestCase: print(cur) return cur - def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg,showRow,cdbName,valgrind=0): + def initConsumerTable(self,cdbName='cdb'): + tdLog.info("create consume database, and consume info table, and consume result table") + tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) + + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) + tdLog.info("consume info sql: %s"%sql) + tdSql.query(sql) + + def selectConsumeResult(self,expectRows,cdbName='cdb'): + resultList=[] + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == expectRows: + break + else: + time.sleep(5) + + for i in range(expectRows): + tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) + resultList.append(tdSql.getData(i , 3)) + + return resultList + + def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): shellCmd = 'nohup ' if valgrind == 1: logFile = cfgPath + '/../log/valgrind-tmq.log' @@ -58,7 +89,7 @@ class TDTestCase: shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) @@ -87,6 +118,8 @@ class TDTestCase: pre_insert = "insert into " sql = pre_insert + t = time.time() + startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) for i in range(ctbNum): sql += " %s_%d values "%(stbName,i) @@ -127,7 +160,7 @@ class TDTestCase: return def tmqCase1(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 1: Produce while one consume to subscribe one db") + tdLog.printNoPrefix("======== test case 1: Produce while one consume to subscribe one db, inclue 1 stb") tdLog.info("step 1: create database, stb, ctb and insert data") # create and start thread parameterDict = {'cfg': '', \ @@ -135,11 +168,13 @@ class TDTestCase: 'vgroups': 4, \ 'stbName': 'stb', \ 'ctbNum': 10, \ - 'rowsPerTbl': 100000, \ - 'batchNum': 200, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath + self.initConsumerTable() + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) @@ -149,23 +184,16 @@ class TDTestCase: topicName1 = 'topic_db1' tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) - - tdLog.info("create consume info table and consume result table") - cdbName = parameterDict["dbName"] - tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) - tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] topicList = topicName1 ifcheckdata = 0 + ifManualCommit = 0 keyList = 'group.id:cgrp1,\ enable.auto.commit:false,\ auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' - sql = "insert into %s.consumeinfo values "%cdbName - sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) - tdSql.query(sql) + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) event.wait() @@ -173,32 +201,28 @@ class TDTestCase: pollDelay = 5 showMsg = 1 showRow = 1 - self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName) + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) # wait for data ready prepareEnvThread.join() tdLog.info("insert process end, and start to check consume result") - while 1: - tdSql.query("select * from %s.consumeresult"%cdbName) - #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) - if tdSql.getRows() == 1: - break - else: - time.sleep(5) - - tdLog.info("consumer result: %d, %d"%(tdSql.getData(0 , 2), tdSql.getData(0 , 3))) - tdSql.checkData(0 , 1, consumerId) - # mulit rows and mulit tables in one sql, this num of msg is not sure - #tdSql.checkData(0 , 2, expectmsgcnt) - tdSql.checkData(0 , 3, expectrowcnt+1) + expectRows = 1 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if totalConsumeRows != expectrowcnt: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + tdLog.exit("tmq consume rows error!") tdSql.query("drop topic %s"%topicName1) tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 2: Produce while two consumers to subscribe one db") + tdLog.printNoPrefix("======== test case 2: Produce while two consumers to subscribe one db, inclue 1 stb") tdLog.info("step 1: create database, stb, ctb and insert data") # create and start thread parameterDict = {'cfg': '', \ @@ -206,11 +230,13 @@ class TDTestCase: 'vgroups': 4, \ 'stbName': 'stb', \ 'ctbNum': 10, \ - 'rowsPerTbl': 100000, \ + 'rowsPerTbl': 10000, \ 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath + self.initConsumerTable() + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) @@ -221,27 +247,19 @@ class TDTestCase: tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) - tdLog.info("create consume info table and consume result table") - cdbName = parameterDict["dbName"] - tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) - tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] topicList = topicName1 ifcheckdata = 0 + ifManualCommit = 0 keyList = 'group.id:cgrp1,\ enable.auto.commit:false,\ auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' - sql = "insert into %s.consumeinfo values "%cdbName - sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) - tdSql.query(sql) - + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + consumerId = 1 - sql = "insert into %s.consumeinfo values "%cdbName - sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) - tdSql.query(sql) + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) event.wait() @@ -249,30 +267,20 @@ class TDTestCase: pollDelay = 5 showMsg = 1 showRow = 1 - self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName) + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) # wait for data ready prepareEnvThread.join() tdLog.info("insert process end, and start to check consume result") - while 1: - tdSql.query("select * from %s.consumeresult"%cdbName) - #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) - if tdSql.getRows() == 2: - break - else: - time.sleep(5) - - consumerId0 = tdSql.getData(0 , 1) - consumerId1 = tdSql.getData(1 , 1) - actConsumeRows0 = tdSql.getData(0 , 3) - actConsumeRows1 = tdSql.getData(1 , 3) - - tdLog.info("consumer %d rows: %d"%(consumerId0, actConsumeRows0)) - tdLog.info("consumer %d rows: %d"%(consumerId1, actConsumeRows1)) - - totalConsumeRows = actConsumeRows0 + actConsumeRows1 - if totalConsumeRows != expectrowcnt + 2: + expectRows = 2 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if totalConsumeRows != expectrowcnt: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") tdSql.query("drop topic %s"%topicName1) @@ -288,11 +296,13 @@ class TDTestCase: 'vgroups': 4, \ 'stbName': 'stb', \ 'ctbNum': 10, \ - 'rowsPerTbl': 100000, \ + 'rowsPerTbl': 10000, \ 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath + self.initConsumerTable() + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) @@ -303,7 +313,7 @@ class TDTestCase: 'vgroups': 4, \ 'stbName': 'stb2', \ 'ctbNum': 10, \ - 'rowsPerTbl': 100000, \ + 'rowsPerTbl': 10000, \ 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath @@ -316,64 +326,376 @@ class TDTestCase: tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) - tdLog.info("create consume info table and consume result table") - cdbName = parameterDict["dbName"] - tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) - tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] topicList = topicName1 ifcheckdata = 0 + ifManualCommit = 0 keyList = 'group.id:cgrp1,\ enable.auto.commit:false,\ auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' - sql = "insert into %s.consumeinfo values "%cdbName - sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) - tdSql.query(sql) + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) # consumerId = 1 - # sql = "insert into %s.consumeinfo values "%cdbName - # sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) - # tdSql.query(sql) + # self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) event.wait() tdLog.info("start consume processor") pollDelay = 5 showMsg = 1 - showRow = 1 - self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName) + showRow = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) # wait for data ready prepareEnvThread.join() prepareEnvThread2.join() tdLog.info("insert process end, and start to check consume result") - while 1: - tdSql.query("select * from %s.consumeresult"%cdbName) - #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) - if tdSql.getRows() == 1: - break - else: - time.sleep(5) + expectRows = 1 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if totalConsumeRows != expectrowcnt: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 3 end ...... ") + + def tmqCase4(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 4: Produce while two consumers to subscribe one db, include 2 stb") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db4', \ + 'vgroups': 4, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + self.initConsumerTable() + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + parameterDict2 = {'cfg': '', \ + 'dbName': 'db4', \ + 'vgroups': 4, \ + 'stbName': 'stb2', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) + prepareEnvThread2.start() - consumerId0 = tdSql.getData(0 , 1) - #consumerId1 = tdSql.getData(1 , 1) - actConsumeRows0 = tdSql.getData(0 , 3) - #actConsumeRows1 = tdSql.getData(1 , 3) + tdLog.info("create topics from db") + topicName1 = 'topic_db1' + + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] + topicList = topicName1 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + consumerId = 1 + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + event.wait() - tdLog.info("consumer %d rows: %d"%(consumerId0, actConsumeRows0)) - #tdLog.info("consumer %d rows: %d"%(consumerId1, actConsumeRows1)) + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - #totalConsumeRows = actConsumeRows0 + actConsumeRows1 - if actConsumeRows0 != expectrowcnt + 1: + # wait for data ready + prepareEnvThread.join() + prepareEnvThread2.join() + + tdLog.info("insert process end, and start to check consume result") + expectRows = 2 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if totalConsumeRows != expectrowcnt: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") tdSql.query("drop topic %s"%topicName1) - tdLog.printNoPrefix("======== test case 3 end ...... ") + tdLog.printNoPrefix("======== test case 4 end ...... ") + + def tmqCase5(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 5: Produce while two consumers to subscribe one db, firstly create one stb, after start consume create other stb") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db5', \ + 'vgroups': 4, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + self.initConsumerTable() + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + parameterDict2 = {'cfg': '', \ + 'dbName': 'db5', \ + 'vgroups': 4, \ + 'stbName': 'stb2', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + tdLog.info("create topics from db") + topicName1 = 'topic_db1' + + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] + topicList = topicName1 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + consumerId = 1 + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + event.wait() + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + + prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) + prepareEnvThread2.start() + + # wait for data ready + prepareEnvThread.join() + prepareEnvThread2.join() + + tdLog.info("insert process end, and start to check consume result") + expectRows = 2 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if totalConsumeRows != expectrowcnt: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 5 end ...... ") + + def tmqCase6(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 6: Produce while one consumers to subscribe tow topic, Each contains one db") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db60', \ + 'vgroups': 4, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + self.initConsumerTable() + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + parameterDict2 = {'cfg': '', \ + 'dbName': 'db61', \ + 'vgroups': 4, \ + 'stbName': 'stb2', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict2['dbName'], parameterDict2['vgroups'])) + + prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) + prepareEnvThread2.start() + + tdLog.info("create topics from db") + topicName1 = 'topic_db60' + topicName2 = 'topic_db61' + + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) + tdSql.execute("create topic %s as %s" %(topicName2, parameterDict2['dbName'])) + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] + topicList = topicName1 + ',' + topicName2 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + #consumerId = 1 + #self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + event.wait() + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + + # wait for data ready + prepareEnvThread.join() + prepareEnvThread2.join() + + tdLog.info("insert process end, and start to check consume result") + expectRows = 1 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if totalConsumeRows != expectrowcnt: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicName1) + tdSql.query("drop topic %s"%topicName2) + + tdLog.printNoPrefix("======== test case 6 end ...... ") + + def tmqCase7(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 7: Produce while two consumers to subscribe tow topic, Each contains one db") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db60', \ + 'vgroups': 4, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + self.initConsumerTable() + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + parameterDict2 = {'cfg': '', \ + 'dbName': 'db61', \ + 'vgroups': 4, \ + 'stbName': 'stb2', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict2['dbName'], parameterDict2['vgroups'])) + + prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) + prepareEnvThread2.start() + + tdLog.info("create topics from db") + topicName1 = 'topic_db60' + topicName2 = 'topic_db61' + + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) + tdSql.execute("create topic %s as %s" %(topicName2, parameterDict2['dbName'])) + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] + topicList = topicName1 + ',' + topicName2 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + consumerId = 1 + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + event.wait() + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + + # wait for data ready + prepareEnvThread.join() + prepareEnvThread2.join() + + tdLog.info("insert process end, and start to check consume result") + expectRows = 2 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if totalConsumeRows != expectrowcnt: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicName1) + tdSql.query("drop topic %s"%topicName2) + + tdLog.printNoPrefix("======== test case 7 end ...... ") def run(self): tdSql.prepare() @@ -386,9 +708,14 @@ class TDTestCase: cfgPath = buildPath + "/../sim/psim/cfg" tdLog.info("cfgPath: %s" % cfgPath) - #self.tmqCase1(cfgPath, buildPath) + self.tmqCase1(cfgPath, buildPath) self.tmqCase2(cfgPath, buildPath) - #self.tmqCase3(cfgPath, buildPath) + self.tmqCase3(cfgPath, buildPath) + self.tmqCase4(cfgPath, buildPath) + self.tmqCase5(cfgPath, buildPath) + self.tmqCase6(cfgPath, buildPath) + self.tmqCase7(cfgPath, buildPath) + def stop(self): tdSql.close()