From d00dd1673b4187c5f4db41c89bd394373ddf04a1 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sat, 30 Jul 2022 11:33:39 +0800 Subject: [PATCH] fix test cases --- tests/system-test/7-tmq/tmqConsFromTsdb.py | 50 +++---- tests/system-test/7-tmq/tmqConsumerGroup.py | 18 +-- tests/system-test/7-tmq/tmqDelete-1ctb.py | 132 ++++++++--------- tests/system-test/7-tmq/tmqDelete-multiCtb.py | 134 +++++++++--------- tests/system-test/7-tmq/tmqDnode.py | 54 +++---- tests/system-test/7-tmq/tmqDnodeRestart.py | 30 ++-- .../system-test/7-tmq/tmqDropNtb-snapshot0.py | 62 ++++---- 7 files changed, 240 insertions(+), 240 deletions(-) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb.py b/tests/system-test/7-tmq/tmqConsFromTsdb.py index a4a242365a..b3bb5f84e4 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 1 self.ctbNum = 10 self.rowsPerTbl = 10000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -62,7 +62,7 @@ class TDTestCase: tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("restart taosd to ensure that the data falls into the disk") # tdDnodes.stop(1) # tdDnodes.start(1) @@ -94,18 +94,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result @@ -120,18 +120,18 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if expectRowsList[0] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) - tmqCom.checkFileContent(consumerId, queryString) + tmqCom.checkFileContent(consumerId, queryString) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) @@ -162,18 +162,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -189,15 +189,15 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]): tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - + firstConsumeRows = resultList[0] # reinit consume info, and start tmq_sim, then check consume result @@ -205,22 +205,22 @@ class TDTestCase: consumerId = 2 expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3) tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + actConsumeTotalRows = firstConsumeRows + resultList[0] - + if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows): - tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0])) + tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0])) tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/7-tmq/tmqConsumerGroup.py b/tests/system-test/7-tmq/tmqConsumerGroup.py index bfd63fd4a2..b5cbfb8a51 100644 --- a/tests/system-test/7-tmq/tmqConsumerGroup.py +++ b/tests/system-test/7-tmq/tmqConsumerGroup.py @@ -27,26 +27,26 @@ class TDTestCase: cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile) tdLog.info(cmdStr) os.system(cmdStr) - + consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId) tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile)) consumeFile = open(consumeRowsFile, mode='r') queryFile = open(dstFile, mode='r') - + # skip first line for it is schema queryFile.readline() while True: dst = queryFile.readline() src = consumeFile.readline() - + if dst: if dst != src: tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId) else: break - return + return def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") @@ -78,7 +78,7 @@ class TDTestCase: tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) tdLog.info("insert data") tmqCom.insert_data_2(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) - + tdLog.info("create topics from stb with filter") # queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s" %(paraDict['dbName'], paraDict['stbName']) @@ -114,7 +114,7 @@ class TDTestCase: ifManualCommit = 1 keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:3000, auto.offset.reset:earliest' tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) @@ -132,7 +132,7 @@ class TDTestCase: pThread.join() - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 2 resultList = tmqCom.selectConsumeResult(expectRows) actTotalRows = 0 @@ -140,7 +140,7 @@ class TDTestCase: actTotalRows += resultList[i] tdSql.query(queryString) - expectRowsList.append(tdSql.getRows()) + expectRowsList.append(tdSql.getRows()) expectTotalRows = 0 for i in range(len(expectRowsList)): expectTotalRows += expectRowsList[i] @@ -150,7 +150,7 @@ class TDTestCase: tdLog.info("act consume rows: %d should >= expect consume rows: %d"%(actTotalRows, expectTotalRows)) tdLog.exit("0 tmq consume rows error!") - # time.sleep(10) + # time.sleep(10) # for i in range(len(topicNameList)): # tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/7-tmq/tmqDelete-1ctb.py b/tests/system-test/7-tmq/tmqDelete-1ctb.py index bedb36e505..8329b00145 100644 --- a/tests/system-test/7-tmq/tmqDelete-1ctb.py +++ b/tests/system-test/7-tmq/tmqDelete-1ctb.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 1 self.rowsPerTbl = 10000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_size=-1, wal_retention_period=-1) tdLog.info("create stb") @@ -65,11 +65,11 @@ class TDTestCase: # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - # tdLog.info("restart taosd to ensure that the data falls into the disk") + + # tdLog.info("restart taosd to ensure that the data falls into the disk") # tdSql.query("flush database %s"%(paraDict['dbName'])) return - + def delData(self,tsql,dbName,ctbPrefix,ctbNum,startTs=0,endTs=0,ctbStartIdx=0): tdLog.debug("start to del data ............") for i in range(ctbNum): @@ -84,12 +84,12 @@ class TDTestCase: newTdSql = tdCom.newTdSql() self.delData(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["startTs"],paraDict["endTs"],paraDict["ctbStartIdx"]) return - + def asyncDeleteData(self, paraDict): pThread = threading.Thread(target=self.threadFunctionForDeletaData, kwargs=paraDict) pThread.start() return pThread - + def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") paraDict = {'dbName': 'dbt', @@ -116,29 +116,29 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - - # del some data + + # del some data rowsOfDelete = int(paraDict["rowsPerTbl"] / 4) paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1 - self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"], + self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"], startTs=paraDict["startTs"], endTs=paraDict["endTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - + tdSql.execute(sqlString) + if self.snapshot == 0: consumerId = 0 elif self.snapshot == 1: consumerId = 1 rowsOfDelete = 0 - + # paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"]) topicList = topicFromStb1 ifcheckdata = 1 @@ -161,23 +161,23 @@ class TDTestCase: tdSql.query(queryString) totalRowsFromQuery = tdSql.getRows() - + tdLog.info("act consume rows: %d, expect consume rows: %d, act query rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsFromQuery)) - + if self.snapshot == 0: if totalConsumeRows != expectrowcnt: tdLog.exit("tmq consume rows error with snapshot = 0!") elif self.snapshot == 1: if totalConsumeRows != totalRowsFromQuery: tdLog.exit("tmq consume rows error with snapshot = 1!") - - tmqCom.checkFileContent(consumerId=consumerId, queryString=queryString, skipRowsOfCons=rowsOfDelete) + + tmqCom.checkFileContent(consumerId=consumerId, queryString=queryString, skipRowsOfCons=rowsOfDelete) tdSql.query("drop topic %s"%topicFromStb1) tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self): - tdLog.printNoPrefix("======== test case 2: ") + tdLog.printNoPrefix("======== test case 2: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', @@ -197,50 +197,50 @@ class TDTestCase: 'showMsg': 1, 'showRow': 1, 'snapshot': 0} - + paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - - tdLog.info("restart taosd to ensure that the data falls into the disk") + + tdLog.info("restart taosd to ensure that the data falls into the disk") tdSql.query("flush database %s"%(paraDict['dbName'])) - + # update to 1/4 rows and insert 3/4 new rows paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl * 3 / 4) # paraDict['rowsPerTbl'] = self.rowsPerTbl tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # del some data rowsOfDelete = int(self.rowsPerTbl / 4 ) paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1 - self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"], + self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"], startTs=paraDict["startTs"], endTs=paraDict["endTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tmqCom.initConsumerTable() tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - + # paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - consumerId = 1 - + consumerId = 1 + if self.snapshot == 0: - consumerId = 2 + consumerId = 2 expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4 + 3/4)) elif self.snapshot == 1: consumerId = 3 expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4)) - + topicList = topicFromStb1 ifcheckdata = 1 ifManualCommit = 1 @@ -252,7 +252,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -262,24 +262,24 @@ class TDTestCase: tdSql.query(queryString) totalRowsFromQuery = tdSql.getRows() - + tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt)) - + if self.snapshot == 0: if totalConsumeRows != expectrowcnt: tdLog.exit("tmq consume rows error with snapshot = 0!") elif self.snapshot == 1: if totalConsumeRows != totalRowsFromQuery: tdLog.exit("tmq consume rows error with snapshot = 1!") - - # tmqCom.checkFileContent(consumerId, queryString) + + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) tdLog.printNoPrefix("======== test case 2 end ...... ") def tmqCase3(self): - tdLog.printNoPrefix("======== test case 3: ") + tdLog.printNoPrefix("======== test case 3: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', @@ -299,34 +299,34 @@ class TDTestCase: 'showMsg': 1, 'showRow': 1, 'snapshot': 0} - + paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - - tdLog.info("restart taosd to ensure that the data falls into the disk") + + tdLog.info("restart taosd to ensure that the data falls into the disk") tdSql.query("flush database %s"%(paraDict['dbName'])) - + tmqCom.initConsumerTable() tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - + # paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - consumerId = 1 - + consumerId = 1 + if self.snapshot == 0: - consumerId = 4 + consumerId = 4 expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4 + 3/4)) elif self.snapshot == 1: consumerId = 5 expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4)) - + topicList = topicFromStb1 ifcheckdata = 1 ifManualCommit = 1 @@ -340,20 +340,20 @@ class TDTestCase: rowsOfDelete = int(self.rowsPerTbl / 4 ) paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1 pDeleteThread = self.asyncDeleteData(paraDict) - + tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + # update to 1/4 rows and insert 3/4 new rows paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl * 3 / 4) # paraDict['rowsPerTbl'] = self.rowsPerTbl # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict) pInsertThread.join() - + tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -363,17 +363,17 @@ class TDTestCase: tdSql.query(queryString) totalRowsFromQuery = tdSql.getRows() - + tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt)) - + if self.snapshot == 0: if totalConsumeRows != expectrowcnt: tdLog.exit("tmq consume rows error with snapshot = 0!") elif self.snapshot == 1: if not ((totalConsumeRows >= totalRowsFromQuery) and (totalConsumeRows <= expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 1!") - - # tmqCom.checkFileContent(consumerId, queryString) + + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) @@ -385,17 +385,17 @@ class TDTestCase: tdLog.printNoPrefix("=============================================") tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.snapshot = 0 - self.prepareTestEnv() + self.prepareTestEnv() self.tmqCase1() - self.tmqCase2() - + self.tmqCase2() + tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 self.prepareTestEnv() self.tmqCase1() - self.tmqCase2() - + self.tmqCase2() + tdLog.printNoPrefix("=============================================") tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.snapshot = 0 @@ -404,7 +404,7 @@ class TDTestCase: tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 - self.prepareTestEnv() + self.prepareTestEnv() self.tmqCase3() def stop(self): diff --git a/tests/system-test/7-tmq/tmqDelete-multiCtb.py b/tests/system-test/7-tmq/tmqDelete-multiCtb.py index 94ca16bc6f..e59040305a 100644 --- a/tests/system-test/7-tmq/tmqDelete-multiCtb.py +++ b/tests/system-test/7-tmq/tmqDelete-multiCtb.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 100 self.rowsPerTbl = 1000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_size=-1, wal_retention_period=-1) tdLog.info("create stb") @@ -65,11 +65,11 @@ class TDTestCase: # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - # tdLog.info("restart taosd to ensure that the data falls into the disk") + + # tdLog.info("restart taosd to ensure that the data falls into the disk") # tdSql.query("flush database %s"%(paraDict['dbName'])) return - + def delData(self,tsql,dbName,ctbPrefix,ctbNum,startTs=0,endTs=0,ctbStartIdx=0): tdLog.debug("start to del data ............") for i in range(ctbNum): @@ -84,12 +84,12 @@ class TDTestCase: newTdSql = tdCom.newTdSql() self.delData(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["startTs"],paraDict["endTs"],paraDict["ctbStartIdx"]) return - + def asyncDeleteData(self, paraDict): pThread = threading.Thread(target=self.threadFunctionForDeletaData, kwargs=paraDict) pThread.start() return pThread - + def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") paraDict = {'dbName': 'dbt', @@ -116,29 +116,29 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - - # del some data + + # del some data rowsOfDelete = int(paraDict["rowsPerTbl"] / 4) paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1 - self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"], + self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"], startTs=paraDict["startTs"], endTs=paraDict["endTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - + tdSql.execute(sqlString) + if self.snapshot == 0: consumerId = 0 elif self.snapshot == 1: consumerId = 1 rowsOfDelete = 0 - + # paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"]) topicList = topicFromStb1 ifcheckdata = 1 @@ -161,23 +161,23 @@ class TDTestCase: tdSql.query(queryString) totalRowsFromQuery = tdSql.getRows() - + tdLog.info("act consume rows: %d, expect consume rows: %d, act query rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsFromQuery)) - + if self.snapshot == 0: if totalConsumeRows != expectrowcnt: tdLog.exit("tmq consume rows error with snapshot = 0!") elif self.snapshot == 1: if totalConsumeRows != totalRowsFromQuery: tdLog.exit("tmq consume rows error with snapshot = 1!") - - # tmqCom.checkFileContent(consumerId=consumerId, queryString=queryString, skipRowsOfCons=rowsOfDelete) + + # tmqCom.checkFileContent(consumerId=consumerId, queryString=queryString, skipRowsOfCons=rowsOfDelete) tdSql.query("drop topic %s"%topicFromStb1) tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self): - tdLog.printNoPrefix("======== test case 2: ") + tdLog.printNoPrefix("======== test case 2: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', @@ -197,50 +197,50 @@ class TDTestCase: 'showMsg': 1, 'showRow': 1, 'snapshot': 0} - + paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - - tdLog.info("restart taosd to ensure that the data falls into the disk") + + tdLog.info("restart taosd to ensure that the data falls into the disk") tdSql.query("flush database %s"%(paraDict['dbName'])) - + # update to 1/4 rows and insert 3/4 new rows paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl * 3 / 4) # paraDict['rowsPerTbl'] = self.rowsPerTbl tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # del some data rowsOfDelete = int(self.rowsPerTbl / 4 ) paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1 - self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"], + self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"], startTs=paraDict["startTs"], endTs=paraDict["endTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tmqCom.initConsumerTable() tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - + # paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - consumerId = 1 - + consumerId = 1 + if self.snapshot == 0: - consumerId = 2 + consumerId = 2 expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4 + 3/4)) elif self.snapshot == 1: consumerId = 3 expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4)) - + topicList = topicFromStb1 ifcheckdata = 1 ifManualCommit = 1 @@ -252,7 +252,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -262,24 +262,24 @@ class TDTestCase: tdSql.query(queryString) totalRowsFromQuery = tdSql.getRows() - + tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt)) - + if self.snapshot == 0: if totalConsumeRows != expectrowcnt: tdLog.exit("tmq consume rows error with snapshot = 0!") elif self.snapshot == 1: if totalConsumeRows != totalRowsFromQuery: tdLog.exit("tmq consume rows error with snapshot = 1!") - - # tmqCom.checkFileContent(consumerId, queryString) + + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) tdLog.printNoPrefix("======== test case 2 end ...... ") def tmqCase3(self): - tdLog.printNoPrefix("======== test case 3: ") + tdLog.printNoPrefix("======== test case 3: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', @@ -299,34 +299,34 @@ class TDTestCase: 'showMsg': 1, 'showRow': 1, 'snapshot': 0} - + paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - - tdLog.info("restart taosd to ensure that the data falls into the disk") + + tdLog.info("restart taosd to ensure that the data falls into the disk") tdSql.query("flush database %s"%(paraDict['dbName'])) - + tmqCom.initConsumerTable() tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - + # paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - consumerId = 1 - + consumerId = 1 + if self.snapshot == 0: - consumerId = 4 + consumerId = 4 expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4 + 3/4)) elif self.snapshot == 1: consumerId = 5 expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4)) - + topicList = topicFromStb1 ifcheckdata = 1 ifManualCommit = 1 @@ -340,20 +340,20 @@ class TDTestCase: rowsOfDelete = int(self.rowsPerTbl / 4 ) paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1 pDeleteThread = self.asyncDeleteData(paraDict) - + tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + # update to 1/4 rows and insert 3/4 new rows paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl * 3 / 4) # paraDict['rowsPerTbl'] = self.rowsPerTbl # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict) pInsertThread.join() - + tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -363,47 +363,47 @@ class TDTestCase: tdSql.query(queryString) totalRowsFromQuery = tdSql.getRows() - + tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt)) - + if self.snapshot == 0: if totalConsumeRows < expectrowcnt: tdLog.exit("tmq consume rows error with snapshot = 0!") elif self.snapshot == 1: if not ((totalConsumeRows >= totalRowsFromQuery) and (totalConsumeRows <= expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 1!") - - # tmqCom.checkFileContent(consumerId, queryString) + + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) tdLog.printNoPrefix("======== test case 3 end ...... ") - def run(self): + def run(self): tdLog.printNoPrefix("=============================================") tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.snapshot = 0 self.prepareTestEnv() self.tmqCase1() - self.tmqCase2() - + self.tmqCase2() + tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 self.prepareTestEnv() self.tmqCase1() self.tmqCase2() - + tdLog.printNoPrefix("=============================================") tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.snapshot = 0 - self.prepareTestEnv() - self.tmqCase3() + self.prepareTestEnv() + self.tmqCase3() tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 - self.prepareTestEnv() + self.prepareTestEnv() self.tmqCase3() def stop(self): diff --git a/tests/system-test/7-tmq/tmqDnode.py b/tests/system-test/7-tmq/tmqDnode.py index 235e9ef971..802993e924 100644 --- a/tests/system-test/7-tmq/tmqDnode.py +++ b/tests/system-test/7-tmq/tmqDnode.py @@ -56,7 +56,7 @@ class TDTestCase: print(cur) return cur - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) @@ -65,12 +65,12 @@ class TDTestCase: tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - def initConsumerInfoTable(self,cdbName='cdb'): + def initConsumerInfoTable(self,cdbName='cdb'): tdLog.info("drop consumeinfo table") tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -85,11 +85,11 @@ class TDTestCase: break else: time.sleep(5) - + for i in range(expectRows): tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): @@ -98,9 +98,9 @@ class TDTestCase: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) @@ -130,7 +130,7 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) return @@ -149,7 +149,7 @@ class TDTestCase: ctbDict[i] = 0 #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfCtb = 0 + rowsOfCtb = 0 while rowsOfCtb < rowsPerTbl: for i in range(ctbNum): sql += " %s.%s_%d values "%(dbName,ctbPrefix,i) @@ -176,7 +176,7 @@ class TDTestCase: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 + rowsOfSql = 0 for i in range(ctbNum): sql += " %s_%d values "%(ctbPrefix,i) for j in range(rowsPerTbl): @@ -207,7 +207,7 @@ class TDTestCase: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 + rowsOfSql = 0 for i in range(ctbNum): sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i,dbName,stbName,i) for j in range(rowsPerTbl): @@ -226,8 +226,8 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - - def prepareEnv(self, **parameterDict): + + def prepareEnv(self, **parameterDict): # create new connector for my thread tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) @@ -246,8 +246,8 @@ class TDTestCase: return def tmqCase1(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 1: ") - + tdLog.printNoPrefix("======== test case 1: ") + self.initConsumerTable() # create and start thread @@ -264,7 +264,7 @@ class TDTestCase: 'batchNum': 23, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + self.create_database(tdSql, parameterDict["dbName"]) self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbPrefix"], parameterDict["ctbNum"]) @@ -272,7 +272,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -303,7 +303,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -313,8 +313,8 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 2: ") - + tdLog.printNoPrefix("======== test case 2: ") + self.initConsumerTable() # create and start thread @@ -339,7 +339,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] * 2 @@ -373,7 +373,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -387,8 +387,8 @@ class TDTestCase: # 自动建表完成数据插入,启动消费 def tmqCase3(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 3: ") - + tdLog.printNoPrefix("======== test case 3: ") + self.initConsumerTable() # create and start thread @@ -414,7 +414,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -444,7 +444,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -466,7 +466,7 @@ class TDTestCase: tdLog.info("cfgPath: %s" % cfgPath) # self.tmqCase1(cfgPath, buildPath) - # self.tmqCase2(cfgPath, buildPath) + # self.tmqCase2(cfgPath, buildPath) self.tmqCase3(cfgPath, buildPath) # self.tmqCase4(cfgPath, buildPath) # self.tmqCase5(cfgPath, buildPath) diff --git a/tests/system-test/7-tmq/tmqDnodeRestart.py b/tests/system-test/7-tmq/tmqDnodeRestart.py index 9a11106e3e..1902945bf6 100644 --- a/tests/system-test/7-tmq/tmqDnodeRestart.py +++ b/tests/system-test/7-tmq/tmqDnodeRestart.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 2 self.ctbNum = 100 self.rowsPerTbl = 1000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -51,7 +51,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_size=-1, wal_retention_period=-1) tdLog.info("create stb") @@ -63,7 +63,7 @@ class TDTestCase: tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("restart taosd to ensure that the data falls into the disk") # tdDnodes.stop(1) # tdDnodes.start(1) @@ -96,7 +96,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + # tmqCom.initConsumerTable() # tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) # tdLog.info("create stb") @@ -110,12 +110,12 @@ class TDTestCase: # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - + consumerId = 0 expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 topicList = topicFromStb1 @@ -129,7 +129,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + # time.sleep(3) tmqCom.getStartConsumeNotifyFromTmqsim() tdLog.info("================= restart dnode ===========================") @@ -146,7 +146,7 @@ class TDTestCase: tdSql.query(queryString) totalRowsFromQury = tdSql.getRows() - + tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury)) if not (totalConsumeRows == totalRowsFromQury): tdLog.exit("tmq consume rows error!") @@ -157,7 +157,7 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self): - tdLog.printNoPrefix("======== test case 2: ") + tdLog.printNoPrefix("======== test case 2: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', @@ -182,7 +182,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() # tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) # tdLog.info("create stb") @@ -195,12 +195,12 @@ class TDTestCase: # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - + consumerId = 1 expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 topicList = topicFromStb1 @@ -225,7 +225,7 @@ class TDTestCase: paraDict["batchNum"] = 100 paraDict["ctbPrefix"] = 'newCtb' tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) - + tdLog.info("insert process end, and start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -235,7 +235,7 @@ class TDTestCase: tdSql.query(queryString) totalRowsFromQuery = tdSql.getRows() - + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQuery)) if totalConsumeRows != totalRowsFromQuery: tdLog.exit("tmq consume rows error!") @@ -249,7 +249,7 @@ class TDTestCase: # tdSql.prepare() self.prepareTestEnv() self.tmqCase1() - self.tmqCase2() + self.tmqCase2() def stop(self): tdSql.close() diff --git a/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py b/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py index 650d918828..b952dc2d57 100644 --- a/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py +++ b/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py @@ -20,11 +20,11 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 1000 self.rowsPerTbl = 10 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) - + # drop some ntbs def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") @@ -51,8 +51,8 @@ class TDTestCase: paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - + paraDict['rowsPerTbl'] = self.rowsPerTbl + tmqCom.initConsumerTable() tdLog.info("start create database....") tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) @@ -60,11 +60,11 @@ class TDTestCase: tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"]) tdLog.info("start insert data into normal tables....") tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"]) - + tdLog.info("create topics from database") - topicFromDb = 'topic_dbt' + topicFromDb = 'topic_dbt' tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName'])) - + if self.snapshot == 0: consumerId = 0 elif self.snapshot == 1: @@ -83,13 +83,13 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tmqCom.getStartConsumeNotifyFromTmqsim() + tmqCom.getStartConsumeNotifyFromTmqsim() tdLog.info("drop some ntables") # drop 1/4 ctbls from half offset paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2) paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4) tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"]) - + tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -98,19 +98,19 @@ class TDTestCase: totalConsumeRows += resultList[i] tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - + if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 0!") - - tdLog.info("wait subscriptions exit ....") + + tdLog.info("wait subscriptions exit ....") tmqCom.waitSubscriptionExit(tdSql, topicFromDb) - + tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) tdLog.printNoPrefix("======== test case 1 end ...... ") - - + + # drop some ntbs and create some new ntbs def tmqCase2(self): tdLog.printNoPrefix("======== test case 2: ") @@ -137,8 +137,8 @@ class TDTestCase: paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - + paraDict['rowsPerTbl'] = self.rowsPerTbl + tmqCom.initConsumerTable() tdLog.info("start create database....") tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) @@ -146,11 +146,11 @@ class TDTestCase: tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"]) tdLog.info("start insert data into normal tables....") tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"]) - + tdLog.info("create topics from database") - topicFromDb = 'topic_dbt' + topicFromDb = 'topic_dbt' tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName'])) - + if self.snapshot == 0: consumerId = 2 elif self.snapshot == 1: @@ -169,20 +169,20 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tmqCom.getStartConsumeNotifyFromTmqsim() + tmqCom.getStartConsumeNotifyFromTmqsim() tdLog.info("drop some ntables") # drop 1/4 ctbls from half offset paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2) paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4) tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"]) - + tdLog.info("start create some new normal tables....") paraDict["ctbPrefix"] = 'newCtb' paraDict["ctbNum"] = self.ctbNum tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"]) tdLog.info("start insert data into these new normal tables....") tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"]) - + tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -191,24 +191,24 @@ class TDTestCase: totalConsumeRows += resultList[i] tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - + if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 0!") - - tdLog.info("wait subscriptions exit ....") + + tdLog.info("wait subscriptions exit ....") tmqCom.waitSubscriptionExit(tdSql, topicFromDb) - + tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) tdLog.printNoPrefix("======== test case 2 end ...... ") - - def run(self): + + def run(self): tdLog.printNoPrefix("=============================================") tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.snapshot = 0 self.tmqCase1() - self.tmqCase2() - + self.tmqCase2() + # tdLog.printNoPrefix("====================================================================") # tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") # self.snapshot = 1 -- GitLab