diff --git a/tests/system-test/7-tmq/basic5.py b/tests/system-test/7-tmq/basic5.py index e44f327995facc02ca01aab6e92e9e41ecac36f4..4ed3be967eb1611b3cddd967b5d141ef770e19d3 100644 --- a/tests/system-test/7-tmq/basic5.py +++ b/tests/system-test/7-tmq/basic5.py @@ -192,7 +192,7 @@ class TDTestCase: time.sleep(1) tdLog.info("start consume processor") - pollDelay = 100 + pollDelay = 20 showMsg = 1 showRow = 1 @@ -208,7 +208,7 @@ class TDTestCase: os.system(shellCmd) # wait for data ready - prepareEnvThread.join() + # prepareEnvThread.join() tdLog.info("insert process end, and start to check consume result") while 1: diff --git a/tests/system-test/7-tmq/subscribeDb0.py b/tests/system-test/7-tmq/subscribeDb0.py index c9f256ed7495850bd1a88b9a6690f23a85cb68fa..4e8fb04517dc228329235994174c482d59868383 100644 --- a/tests/system-test/7-tmq/subscribeDb0.py +++ b/tests/system-test/7-tmq/subscribeDb0.py @@ -322,176 +322,6 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 5 end ...... ") - def tmqCase6(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 6: Produce while one consumers to subscribe tow topic, Each contains one db") - tdLog.info("step 1: create database, stb, ctb and insert data") - # create and start thread - parameterDict = {'cfg': '', \ - 'dbName': 'db60', \ - 'vgroups': 4, \ - 'stbName': 'stb', \ - 'ctbNum': 10, \ - 'rowsPerTbl': 5000, \ - 'batchNum': 100, \ - 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 - parameterDict['cfg'] = cfgPath - - self.initConsumerTable() - - tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) - - prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) - prepareEnvThread.start() - - parameterDict2 = {'cfg': '', \ - 'dbName': 'db61', \ - 'vgroups': 4, \ - 'stbName': 'stb2', \ - 'ctbNum': 10, \ - 'rowsPerTbl': 5000, \ - 'batchNum': 100, \ - 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 - parameterDict['cfg'] = cfgPath - - tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict2['dbName'], parameterDict2['vgroups'])) - - prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) - prepareEnvThread2.start() - - tdLog.info("create topics from db") - topicName1 = 'topic_db60' - topicName2 = 'topic_db61' - - tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) - tdSql.execute("create topic %s as database %s" %(topicName2, parameterDict2['dbName'])) - - consumerId = 0 - expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] - topicList = topicName1 + ',' + topicName2 - ifcheckdata = 0 - ifManualCommit = 0 - keyList = 'group.id:cgrp1,\ - enable.auto.commit:false,\ - auto.commit.interval.ms:6000,\ - auto.offset.reset:earliest' - self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - - #consumerId = 1 - #self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - - event.wait() - - tdLog.info("start consume processor") - pollDelay = 100 - showMsg = 1 - showRow = 1 - self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - - # wait for data ready - prepareEnvThread.join() - prepareEnvThread2.join() - - tdLog.info("insert process end, and start to check consume result") - expectRows = 1 - resultList = self.selectConsumeResult(expectRows) - totalConsumeRows = 0 - for i in range(expectRows): - totalConsumeRows += resultList[i] - - if totalConsumeRows != expectrowcnt: - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - tdLog.exit("tmq consume rows error!") - - tdSql.query("drop topic %s"%topicName1) - tdSql.query("drop topic %s"%topicName2) - - tdLog.printNoPrefix("======== test case 6 end ...... ") - - def tmqCase7(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 7: Produce while two consumers to subscribe tow topic, Each contains one db") - tdLog.info("step 1: create database, stb, ctb and insert data") - # create and start thread - parameterDict = {'cfg': '', \ - 'dbName': 'db70', \ - 'vgroups': 4, \ - 'stbName': 'stb', \ - 'ctbNum': 10, \ - 'rowsPerTbl': 5000, \ - 'batchNum': 100, \ - 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 - parameterDict['cfg'] = cfgPath - - self.initConsumerTable() - - tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) - - prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) - prepareEnvThread.start() - - parameterDict2 = {'cfg': '', \ - 'dbName': 'db71', \ - 'vgroups': 4, \ - 'stbName': 'stb2', \ - 'ctbNum': 10, \ - 'rowsPerTbl': 5000, \ - 'batchNum': 100, \ - 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 - parameterDict['cfg'] = cfgPath - - tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict2['dbName'], parameterDict2['vgroups'])) - - prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) - prepareEnvThread2.start() - - tdLog.info("create topics from db") - topicName1 = 'topic_db60' - topicName2 = 'topic_db61' - - tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) - tdSql.execute("create topic %s as database %s" %(topicName2, parameterDict2['dbName'])) - - consumerId = 0 - expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] - topicList = topicName1 + ',' + topicName2 - ifcheckdata = 0 - ifManualCommit = 1 - keyList = 'group.id:cgrp1,\ - enable.auto.commit:false,\ - auto.commit.interval.ms:6000,\ - auto.offset.reset:earliest' - self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - - consumerId = 1 - self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - - event.wait() - - tdLog.info("start consume processor") - pollDelay = 100 - showMsg = 1 - showRow = 1 - self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - - # wait for data ready - prepareEnvThread.join() - prepareEnvThread2.join() - - tdLog.info("insert process end, and start to check consume result") - expectRows = 2 - resultList = self.selectConsumeResult(expectRows) - totalConsumeRows = 0 - for i in range(expectRows): - totalConsumeRows += resultList[i] - - if totalConsumeRows != expectrowcnt: - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - tdLog.exit("tmq consume rows error!") - - tdSql.query("drop topic %s"%topicName1) - tdSql.query("drop topic %s"%topicName2) - - tdLog.printNoPrefix("======== test case 7 end ...... ") - def run(self): tdSql.prepare() @@ -505,8 +335,6 @@ class TDTestCase: self.tmqCase4(cfgPath, buildPath) self.tmqCase5(cfgPath, buildPath) - self.tmqCase6(cfgPath, buildPath) - self.tmqCase7(cfgPath, buildPath) def stop(self): diff --git a/tests/system-test/7-tmq/subscribeDb1.py b/tests/system-test/7-tmq/subscribeDb1.py index c08c7d3dae467056010d5437b038ceb83551742b..28a341f8f353cd16120420fa098ca46b1a458edb 100644 --- a/tests/system-test/7-tmq/subscribeDb1.py +++ b/tests/system-test/7-tmq/subscribeDb1.py @@ -72,10 +72,10 @@ class TDTestCase: if tdSql.getRows() == expectRows: break else: - time.sleep(5) - + time.sleep(5) + for i in range(expectRows): - tdLog.info ("ts: %s, consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 0), tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) + tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) return resultList @@ -85,7 +85,7 @@ class TDTestCase: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + if (platform.system().lower() == 'windows'): shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) @@ -97,7 +97,7 @@ class TDTestCase: tdLog.info(shellCmd) os.system(shellCmd) - def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): + def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum): tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("use %s" %dbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) @@ -151,8 +151,7 @@ class TDTestCase: parameterDict["dbName"],\ parameterDict["vgroups"],\ parameterDict["stbName"],\ - parameterDict["ctbNum"],\ - parameterDict["rowsPerTbl"]) + parameterDict["ctbNum"]) self.insert_data(tsql,\ parameterDict["dbName"],\ @@ -163,16 +162,16 @@ class TDTestCase: parameterDict["startTs"]) return - def tmqCase8(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 8: Produce while one consume to subscribe one db, inclue 1 stb") + def tmqCase6(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 6: Produce while one consumers to subscribe tow topic, Each contains one db") tdLog.info("step 1: create database, stb, ctb and insert data") # create and start thread parameterDict = {'cfg': '', \ - 'dbName': 'db8', \ + 'dbName': 'db60', \ 'vgroups': 4, \ 'stbName': 'stb', \ 'ctbNum': 10, \ - 'rowsPerTbl': 10000, \ + 'rowsPerTbl': 5000, \ 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath @@ -183,100 +182,43 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - - tdLog.info("create topics from db") - topicName1 = 'topic_db1' - - tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) - consumerId = 0 - expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] / 2 - topicList = topicName1 - ifcheckdata = 0 - ifManualCommit = 0 - keyList = 'group.id:cgrp1,\ - enable.auto.commit:false,\ - auto.commit.interval.ms:6000,\ - auto.offset.reset:earliest' - self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - - event.wait() - tdLog.info("start consume processor") - pollDelay = 100 - showMsg = 1 - showRow = 1 - self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - - # wait for data ready - prepareEnvThread.join() - - tdLog.info("insert process end, and start to check consume result") - expectRows = 1 - resultList = self.selectConsumeResult(expectRows) - totalConsumeRows = 0 - for i in range(expectRows): - totalConsumeRows += resultList[i] - - if totalConsumeRows != expectrowcnt: - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - tdLog.exit("tmq consume rows error!") - - - tdLog.info("again start consume processer") - self.initConsumerTable() - expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] - self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - expectRows = 1 - resultList = self.selectConsumeResult(expectRows) - totalConsumeRows = 0 - for i in range(expectRows): - totalConsumeRows += resultList[i] - - if totalConsumeRows != expectrowcnt: - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - tdLog.exit("tmq consume rows error!") - - tdSql.query("drop topic %s"%topicName1) - - tdLog.printNoPrefix("======== test case 8 end ...... ") - - def tmqCase9(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 9: Produce while one consume to subscribe one db, inclue 1 stb") - tdLog.info("step 1: create database, stb, ctb and insert data") - # create and start thread - parameterDict = {'cfg': '', \ - 'dbName': 'db9', \ + parameterDict2 = {'cfg': '', \ + 'dbName': 'db61', \ 'vgroups': 4, \ - 'stbName': 'stb', \ + 'stbName': 'stb2', \ 'ctbNum': 10, \ - 'rowsPerTbl': 10000, \ + 'rowsPerTbl': 5000, \ 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - self.initConsumerTable() + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict2['dbName'], parameterDict2['vgroups'])) - tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) + prepareEnvThread2.start() - prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) - prepareEnvThread.start() - tdLog.info("create topics from db") - topicName1 = 'topic_db1' + topicName1 = 'topic_db60' + topicName2 = 'topic_db61' + + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) + tdSql.execute("create topic %s as database %s" %(topicName2, parameterDict2['dbName'])) - tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) consumerId = 0 - expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] / 2 - topicList = topicName1 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] + topicList = topicName1 + ',' + topicName2 ifcheckdata = 0 - ifManualCommit = 1 + ifManualCommit = 0 keyList = 'group.id:cgrp1,\ enable.auto.commit:false,\ auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + #consumerId = 1 + #self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + event.wait() tdLog.info("start consume processor") @@ -286,7 +228,8 @@ class TDTestCase: self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) # wait for data ready - prepareEnvThread.join() + prepareEnvThread.join() + prepareEnvThread2.join() tdLog.info("insert process end, and start to check consume result") expectRows = 1 @@ -295,44 +238,25 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - tdSql.query("select count(*) from %s.%s" %(parameterDict['dbName'], parameterDict['stbName'])) - countOfStb = tdSql.getData(0,0) - print ("====total rows of stb: %d"%countOfStb) - - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - if totalConsumeRows < expectrowcnt: - tdLog.exit("tmq consume rows error!") - - tdLog.info("again start consume processer") - self.initConsumerTable() - expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] - self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - expectRows = 1 - resultList = self.selectConsumeResult(expectRows) - totalConsumeRows2 = 0 - for i in range(expectRows): - totalConsumeRows2 += resultList[i] - - tdLog.info("firstly act consume rows: %d"%(totalConsumeRows)) - tdLog.info("secondly act consume rows: %d, expect consume rows: %d"%(totalConsumeRows2, expectrowcnt)) - if totalConsumeRows + totalConsumeRows2 != expectrowcnt: + if totalConsumeRows != expectrowcnt: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") tdSql.query("drop topic %s"%topicName1) + tdSql.query("drop topic %s"%topicName2) - tdLog.printNoPrefix("======== test case 9 end ...... ") + tdLog.printNoPrefix("======== test case 6 end ...... ") - def tmqCase10(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 10: Produce while one consume to subscribe one db, inclue 1 stb") + def tmqCase7(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 7: Produce while two consumers to subscribe tow topic, Each contains one db") tdLog.info("step 1: create database, stb, ctb and insert data") # create and start thread parameterDict = {'cfg': '', \ - 'dbName': 'db10', \ + 'dbName': 'db70', \ 'vgroups': 4, \ 'stbName': 'stb', \ 'ctbNum': 10, \ - 'rowsPerTbl': 10000, \ + 'rowsPerTbl': 5000, \ 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath @@ -343,135 +267,70 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - - tdLog.info("create topics from db") - topicName1 = 'topic_db1' - - tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) - consumerId = 0 - expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] - topicList = topicName1 - ifcheckdata = 0 - ifManualCommit = 1 - keyList = 'group.id:cgrp1,\ - enable.auto.commit:false,\ - auto.commit.interval.ms:6000,\ - auto.offset.reset:earliest' - self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - - event.wait() - tdLog.info("start consume processor") - pollDelay = 100 - showMsg = 1 - showRow = 1 - self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - - time.sleep(2) - tdLog.info("pkill consume processor") - if (platform.system().lower() == 'windows'): - os.system("TASKKILL /F /IM tmq_sim.exe") - else: - os.system('pkill tmq_sim') - expectRows = 0 - resultList = self.selectConsumeResult(expectRows) - - # wait for data ready - prepareEnvThread.join() - tdLog.info("insert process end, and start to check consume result") - - tdLog.info("again start consume processer") - self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - - expectRows = 1 - resultList = self.selectConsumeResult(expectRows) - totalConsumeRows = 0 - for i in range(expectRows): - totalConsumeRows += resultList[i] - - if totalConsumeRows != expectrowcnt: - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - tdLog.exit("tmq consume rows error!") - - time.sleep(15) - tdSql.query("drop topic %s"%topicName1) - - tdLog.printNoPrefix("======== test case 10 end ...... ") - - def tmqCase11(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 11: Produce while one consume to subscribe one db, inclue 1 stb") - tdLog.info("step 1: create database, stb, ctb and insert data") - # create and start thread - parameterDict = {'cfg': '', \ - 'dbName': 'db11', \ + parameterDict2 = {'cfg': '', \ + 'dbName': 'db71', \ 'vgroups': 4, \ - 'stbName': 'stb', \ + 'stbName': 'stb2', \ 'ctbNum': 10, \ - 'rowsPerTbl': 10000, \ + 'rowsPerTbl': 5000, \ 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - self.initConsumerTable() + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict2['dbName'], parameterDict2['vgroups'])) - tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) + prepareEnvThread2.start() - prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) - prepareEnvThread.start() - tdLog.info("create topics from db") - topicName1 = 'topic_db1' + topicName1 = 'topic_db60' + topicName2 = 'topic_db61' + + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) + tdSql.execute("create topic %s as database %s" %(topicName2, parameterDict2['dbName'])) - tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) consumerId = 0 - expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] - topicList = topicName1 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] + topicList = topicName1 + ',' + topicName2 ifcheckdata = 0 ifManualCommit = 1 keyList = 'group.id:cgrp1,\ - enable.auto.commit:true,\ - auto.commit.interval.ms:1000,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + consumerId = 1 + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + event.wait() tdLog.info("start consume processor") - pollDelay = 20 + pollDelay = 100 showMsg = 1 showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - time.sleep(6) - tdLog.info("pkill consume processor") - if (platform.system().lower() == 'windows'): - os.system("TASKKILL /F /IM tmq_sim.exe") - else: - os.system('pkill tmq_sim') - expectRows = 0 - resultList = self.selectConsumeResult(expectRows) - # wait for data ready prepareEnvThread.join() + prepareEnvThread2.join() + tdLog.info("insert process end, and start to check consume result") - - tdLog.info("again start consume processer") - self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - - expectRows = 1 + expectRows = 2 resultList = self.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows >= expectrowcnt or totalConsumeRows <= 0: - tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt)) + if totalConsumeRows != expectrowcnt: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") - time.sleep(15) tdSql.query("drop topic %s"%topicName1) + tdSql.query("drop topic %s"%topicName2) - tdLog.printNoPrefix("======== test case 11 end ...... ") + tdLog.printNoPrefix("======== test case 7 end ...... ") def run(self): tdSql.prepare() @@ -484,10 +343,9 @@ class TDTestCase: cfgPath = buildPath + "/../sim/psim/cfg" tdLog.info("cfgPath: %s" % cfgPath) - self.tmqCase8(cfgPath, buildPath) - self.tmqCase9(cfgPath, buildPath) - self.tmqCase10(cfgPath, buildPath) - self.tmqCase11(cfgPath, buildPath) + self.tmqCase6(cfgPath, buildPath) + self.tmqCase7(cfgPath, buildPath) + def stop(self): tdSql.close() diff --git a/tests/system-test/7-tmq/subscribeDb2.py b/tests/system-test/7-tmq/subscribeDb2.py new file mode 100644 index 0000000000000000000000000000000000000000..af31e802b31c6c682a1b120ba01b1e25cf6750f9 --- /dev/null +++ b/tests/system-test/7-tmq/subscribeDb2.py @@ -0,0 +1,347 @@ + +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * + +class TDTestCase: + hostname = socket.gethostname() + #rpcDebugFlagVal = '143' + #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #print ("===================: ", updatecfgDict) + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files or "taosd.exe" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def newcur(self,cfg,host,port): + user = "root" + password = "taosdata" + con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port) + cur=con.cursor() + print(cur) + return cur + + def initConsumerTable(self,cdbName='cdb'): + tdLog.info("create consume database, and consume info table, and consume result table") + tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) + + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) + tdLog.info("consume info sql: %s"%sql) + tdSql.query(sql) + + def selectConsumeResult(self,expectRows,cdbName='cdb'): + resultList=[] + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == expectRows: + break + else: + time.sleep(5) + + for i in range(expectRows): + tdLog.info ("ts: %s, consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 0), tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) + resultList.append(tdSql.getData(i , 3)) + + return resultList + + def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): + if valgrind == 1: + logFile = cfgPath + '/../log/valgrind-tmq.log' + shellCmd = 'nohup valgrind --log-file=' + logFile + shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' + + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + + def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): + tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) + tsql.execute("use %s" %dbName) + tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) + pre_create = "create table" + sql = pre_create + #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + for i in range(ctbNum): + sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1) + if (i > 0) and (i%100 == 0): + tsql.execute(sql) + sql = pre_create + if sql != pre_create: + tsql.execute(sql) + + event.set() + tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) + return + + def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + t = time.time() + startTs = int(round(t * 1000)) + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + for i in range(ctbNum): + sql += " %s_%d values "%(stbName,i) + for j in range(rowsPerTbl): + sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) + if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + if j < rowsPerTbl - 1: + sql = "insert into %s_%d values " %(stbName,i) + else: + sql = "insert into " + #end sql + if sql != pre_insert: + #print("insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def prepareEnv(self, **parameterDict): + print ("input parameters:") + print (parameterDict) + # create new connector for my thread + tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) + self.create_tables(tsql,\ + parameterDict["dbName"],\ + parameterDict["vgroups"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"]) + + self.insert_data(tsql,\ + parameterDict["dbName"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"],\ + parameterDict["batchNum"],\ + parameterDict["startTs"]) + return + + def tmqCase8(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 8: Produce while one consume to subscribe one db, inclue 1 stb") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db8', \ + 'vgroups': 4, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + self.initConsumerTable() + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + tdLog.info("create topics from db") + topicName1 = 'topic_db1' + + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] / 2 + topicList = topicName1 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + event.wait() + + tdLog.info("start consume processor") + pollDelay = 100 + showMsg = 1 + showRow = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + + # wait for data ready + prepareEnvThread.join() + + tdLog.info("insert process end, and start to check consume result") + expectRows = 1 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if totalConsumeRows != expectrowcnt: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + tdLog.exit("tmq consume rows error!") + + + tdLog.info("again start consume processer") + self.initConsumerTable() + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + expectRows = 1 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if totalConsumeRows != expectrowcnt: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 8 end ...... ") + + def tmqCase9(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 9: Produce while one consume to subscribe one db, inclue 1 stb") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db9', \ + 'vgroups': 4, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + self.initConsumerTable() + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + tdLog.info("create topics from db") + topicName1 = 'topic_db1' + + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] / 2 + topicList = topicName1 + ifcheckdata = 0 + ifManualCommit = 1 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + event.wait() + + tdLog.info("start consume processor") + pollDelay = 100 + showMsg = 1 + showRow = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + + # wait for data ready + prepareEnvThread.join() + + tdLog.info("insert process end, and start to check consume result") + expectRows = 1 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query("select count(*) from %s.%s" %(parameterDict['dbName'], parameterDict['stbName'])) + countOfStb = tdSql.getData(0,0) + print ("====total rows of stb: %d"%countOfStb) + + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + if totalConsumeRows < expectrowcnt: + tdLog.exit("tmq consume rows error!") + + tdLog.info("again start consume processer") + self.initConsumerTable() + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + expectRows = 1 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows2 = 0 + for i in range(expectRows): + totalConsumeRows2 += resultList[i] + + tdLog.info("firstly act consume rows: %d"%(totalConsumeRows)) + tdLog.info("secondly act consume rows: %d, expect consume rows: %d"%(totalConsumeRows2, expectrowcnt)) + if totalConsumeRows + totalConsumeRows2 != expectrowcnt: + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 9 end ...... ") + + def run(self): + tdSql.prepare() + + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + cfgPath = buildPath + "/../sim/psim/cfg" + tdLog.info("cfgPath: %s" % cfgPath) + + self.tmqCase8(cfgPath, buildPath) + self.tmqCase9(cfgPath, buildPath) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py new file mode 100644 index 0000000000000000000000000000000000000000..6973f4c51fdc4db250898b15898f23782e613090 --- /dev/null +++ b/tests/system-test/7-tmq/subscribeDb3.py @@ -0,0 +1,337 @@ + +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * + +class TDTestCase: + hostname = socket.gethostname() + #rpcDebugFlagVal = '143' + #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #print ("===================: ", updatecfgDict) + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files or "taosd.exe" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def newcur(self,cfg,host,port): + user = "root" + password = "taosdata" + con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port) + cur=con.cursor() + print(cur) + return cur + + def initConsumerTable(self,cdbName='cdb'): + tdLog.info("create consume database, and consume info table, and consume result table") + tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) + + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) + tdLog.info("consume info sql: %s"%sql) + tdSql.query(sql) + + def selectConsumeResult(self,expectRows,cdbName='cdb'): + resultList=[] + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == expectRows: + break + else: + time.sleep(5) + + for i in range(expectRows): + tdLog.info ("ts: %s, consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 0), tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) + resultList.append(tdSql.getData(i , 3)) + + return resultList + + def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): + if valgrind == 1: + logFile = cfgPath + '/../log/valgrind-tmq.log' + shellCmd = 'nohup valgrind --log-file=' + logFile + shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' + + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + + def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): + tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) + tsql.execute("use %s" %dbName) + tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) + pre_create = "create table" + sql = pre_create + #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + for i in range(ctbNum): + sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1) + if (i > 0) and (i%100 == 0): + tsql.execute(sql) + sql = pre_create + if sql != pre_create: + tsql.execute(sql) + + event.set() + tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) + return + + def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + t = time.time() + startTs = int(round(t * 1000)) + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + for i in range(ctbNum): + sql += " %s_%d values "%(stbName,i) + for j in range(rowsPerTbl): + sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) + if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + if j < rowsPerTbl - 1: + sql = "insert into %s_%d values " %(stbName,i) + else: + sql = "insert into " + #end sql + if sql != pre_insert: + #print("insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def prepareEnv(self, **parameterDict): + print ("input parameters:") + print (parameterDict) + # create new connector for my thread + tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) + self.create_tables(tsql,\ + parameterDict["dbName"],\ + parameterDict["vgroups"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"]) + + self.insert_data(tsql,\ + parameterDict["dbName"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"],\ + parameterDict["batchNum"],\ + parameterDict["startTs"]) + return + + def tmqCase10(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 10: Produce while one consume to subscribe one db, inclue 1 stb") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db10', \ + 'vgroups': 4, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + self.initConsumerTable() + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + tdLog.info("create topics from db") + topicName1 = 'topic_db1' + + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + topicList = topicName1 + ifcheckdata = 0 + ifManualCommit = 1 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + event.wait() + + tdLog.info("start consume processor") + pollDelay = 100 + showMsg = 1 + showRow = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + + time.sleep(2) + tdLog.info("pkill consume processor") + if (platform.system().lower() == 'windows'): + os.system("TASKKILL /F /IM tmq_sim.exe") + else: + os.system('pkill tmq_sim') + expectRows = 0 + resultList = self.selectConsumeResult(expectRows) + + # wait for data ready + prepareEnvThread.join() + tdLog.info("insert process end, and start to check consume result") + + tdLog.info("again start consume processer") + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + + expectRows = 1 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if totalConsumeRows != expectrowcnt: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + tdLog.exit("tmq consume rows error!") + + time.sleep(15) + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 10 end ...... ") + + def tmqCase11(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 11: Produce while one consume to subscribe one db, inclue 1 stb") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db11', \ + 'vgroups': 4, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + self.initConsumerTable() + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + tdLog.info("create topics from db") + topicName1 = 'topic_db1' + + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + topicList = topicName1 + ifcheckdata = 0 + ifManualCommit = 1 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:1000,\ + auto.offset.reset:earliest' + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + event.wait() + + tdLog.info("start consume processor") + pollDelay = 20 + showMsg = 1 + showRow = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + + time.sleep(6) + tdLog.info("pkill consume processor") + if (platform.system().lower() == 'windows'): + os.system("TASKKILL /F /IM tmq_sim.exe") + else: + os.system('pkill tmq_sim') + expectRows = 0 + resultList = self.selectConsumeResult(expectRows) + + # wait for data ready + prepareEnvThread.join() + tdLog.info("insert process end, and start to check consume result") + + tdLog.info("again start consume processer") + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + + expectRows = 1 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if totalConsumeRows >= expectrowcnt or totalConsumeRows <= 0: + tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt)) + tdLog.exit("tmq consume rows error!") + + time.sleep(15) + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 11 end ...... ") + + def run(self): + tdSql.prepare() + + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + cfgPath = buildPath + "/../sim/psim/cfg" + tdLog.info("cfgPath: %s" % cfgPath) + + self.tmqCase10(cfgPath, buildPath) + self.tmqCase11(cfgPath, buildPath) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())