提交 98da49f6 编写于 作者: G Ganlin Zhao

fix test cases

上级 b7fae566
...@@ -20,7 +20,7 @@ class TDTestCase: ...@@ -20,7 +20,7 @@ class TDTestCase:
self.vgroups = 4 self.vgroups = 4
self.ctbNum = 1 self.ctbNum = 1
self.rowsPerTbl = 10000 self.rowsPerTbl = 10000
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False) tdSql.init(conn.cursor(), False)
...@@ -50,7 +50,7 @@ class TDTestCase: ...@@ -50,7 +50,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb") tdLog.info("create stb")
...@@ -65,8 +65,8 @@ class TDTestCase: ...@@ -65,8 +65,8 @@ class TDTestCase:
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk") # tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName'])) # tdSql.query("flush database %s"%(paraDict['dbName']))
return return
...@@ -96,7 +96,7 @@ class TDTestCase: ...@@ -96,7 +96,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
# update to half tables # update to half tables
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
...@@ -104,24 +104,24 @@ class TDTestCase: ...@@ -104,24 +104,24 @@ class TDTestCase:
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1' topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString) sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum # paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 0 consumerId = 0
if self.snapshot == 0: if self.snapshot == 0:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/2)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/2))
elif self.snapshot == 1: elif self.snapshot == 1:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1))
topicList = topicFromStb1 topicList = topicFromStb1
ifcheckdata = 1 ifcheckdata = 1
ifManualCommit = 1 ifManualCommit = 1
...@@ -143,18 +143,18 @@ class TDTestCase: ...@@ -143,18 +143,18 @@ class TDTestCase:
tdSql.query(queryString) tdSql.query(queryString)
totalRowsInserted = tdSql.getRows() totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted)) tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted))
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
tmqCom.checkFileContent(consumerId, queryString) tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self): def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ") tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt', paraDict = {'dbName': 'dbt',
'dropFlag': 1, 'dropFlag': 1,
'event': '', 'event': '',
...@@ -174,15 +174,15 @@ class TDTestCase: ...@@ -174,15 +174,15 @@ class TDTestCase:
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 0} 'snapshot': 0}
paraDict['snapshot'] = self.snapshot paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("restart taosd to ensure that the data falls into the disk") tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName'])) tdSql.query("flush database %s"%(paraDict['dbName']))
# update to half tables # update to half tables
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2) paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2)
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
...@@ -191,16 +191,16 @@ class TDTestCase: ...@@ -191,16 +191,16 @@ class TDTestCase:
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1' topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString) sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum # paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 1 consumerId = 1
...@@ -208,7 +208,7 @@ class TDTestCase: ...@@ -208,7 +208,7 @@ class TDTestCase:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2))
elif self.snapshot == 1: elif self.snapshot == 1:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1))
topicList = topicFromStb1 topicList = topicFromStb1
ifcheckdata = 1 ifcheckdata = 1
ifManualCommit = 1 ifManualCommit = 1
...@@ -220,7 +220,7 @@ class TDTestCase: ...@@ -220,7 +220,7 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
...@@ -230,13 +230,13 @@ class TDTestCase: ...@@ -230,13 +230,13 @@ class TDTestCase:
tdSql.query(queryString) tdSql.query(queryString)
totalRowsInserted = tdSql.getRows() totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
...@@ -249,14 +249,14 @@ class TDTestCase: ...@@ -249,14 +249,14 @@ class TDTestCase:
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.tmqCase1() self.tmqCase1()
self.tmqCase2() self.tmqCase2()
self.prepareTestEnv() self.prepareTestEnv()
tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1 self.snapshot = 1
self.tmqCase1() self.tmqCase1()
self.tmqCase2() self.tmqCase2()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -21,7 +21,7 @@ class TDTestCase: ...@@ -21,7 +21,7 @@ class TDTestCase:
self.ctbNum = 50 self.ctbNum = 50
self.rowsPerTbl = 1000 self.rowsPerTbl = 1000
self.autoCtbPrefix = 'aCtb' self.autoCtbPrefix = 'aCtb'
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False) tdSql.init(conn.cursor(), False)
...@@ -51,7 +51,7 @@ class TDTestCase: ...@@ -51,7 +51,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb") tdLog.info("create stb")
...@@ -66,8 +66,8 @@ class TDTestCase: ...@@ -66,8 +66,8 @@ class TDTestCase:
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix, tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix,
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk") # tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName'])) # tdSql.query("flush database %s"%(paraDict['dbName']))
return return
...@@ -96,7 +96,7 @@ class TDTestCase: ...@@ -96,7 +96,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
# update to half tables # update to half tables
paraDict['ctbNum'] = int(self.ctbNum/2) paraDict['ctbNum'] = int(self.ctbNum/2)
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
...@@ -105,15 +105,15 @@ class TDTestCase: ...@@ -105,15 +105,15 @@ class TDTestCase:
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1' topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString) sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 0 consumerId = 0
...@@ -121,7 +121,7 @@ class TDTestCase: ...@@ -121,7 +121,7 @@ class TDTestCase:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2))
elif self.snapshot == 1: elif self.snapshot == 1:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2))
topicList = topicFromStb1 topicList = topicFromStb1
ifcheckdata = 1 ifcheckdata = 1
ifManualCommit = 1 ifManualCommit = 1
...@@ -143,18 +143,18 @@ class TDTestCase: ...@@ -143,18 +143,18 @@ class TDTestCase:
tdSql.query(queryString) tdSql.query(queryString)
totalRowsInserted = tdSql.getRows() totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted)) tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted))
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self): def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ") tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt', paraDict = {'dbName': 'dbt',
'dropFlag': 1, 'dropFlag': 1,
'event': '', 'event': '',
...@@ -174,15 +174,15 @@ class TDTestCase: ...@@ -174,15 +174,15 @@ class TDTestCase:
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 0} 'snapshot': 0}
paraDict['snapshot'] = self.snapshot paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("restart taosd to ensure that the data falls into the disk") tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName'])) tdSql.query("flush database %s"%(paraDict['dbName']))
# update to half tables # update to half tables
paraDict['ctbNum'] = int(self.ctbNum/2) paraDict['ctbNum'] = int(self.ctbNum/2)
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
...@@ -190,23 +190,23 @@ class TDTestCase: ...@@ -190,23 +190,23 @@ class TDTestCase:
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix, tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix,
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="aCtby", tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="aCtby",
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1' topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString) sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum # paraDict['ctbNum'] = self.ctbNum
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
...@@ -215,7 +215,7 @@ class TDTestCase: ...@@ -215,7 +215,7 @@ class TDTestCase:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2 + 1/2*1/2)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2 + 1/2*1/2))
elif self.snapshot == 1: elif self.snapshot == 1:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2))
topicList = topicFromStb1 topicList = topicFromStb1
ifcheckdata = 1 ifcheckdata = 1
ifManualCommit = 1 ifManualCommit = 1
...@@ -227,7 +227,7 @@ class TDTestCase: ...@@ -227,7 +227,7 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("start to check consume result") tdLog.info("start to check consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
...@@ -237,13 +237,13 @@ class TDTestCase: ...@@ -237,13 +237,13 @@ class TDTestCase:
tdSql.query(queryString) tdSql.query(queryString)
totalRowsInserted = tdSql.getRows() totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
...@@ -256,14 +256,14 @@ class TDTestCase: ...@@ -256,14 +256,14 @@ class TDTestCase:
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.tmqCase1() self.tmqCase1()
self.tmqCase2() self.tmqCase2()
# self.prepareTestEnv() # self.prepareTestEnv()
# tdLog.printNoPrefix("====================================================================") # tdLog.printNoPrefix("====================================================================")
# tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") # tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
# self.snapshot = 1 # self.snapshot = 1
# self.tmqCase1() # self.tmqCase1()
# self.tmqCase2() # self.tmqCase2()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -21,7 +21,7 @@ class TDTestCase: ...@@ -21,7 +21,7 @@ class TDTestCase:
self.ctbNum = 50 self.ctbNum = 50
self.rowsPerTbl = 1000 self.rowsPerTbl = 1000
self.autoCtbPrefix = 'aCtb' self.autoCtbPrefix = 'aCtb'
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False) tdSql.init(conn.cursor(), False)
...@@ -51,7 +51,7 @@ class TDTestCase: ...@@ -51,7 +51,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb") tdLog.info("create stb")
...@@ -66,8 +66,8 @@ class TDTestCase: ...@@ -66,8 +66,8 @@ class TDTestCase:
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix, tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix,
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk") # tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName'])) # tdSql.query("flush database %s"%(paraDict['dbName']))
return return
...@@ -96,7 +96,7 @@ class TDTestCase: ...@@ -96,7 +96,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
# update to half tables # update to half tables
paraDict['ctbNum'] = int(self.ctbNum/2) paraDict['ctbNum'] = int(self.ctbNum/2)
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
...@@ -105,15 +105,15 @@ class TDTestCase: ...@@ -105,15 +105,15 @@ class TDTestCase:
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1' topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString) sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 0 consumerId = 0
...@@ -121,7 +121,7 @@ class TDTestCase: ...@@ -121,7 +121,7 @@ class TDTestCase:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2))
elif self.snapshot == 1: elif self.snapshot == 1:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2))
topicList = topicFromStb1 topicList = topicFromStb1
ifcheckdata = 1 ifcheckdata = 1
ifManualCommit = 1 ifManualCommit = 1
...@@ -143,18 +143,18 @@ class TDTestCase: ...@@ -143,18 +143,18 @@ class TDTestCase:
tdSql.query(queryString) tdSql.query(queryString)
totalRowsInserted = tdSql.getRows() totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted)) tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted))
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self): def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ") tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt', paraDict = {'dbName': 'dbt',
'dropFlag': 1, 'dropFlag': 1,
'event': '', 'event': '',
...@@ -174,15 +174,15 @@ class TDTestCase: ...@@ -174,15 +174,15 @@ class TDTestCase:
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 0} 'snapshot': 0}
paraDict['snapshot'] = self.snapshot paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("restart taosd to ensure that the data falls into the disk") tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName'])) tdSql.query("flush database %s"%(paraDict['dbName']))
# update to half tables # update to half tables
paraDict['ctbNum'] = int(self.ctbNum/2) paraDict['ctbNum'] = int(self.ctbNum/2)
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
...@@ -190,23 +190,23 @@ class TDTestCase: ...@@ -190,23 +190,23 @@ class TDTestCase:
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix, tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix,
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="aCtby", tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="aCtby",
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1' topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString) sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum # paraDict['ctbNum'] = self.ctbNum
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
...@@ -215,7 +215,7 @@ class TDTestCase: ...@@ -215,7 +215,7 @@ class TDTestCase:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2 + 1/2*1/2)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2 + 1/2*1/2))
elif self.snapshot == 1: elif self.snapshot == 1:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2))
topicList = topicFromStb1 topicList = topicFromStb1
ifcheckdata = 1 ifcheckdata = 1
ifManualCommit = 1 ifManualCommit = 1
...@@ -227,7 +227,7 @@ class TDTestCase: ...@@ -227,7 +227,7 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("start to check consume result") tdLog.info("start to check consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
...@@ -237,13 +237,13 @@ class TDTestCase: ...@@ -237,13 +237,13 @@ class TDTestCase:
tdSql.query(queryString) tdSql.query(queryString)
totalRowsInserted = tdSql.getRows() totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
...@@ -256,14 +256,14 @@ class TDTestCase: ...@@ -256,14 +256,14 @@ class TDTestCase:
# tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") # tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
# self.tmqCase1() # self.tmqCase1()
# self.tmqCase2() # self.tmqCase2()
self.prepareTestEnv() self.prepareTestEnv()
tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1 self.snapshot = 1
self.tmqCase1() self.tmqCase1()
self.tmqCase2() self.tmqCase2()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -21,7 +21,7 @@ class TDTestCase: ...@@ -21,7 +21,7 @@ class TDTestCase:
self.ctbNum = 50 self.ctbNum = 50
self.rowsPerTbl = 1000 self.rowsPerTbl = 1000
self.autoCtbPrefix = 'aCtb' self.autoCtbPrefix = 'aCtb'
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False) tdSql.init(conn.cursor(), False)
...@@ -51,7 +51,7 @@ class TDTestCase: ...@@ -51,7 +51,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb") tdLog.info("create stb")
...@@ -66,8 +66,8 @@ class TDTestCase: ...@@ -66,8 +66,8 @@ class TDTestCase:
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix, tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix,
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk") # tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName'])) # tdSql.query("flush database %s"%(paraDict['dbName']))
return return
...@@ -96,7 +96,7 @@ class TDTestCase: ...@@ -96,7 +96,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
# update to half tables # update to half tables
paraDict['ctbNum'] = int(self.ctbNum/2) paraDict['ctbNum'] = int(self.ctbNum/2)
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
...@@ -105,15 +105,15 @@ class TDTestCase: ...@@ -105,15 +105,15 @@ class TDTestCase:
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1' topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString) sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 0 consumerId = 0
...@@ -121,7 +121,7 @@ class TDTestCase: ...@@ -121,7 +121,7 @@ class TDTestCase:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2))
elif self.snapshot == 1: elif self.snapshot == 1:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2))
topicList = topicFromStb1 topicList = topicFromStb1
ifcheckdata = 1 ifcheckdata = 1
ifManualCommit = 1 ifManualCommit = 1
...@@ -143,18 +143,18 @@ class TDTestCase: ...@@ -143,18 +143,18 @@ class TDTestCase:
tdSql.query(queryString) tdSql.query(queryString)
totalRowsInserted = tdSql.getRows() totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted)) tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted))
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self): def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ") tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt', paraDict = {'dbName': 'dbt',
'dropFlag': 1, 'dropFlag': 1,
'event': '', 'event': '',
...@@ -174,15 +174,15 @@ class TDTestCase: ...@@ -174,15 +174,15 @@ class TDTestCase:
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 0} 'snapshot': 0}
paraDict['snapshot'] = self.snapshot paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("restart taosd to ensure that the data falls into the disk") tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName'])) tdSql.query("flush database %s"%(paraDict['dbName']))
# update to half tables # update to half tables
paraDict['ctbNum'] = int(self.ctbNum/2) paraDict['ctbNum'] = int(self.ctbNum/2)
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
...@@ -190,23 +190,23 @@ class TDTestCase: ...@@ -190,23 +190,23 @@ class TDTestCase:
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix, tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix,
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="aCtby", tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="aCtby",
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1' topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString) sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum # paraDict['ctbNum'] = self.ctbNum
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
...@@ -215,7 +215,7 @@ class TDTestCase: ...@@ -215,7 +215,7 @@ class TDTestCase:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2 + 1/2*1/2)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2 + 1/2*1/2))
elif self.snapshot == 1: elif self.snapshot == 1:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2))
topicList = topicFromStb1 topicList = topicFromStb1
ifcheckdata = 1 ifcheckdata = 1
ifManualCommit = 1 ifManualCommit = 1
...@@ -227,7 +227,7 @@ class TDTestCase: ...@@ -227,7 +227,7 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("start to check consume result") tdLog.info("start to check consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
...@@ -237,13 +237,13 @@ class TDTestCase: ...@@ -237,13 +237,13 @@ class TDTestCase:
tdSql.query(queryString) tdSql.query(queryString)
totalRowsInserted = tdSql.getRows() totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
...@@ -256,14 +256,14 @@ class TDTestCase: ...@@ -256,14 +256,14 @@ class TDTestCase:
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.tmqCase1() self.tmqCase1()
self.tmqCase2() self.tmqCase2()
self.prepareTestEnv() self.prepareTestEnv()
tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1 self.snapshot = 1
self.tmqCase1() self.tmqCase1()
self.tmqCase2() self.tmqCase2()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -20,7 +20,7 @@ class TDTestCase: ...@@ -20,7 +20,7 @@ class TDTestCase:
self.vgroups = 4 self.vgroups = 4
self.ctbNum = 100 self.ctbNum = 100
self.rowsPerTbl = 1000 self.rowsPerTbl = 1000
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False) tdSql.init(conn.cursor(), False)
...@@ -50,7 +50,7 @@ class TDTestCase: ...@@ -50,7 +50,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1, wal_retention_size=-1, wal_retention_period=-1) tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1, wal_retention_size=-1, wal_retention_period=-1)
tdLog.info("create stb") tdLog.info("create stb")
...@@ -65,8 +65,8 @@ class TDTestCase: ...@@ -65,8 +65,8 @@ class TDTestCase:
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk") # tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName'])) tdSql.query("flush database %s"%(paraDict['dbName']))
return return
...@@ -94,30 +94,30 @@ class TDTestCase: ...@@ -94,30 +94,30 @@ class TDTestCase:
paraDict['snapshot'] = self.snapshot paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
# update to half tables # update to half tables
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1' topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString) sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum # paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 0 consumerId = 0
if self.snapshot == 0: if self.snapshot == 0:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/2 + 1)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/2 + 1))
elif self.snapshot == 1: elif self.snapshot == 1:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2))
topicList = topicFromStb1 topicList = topicFromStb1
ifcheckdata = 1 ifcheckdata = 1
ifManualCommit = 1 ifManualCommit = 1
...@@ -143,16 +143,16 @@ class TDTestCase: ...@@ -143,16 +143,16 @@ class TDTestCase:
tdSql.query(queryString) tdSql.query(queryString)
totalRowsInserted = tdSql.getRows() totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted)) tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted))
if self.snapshot == 0: if self.snapshot == 0:
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
elif self.snapshot == 1: elif self.snapshot == 1:
if not (totalConsumeRows < expectrowcnt and totalConsumeRows >= totalRowsInserted): if not (totalConsumeRows < expectrowcnt and totalConsumeRows >= totalRowsInserted):
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
...@@ -162,7 +162,7 @@ class TDTestCase: ...@@ -162,7 +162,7 @@ class TDTestCase:
self.ctbNum = 1 self.ctbNum = 1
self.snapshot = 0 self.snapshot = 0
self.prepareTestEnv() self.prepareTestEnv()
self.tmqCase1() self.tmqCase1()
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.prepareTestEnv() self.prepareTestEnv()
...@@ -178,7 +178,7 @@ class TDTestCase: ...@@ -178,7 +178,7 @@ class TDTestCase:
self.prepareTestEnv() self.prepareTestEnv()
self.snapshot = 1 self.snapshot = 1
self.tmqCase1() self.tmqCase1()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册