diff --git a/tests/system-test/7-tmq/tmqUpdate-1ctb.py b/tests/system-test/7-tmq/tmqUpdate-1ctb.py index 5d891bdedfd5763da3de8fff46e6080e93071aa5..12de04cd9c525e4ae8bc9aaa70d45942e7c3c0e2 100644 --- a/tests/system-test/7-tmq/tmqUpdate-1ctb.py +++ b/tests/system-test/7-tmq/tmqUpdate-1ctb.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 1 self.rowsPerTbl = 10000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -65,8 +65,8 @@ class TDTestCase: # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - # tdLog.info("restart taosd to ensure that the data falls into the disk") + + # tdLog.info("restart taosd to ensure that the data falls into the disk") # tdSql.query("flush database %s"%(paraDict['dbName'])) return @@ -96,7 +96,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + # update to half tables paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", @@ -104,24 +104,24 @@ class TDTestCase: # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - + tdSql.execute(sqlString) + # paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl consumerId = 0 - + if self.snapshot == 0: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/2)) elif self.snapshot == 1: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1)) - + topicList = topicFromStb1 ifcheckdata = 1 ifManualCommit = 1 @@ -143,18 +143,18 @@ class TDTestCase: tdSql.query(queryString) totalRowsInserted = tdSql.getRows() - + tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted)) if totalConsumeRows != expectrowcnt: tdLog.exit("tmq consume rows error!") - - tmqCom.checkFileContent(consumerId, queryString) + + tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self): - tdLog.printNoPrefix("======== test case 2: ") + tdLog.printNoPrefix("======== test case 2: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', @@ -174,15 +174,15 @@ class TDTestCase: 'showMsg': 1, 'showRow': 1, 'snapshot': 0} - + paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - - tdLog.info("restart taosd to ensure that the data falls into the disk") + + tdLog.info("restart taosd to ensure that the data falls into the disk") tdSql.query("flush database %s"%(paraDict['dbName'])) - + # update to half tables paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) @@ -191,16 +191,16 @@ class TDTestCase: startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tmqCom.initConsumerTable() tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - + # paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl consumerId = 1 @@ -208,7 +208,7 @@ class TDTestCase: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2)) elif self.snapshot == 1: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1)) - + topicList = topicFromStb1 ifcheckdata = 1 ifManualCommit = 1 @@ -220,7 +220,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + tdLog.info("insert process end, and start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -230,13 +230,13 @@ class TDTestCase: tdSql.query(queryString) totalRowsInserted = tdSql.getRows() - + tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) - + if totalConsumeRows != expectrowcnt: tdLog.exit("tmq consume rows error!") - - # tmqCom.checkFileContent(consumerId, queryString) + + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) @@ -249,14 +249,14 @@ class TDTestCase: tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.tmqCase1() self.tmqCase2() - + self.prepareTestEnv() tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 self.tmqCase1() self.tmqCase2() - + def stop(self): tdSql.close() diff --git a/tests/system-test/7-tmq/tmqUpdate-multiCtb-snapshot0.py b/tests/system-test/7-tmq/tmqUpdate-multiCtb-snapshot0.py index 3b6ae65316ecdac40c085b55681f9f9b9f9fe381..02641d8bcb0603c7e2caf3e2486a577e62176269 100644 --- a/tests/system-test/7-tmq/tmqUpdate-multiCtb-snapshot0.py +++ b/tests/system-test/7-tmq/tmqUpdate-multiCtb-snapshot0.py @@ -21,7 +21,7 @@ class TDTestCase: self.ctbNum = 50 self.rowsPerTbl = 1000 self.autoCtbPrefix = 'aCtb' - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -51,7 +51,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -66,8 +66,8 @@ class TDTestCase: tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix, ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - # tdLog.info("restart taosd to ensure that the data falls into the disk") + + # tdLog.info("restart taosd to ensure that the data falls into the disk") # tdSql.query("flush database %s"%(paraDict['dbName'])) return @@ -96,7 +96,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + # update to half tables paraDict['ctbNum'] = int(self.ctbNum/2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) @@ -105,15 +105,15 @@ class TDTestCase: startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - + tdSql.execute(sqlString) + paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl consumerId = 0 @@ -121,7 +121,7 @@ class TDTestCase: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2)) elif self.snapshot == 1: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2)) - + topicList = topicFromStb1 ifcheckdata = 1 ifManualCommit = 1 @@ -143,18 +143,18 @@ class TDTestCase: tdSql.query(queryString) totalRowsInserted = tdSql.getRows() - + tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted)) if totalConsumeRows != expectrowcnt: tdLog.exit("tmq consume rows error!") - - # tmqCom.checkFileContent(consumerId, queryString) + + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self): - tdLog.printNoPrefix("======== test case 2: ") + tdLog.printNoPrefix("======== test case 2: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', @@ -174,15 +174,15 @@ class TDTestCase: 'showMsg': 1, 'showRow': 1, 'snapshot': 0} - + paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - - tdLog.info("restart taosd to ensure that the data falls into the disk") + + tdLog.info("restart taosd to ensure that the data falls into the disk") tdSql.query("flush database %s"%(paraDict['dbName'])) - + # update to half tables paraDict['ctbNum'] = int(self.ctbNum/2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) @@ -190,23 +190,23 @@ class TDTestCase: tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix, ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) - + tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="aCtby", ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) - + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) tmqCom.initConsumerTable() tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - + # paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl @@ -215,7 +215,7 @@ class TDTestCase: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2 + 1/2*1/2)) elif self.snapshot == 1: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2)) - + topicList = topicFromStb1 ifcheckdata = 1 ifManualCommit = 1 @@ -227,7 +227,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -237,13 +237,13 @@ class TDTestCase: tdSql.query(queryString) totalRowsInserted = tdSql.getRows() - + tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) - + if totalConsumeRows != expectrowcnt: tdLog.exit("tmq consume rows error!") - - # tmqCom.checkFileContent(consumerId, queryString) + + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) @@ -256,14 +256,14 @@ class TDTestCase: tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.tmqCase1() self.tmqCase2() - + # self.prepareTestEnv() # tdLog.printNoPrefix("====================================================================") # tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") # self.snapshot = 1 # self.tmqCase1() # self.tmqCase2() - + def stop(self): tdSql.close() diff --git a/tests/system-test/7-tmq/tmqUpdate-multiCtb-snapshot1.py b/tests/system-test/7-tmq/tmqUpdate-multiCtb-snapshot1.py index 0ac897c2cd4622d02dd2832b0854ba5b28e74315..baeb70e65680aa731d9db4eb28385b24e67d9a47 100644 --- a/tests/system-test/7-tmq/tmqUpdate-multiCtb-snapshot1.py +++ b/tests/system-test/7-tmq/tmqUpdate-multiCtb-snapshot1.py @@ -21,7 +21,7 @@ class TDTestCase: self.ctbNum = 50 self.rowsPerTbl = 1000 self.autoCtbPrefix = 'aCtb' - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -51,7 +51,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -66,8 +66,8 @@ class TDTestCase: tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix, ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - # tdLog.info("restart taosd to ensure that the data falls into the disk") + + # tdLog.info("restart taosd to ensure that the data falls into the disk") # tdSql.query("flush database %s"%(paraDict['dbName'])) return @@ -96,7 +96,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + # update to half tables paraDict['ctbNum'] = int(self.ctbNum/2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) @@ -105,15 +105,15 @@ class TDTestCase: startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - + tdSql.execute(sqlString) + paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl consumerId = 0 @@ -121,7 +121,7 @@ class TDTestCase: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2)) elif self.snapshot == 1: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2)) - + topicList = topicFromStb1 ifcheckdata = 1 ifManualCommit = 1 @@ -143,18 +143,18 @@ class TDTestCase: tdSql.query(queryString) totalRowsInserted = tdSql.getRows() - + tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted)) if totalConsumeRows != expectrowcnt: tdLog.exit("tmq consume rows error!") - - # tmqCom.checkFileContent(consumerId, queryString) + + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self): - tdLog.printNoPrefix("======== test case 2: ") + tdLog.printNoPrefix("======== test case 2: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', @@ -174,15 +174,15 @@ class TDTestCase: 'showMsg': 1, 'showRow': 1, 'snapshot': 0} - + paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - - tdLog.info("restart taosd to ensure that the data falls into the disk") + + tdLog.info("restart taosd to ensure that the data falls into the disk") tdSql.query("flush database %s"%(paraDict['dbName'])) - + # update to half tables paraDict['ctbNum'] = int(self.ctbNum/2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) @@ -190,23 +190,23 @@ class TDTestCase: tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix, ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) - + tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="aCtby", ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) - + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) tmqCom.initConsumerTable() tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - + # paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl @@ -215,7 +215,7 @@ class TDTestCase: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2 + 1/2*1/2)) elif self.snapshot == 1: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2)) - + topicList = topicFromStb1 ifcheckdata = 1 ifManualCommit = 1 @@ -227,7 +227,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -237,13 +237,13 @@ class TDTestCase: tdSql.query(queryString) totalRowsInserted = tdSql.getRows() - + tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) - + if totalConsumeRows != expectrowcnt: tdLog.exit("tmq consume rows error!") - - # tmqCom.checkFileContent(consumerId, queryString) + + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) @@ -256,14 +256,14 @@ class TDTestCase: # tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") # self.tmqCase1() # self.tmqCase2() - + self.prepareTestEnv() tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 self.tmqCase1() self.tmqCase2() - + def stop(self): tdSql.close() diff --git a/tests/system-test/7-tmq/tmqUpdate-multiCtb.py b/tests/system-test/7-tmq/tmqUpdate-multiCtb.py index 892137be43a39ea0889143fd679ec38f20489928..bde266e63401f27e1ff9bfb1085fc26636e0bc77 100644 --- a/tests/system-test/7-tmq/tmqUpdate-multiCtb.py +++ b/tests/system-test/7-tmq/tmqUpdate-multiCtb.py @@ -21,7 +21,7 @@ class TDTestCase: self.ctbNum = 50 self.rowsPerTbl = 1000 self.autoCtbPrefix = 'aCtb' - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -51,7 +51,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -66,8 +66,8 @@ class TDTestCase: tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix, ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - # tdLog.info("restart taosd to ensure that the data falls into the disk") + + # tdLog.info("restart taosd to ensure that the data falls into the disk") # tdSql.query("flush database %s"%(paraDict['dbName'])) return @@ -96,7 +96,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + # update to half tables paraDict['ctbNum'] = int(self.ctbNum/2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) @@ -105,15 +105,15 @@ class TDTestCase: startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - + tdSql.execute(sqlString) + paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl consumerId = 0 @@ -121,7 +121,7 @@ class TDTestCase: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2)) elif self.snapshot == 1: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2)) - + topicList = topicFromStb1 ifcheckdata = 1 ifManualCommit = 1 @@ -143,18 +143,18 @@ class TDTestCase: tdSql.query(queryString) totalRowsInserted = tdSql.getRows() - + tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted)) if totalConsumeRows != expectrowcnt: tdLog.exit("tmq consume rows error!") - - # tmqCom.checkFileContent(consumerId, queryString) + + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self): - tdLog.printNoPrefix("======== test case 2: ") + tdLog.printNoPrefix("======== test case 2: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', @@ -174,15 +174,15 @@ class TDTestCase: 'showMsg': 1, 'showRow': 1, 'snapshot': 0} - + paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - - tdLog.info("restart taosd to ensure that the data falls into the disk") + + tdLog.info("restart taosd to ensure that the data falls into the disk") tdSql.query("flush database %s"%(paraDict['dbName'])) - + # update to half tables paraDict['ctbNum'] = int(self.ctbNum/2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) @@ -190,23 +190,23 @@ class TDTestCase: tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix, ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) - + tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="aCtby", ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) - + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) tmqCom.initConsumerTable() tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - + # paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl @@ -215,7 +215,7 @@ class TDTestCase: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2 + 1/2*1/2)) elif self.snapshot == 1: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2)) - + topicList = topicFromStb1 ifcheckdata = 1 ifManualCommit = 1 @@ -227,7 +227,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -237,13 +237,13 @@ class TDTestCase: tdSql.query(queryString) totalRowsInserted = tdSql.getRows() - + tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) - + if totalConsumeRows != expectrowcnt: tdLog.exit("tmq consume rows error!") - - # tmqCom.checkFileContent(consumerId, queryString) + + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) @@ -256,14 +256,14 @@ class TDTestCase: tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.tmqCase1() self.tmqCase2() - + self.prepareTestEnv() tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 self.tmqCase1() self.tmqCase2() - + def stop(self): tdSql.close() diff --git a/tests/system-test/7-tmq/tmqUpdateWithConsume.py b/tests/system-test/7-tmq/tmqUpdateWithConsume.py index 2dd3a061c62bf5ac63a135e99424ac549ef83a54..be07ba13a978a8705f014b04ee7c09ba37e3a33b 100644 --- a/tests/system-test/7-tmq/tmqUpdateWithConsume.py +++ b/tests/system-test/7-tmq/tmqUpdateWithConsume.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 100 self.rowsPerTbl = 1000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1, wal_retention_size=-1, wal_retention_period=-1) tdLog.info("create stb") @@ -65,8 +65,8 @@ class TDTestCase: # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - # tdLog.info("restart taosd to ensure that the data falls into the disk") + + # tdLog.info("restart taosd to ensure that the data falls into the disk") tdSql.query("flush database %s"%(paraDict['dbName'])) return @@ -94,30 +94,30 @@ class TDTestCase: paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - + paraDict['rowsPerTbl'] = self.rowsPerTbl + # update to half tables paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - + tdSql.execute(sqlString) + # paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl consumerId = 0 - + if self.snapshot == 0: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/2 + 1)) elif self.snapshot == 1: expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2)) - + topicList = topicFromStb1 ifcheckdata = 1 ifManualCommit = 1 @@ -143,16 +143,16 @@ class TDTestCase: tdSql.query(queryString) totalRowsInserted = tdSql.getRows() - + tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted)) if self.snapshot == 0: if totalConsumeRows != expectrowcnt: tdLog.exit("tmq consume rows error!") elif self.snapshot == 1: if not (totalConsumeRows < expectrowcnt and totalConsumeRows >= totalRowsInserted): - tdLog.exit("tmq consume rows error!") - - # tmqCom.checkFileContent(consumerId, queryString) + tdLog.exit("tmq consume rows error!") + + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) tdLog.printNoPrefix("======== test case 1 end ...... ") @@ -162,7 +162,7 @@ class TDTestCase: self.ctbNum = 1 self.snapshot = 0 self.prepareTestEnv() - self.tmqCase1() + self.tmqCase1() tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.prepareTestEnv() @@ -178,7 +178,7 @@ class TDTestCase: self.prepareTestEnv() self.snapshot = 1 self.tmqCase1() - + def stop(self): tdSql.close()