提交 d00dd167 编写于 作者: G Ganlin Zhao

fix test cases

上级 7b765cc5
......@@ -20,7 +20,7 @@ class TDTestCase:
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
......@@ -50,7 +50,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
......@@ -62,7 +62,7 @@ class TDTestCase:
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdDnodes.stop(1)
# tdDnodes.start(1)
......@@ -94,18 +94,18 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
......@@ -120,18 +120,18 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)
tmqCom.checkFileContent(consumerId, queryString)
tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
......@@ -162,18 +162,18 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
......@@ -189,15 +189,15 @@ class TDTestCase:
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]):
tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
firstConsumeRows = resultList[0]
# reinit consume info, and start tmq_sim, then check consume result
......@@ -205,22 +205,22 @@ class TDTestCase:
consumerId = 2
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = firstConsumeRows + resultList[0]
if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows):
tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0]))
tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0]))
tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
......
......@@ -27,26 +27,26 @@ class TDTestCase:
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
......@@ -78,7 +78,7 @@ class TDTestCase:
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("insert data")
tmqCom.insert_data_2(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
# queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s" %(paraDict['dbName'], paraDict['stbName'])
......@@ -114,7 +114,7 @@ class TDTestCase:
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:3000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
......@@ -132,7 +132,7 @@ class TDTestCase:
pThread.join()
tdLog.info("wait the consume result")
tdLog.info("wait the consume result")
expectRows = 2
resultList = tmqCom.selectConsumeResult(expectRows)
actTotalRows = 0
......@@ -140,7 +140,7 @@ class TDTestCase:
actTotalRows += resultList[i]
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
expectRowsList.append(tdSql.getRows())
expectTotalRows = 0
for i in range(len(expectRowsList)):
expectTotalRows += expectRowsList[i]
......@@ -150,7 +150,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d should >= expect consume rows: %d"%(actTotalRows, expectTotalRows))
tdLog.exit("0 tmq consume rows error!")
# time.sleep(10)
# time.sleep(10)
# for i in range(len(topicNameList)):
# tdSql.query("drop topic %s"%topicNameList[i])
......
......@@ -20,7 +20,7 @@ class TDTestCase:
self.vgroups = 4
self.ctbNum = 1
self.rowsPerTbl = 10000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
......@@ -50,7 +50,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_size=-1, wal_retention_period=-1)
tdLog.info("create stb")
......@@ -65,11 +65,11 @@ class TDTestCase:
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
def delData(self,tsql,dbName,ctbPrefix,ctbNum,startTs=0,endTs=0,ctbStartIdx=0):
tdLog.debug("start to del data ............")
for i in range(ctbNum):
......@@ -84,12 +84,12 @@ class TDTestCase:
newTdSql = tdCom.newTdSql()
self.delData(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["startTs"],paraDict["endTs"],paraDict["ctbStartIdx"])
return
def asyncDeleteData(self, paraDict):
pThread = threading.Thread(target=self.threadFunctionForDeletaData, kwargs=paraDict)
pThread.start()
return pThread
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
......@@ -116,29 +116,29 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
# del some data
# del some data
rowsOfDelete = int(paraDict["rowsPerTbl"] / 4)
paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1
self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],
self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],
startTs=paraDict["startTs"], endTs=paraDict["endTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.execute(sqlString)
if self.snapshot == 0:
consumerId = 0
elif self.snapshot == 1:
consumerId = 1
rowsOfDelete = 0
# paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
topicList = topicFromStb1
ifcheckdata = 1
......@@ -161,23 +161,23 @@ class TDTestCase:
tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d, act query rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsFromQuery))
if self.snapshot == 0:
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error with snapshot = 0!")
elif self.snapshot == 1:
if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error with snapshot = 1!")
tmqCom.checkFileContent(consumerId=consumerId, queryString=queryString, skipRowsOfCons=rowsOfDelete)
tmqCom.checkFileContent(consumerId=consumerId, queryString=queryString, skipRowsOfCons=rowsOfDelete)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
......@@ -197,50 +197,50 @@ class TDTestCase:
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName']))
# update to 1/4 rows and insert 3/4 new rows
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl * 3 / 4)
# paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# del some data
rowsOfDelete = int(self.rowsPerTbl / 4 )
paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1
self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],
self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],
startTs=paraDict["startTs"], endTs=paraDict["endTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.initConsumerTable()
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 1
consumerId = 1
if self.snapshot == 0:
consumerId = 2
consumerId = 2
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4 + 3/4))
elif self.snapshot == 1:
consumerId = 3
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4))
topicList = topicFromStb1
ifcheckdata = 1
ifManualCommit = 1
......@@ -252,7 +252,7 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
......@@ -262,24 +262,24 @@ class TDTestCase:
tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt))
if self.snapshot == 0:
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error with snapshot = 0!")
elif self.snapshot == 1:
if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error with snapshot = 1!")
# tmqCom.checkFileContent(consumerId, queryString)
# tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def tmqCase3(self):
tdLog.printNoPrefix("======== test case 3: ")
tdLog.printNoPrefix("======== test case 3: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
......@@ -299,34 +299,34 @@ class TDTestCase:
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName']))
tmqCom.initConsumerTable()
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 1
consumerId = 1
if self.snapshot == 0:
consumerId = 4
consumerId = 4
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4 + 3/4))
elif self.snapshot == 1:
consumerId = 5
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4))
topicList = topicFromStb1
ifcheckdata = 1
ifManualCommit = 1
......@@ -340,20 +340,20 @@ class TDTestCase:
rowsOfDelete = int(self.rowsPerTbl / 4 )
paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1
pDeleteThread = self.asyncDeleteData(paraDict)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# update to 1/4 rows and insert 3/4 new rows
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl * 3 / 4)
# paraDict['rowsPerTbl'] = self.rowsPerTbl
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
pInsertThread.join()
tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
......@@ -363,17 +363,17 @@ class TDTestCase:
tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt))
if self.snapshot == 0:
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error with snapshot = 0!")
elif self.snapshot == 1:
if not ((totalConsumeRows >= totalRowsFromQuery) and (totalConsumeRows <= expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 1!")
# tmqCom.checkFileContent(consumerId, queryString)
# tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1)
......@@ -385,17 +385,17 @@ class TDTestCase:
tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.snapshot = 0
self.prepareTestEnv()
self.prepareTestEnv()
self.tmqCase1()
self.tmqCase2()
self.tmqCase2()
tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1
self.prepareTestEnv()
self.tmqCase1()
self.tmqCase2()
self.tmqCase2()
tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.snapshot = 0
......@@ -404,7 +404,7 @@ class TDTestCase:
tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1
self.prepareTestEnv()
self.prepareTestEnv()
self.tmqCase3()
def stop(self):
......
......@@ -20,7 +20,7 @@ class TDTestCase:
self.vgroups = 4
self.ctbNum = 100
self.rowsPerTbl = 1000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
......@@ -50,7 +50,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_size=-1, wal_retention_period=-1)
tdLog.info("create stb")
......@@ -65,11 +65,11 @@ class TDTestCase:
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
def delData(self,tsql,dbName,ctbPrefix,ctbNum,startTs=0,endTs=0,ctbStartIdx=0):
tdLog.debug("start to del data ............")
for i in range(ctbNum):
......@@ -84,12 +84,12 @@ class TDTestCase:
newTdSql = tdCom.newTdSql()
self.delData(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["startTs"],paraDict["endTs"],paraDict["ctbStartIdx"])
return
def asyncDeleteData(self, paraDict):
pThread = threading.Thread(target=self.threadFunctionForDeletaData, kwargs=paraDict)
pThread.start()
return pThread
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
......@@ -116,29 +116,29 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
# del some data
# del some data
rowsOfDelete = int(paraDict["rowsPerTbl"] / 4)
paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1
self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],
self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],
startTs=paraDict["startTs"], endTs=paraDict["endTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.execute(sqlString)
if self.snapshot == 0:
consumerId = 0
elif self.snapshot == 1:
consumerId = 1
rowsOfDelete = 0
# paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
topicList = topicFromStb1
ifcheckdata = 1
......@@ -161,23 +161,23 @@ class TDTestCase:
tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d, act query rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsFromQuery))
if self.snapshot == 0:
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error with snapshot = 0!")
elif self.snapshot == 1:
if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error with snapshot = 1!")
# tmqCom.checkFileContent(consumerId=consumerId, queryString=queryString, skipRowsOfCons=rowsOfDelete)
# tmqCom.checkFileContent(consumerId=consumerId, queryString=queryString, skipRowsOfCons=rowsOfDelete)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
......@@ -197,50 +197,50 @@ class TDTestCase:
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName']))
# update to 1/4 rows and insert 3/4 new rows
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl * 3 / 4)
# paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# del some data
rowsOfDelete = int(self.rowsPerTbl / 4 )
paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1
self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],
self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],
startTs=paraDict["startTs"], endTs=paraDict["endTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.initConsumerTable()
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 1
consumerId = 1
if self.snapshot == 0:
consumerId = 2
consumerId = 2
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4 + 3/4))
elif self.snapshot == 1:
consumerId = 3
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4))
topicList = topicFromStb1
ifcheckdata = 1
ifManualCommit = 1
......@@ -252,7 +252,7 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
......@@ -262,24 +262,24 @@ class TDTestCase:
tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt))
if self.snapshot == 0:
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error with snapshot = 0!")
elif self.snapshot == 1:
if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error with snapshot = 1!")
# tmqCom.checkFileContent(consumerId, queryString)
# tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def tmqCase3(self):
tdLog.printNoPrefix("======== test case 3: ")
tdLog.printNoPrefix("======== test case 3: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
......@@ -299,34 +299,34 @@ class TDTestCase:
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName']))
tmqCom.initConsumerTable()
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 1
consumerId = 1
if self.snapshot == 0:
consumerId = 4
consumerId = 4
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4 + 3/4))
elif self.snapshot == 1:
consumerId = 5
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4))
topicList = topicFromStb1
ifcheckdata = 1
ifManualCommit = 1
......@@ -340,20 +340,20 @@ class TDTestCase:
rowsOfDelete = int(self.rowsPerTbl / 4 )
paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1
pDeleteThread = self.asyncDeleteData(paraDict)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# update to 1/4 rows and insert 3/4 new rows
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl * 3 / 4)
# paraDict['rowsPerTbl'] = self.rowsPerTbl
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
pInsertThread.join()
tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
......@@ -363,47 +363,47 @@ class TDTestCase:
tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt))
if self.snapshot == 0:
if totalConsumeRows < expectrowcnt:
tdLog.exit("tmq consume rows error with snapshot = 0!")
elif self.snapshot == 1:
if not ((totalConsumeRows >= totalRowsFromQuery) and (totalConsumeRows <= expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 1!")
# tmqCom.checkFileContent(consumerId, queryString)
# tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 3 end ...... ")
def run(self):
def run(self):
tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.snapshot = 0
self.prepareTestEnv()
self.tmqCase1()
self.tmqCase2()
self.tmqCase2()
tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1
self.prepareTestEnv()
self.tmqCase1()
self.tmqCase2()
tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.snapshot = 0
self.prepareTestEnv()
self.tmqCase3()
self.prepareTestEnv()
self.tmqCase3()
tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1
self.prepareTestEnv()
self.prepareTestEnv()
self.tmqCase3()
def stop(self):
......
......@@ -56,7 +56,7 @@ class TDTestCase:
print(cur)
return cur
def initConsumerTable(self,cdbName='cdb'):
def initConsumerTable(self,cdbName='cdb'):
tdLog.info("create consume database, and consume info table, and consume result table")
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
......@@ -65,12 +65,12 @@ class TDTestCase:
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
def initConsumerInfoTable(self,cdbName='cdb'):
def initConsumerInfoTable(self,cdbName='cdb'):
tdLog.info("drop consumeinfo table")
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
tdLog.info("consume info sql: %s"%sql)
......@@ -85,11 +85,11 @@ class TDTestCase:
break
else:
time.sleep(5)
for i in range(expectRows):
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
resultList.append(tdSql.getData(i , 3))
return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
......@@ -98,9 +98,9 @@ class TDTestCase:
logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
......@@ -130,7 +130,7 @@ class TDTestCase:
sql = pre_create
if sql != pre_create:
tsql.execute(sql)
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
return
......@@ -149,7 +149,7 @@ class TDTestCase:
ctbDict[i] = 0
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfCtb = 0
rowsOfCtb = 0
while rowsOfCtb < rowsPerTbl:
for i in range(ctbNum):
sql += " %s.%s_%d values "%(dbName,ctbPrefix,i)
......@@ -176,7 +176,7 @@ class TDTestCase:
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfSql = 0
rowsOfSql = 0
for i in range(ctbNum):
sql += " %s_%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl):
......@@ -207,7 +207,7 @@ class TDTestCase:
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfSql = 0
rowsOfSql = 0
for i in range(ctbNum):
sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i,dbName,stbName,i)
for j in range(rowsPerTbl):
......@@ -226,8 +226,8 @@ class TDTestCase:
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def prepareEnv(self, **parameterDict):
def prepareEnv(self, **parameterDict):
# create new connector for my thread
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
......@@ -246,8 +246,8 @@ class TDTestCase:
return
def tmqCase1(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 1: ")
tdLog.printNoPrefix("======== test case 1: ")
self.initConsumerTable()
# create and start thread
......@@ -264,7 +264,7 @@ class TDTestCase:
'batchNum': 23, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbPrefix"], parameterDict["ctbNum"])
......@@ -272,7 +272,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
......@@ -303,7 +303,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
......@@ -313,8 +313,8 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 2: ")
tdLog.printNoPrefix("======== test case 2: ")
self.initConsumerTable()
# create and start thread
......@@ -339,7 +339,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] * 2
......@@ -373,7 +373,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
......@@ -387,8 +387,8 @@ class TDTestCase:
# 自动建表完成数据插入,启动消费
def tmqCase3(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 3: ")
tdLog.printNoPrefix("======== test case 3: ")
self.initConsumerTable()
# create and start thread
......@@ -414,7 +414,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
......@@ -444,7 +444,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
......@@ -466,7 +466,7 @@ class TDTestCase:
tdLog.info("cfgPath: %s" % cfgPath)
# self.tmqCase1(cfgPath, buildPath)
# self.tmqCase2(cfgPath, buildPath)
# self.tmqCase2(cfgPath, buildPath)
self.tmqCase3(cfgPath, buildPath)
# self.tmqCase4(cfgPath, buildPath)
# self.tmqCase5(cfgPath, buildPath)
......
......@@ -20,7 +20,7 @@ class TDTestCase:
self.vgroups = 2
self.ctbNum = 100
self.rowsPerTbl = 1000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
......@@ -51,7 +51,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_size=-1, wal_retention_period=-1)
tdLog.info("create stb")
......@@ -63,7 +63,7 @@ class TDTestCase:
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdDnodes.stop(1)
# tdDnodes.start(1)
......@@ -96,7 +96,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
# tmqCom.initConsumerTable()
# tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
# tdLog.info("create stb")
......@@ -110,12 +110,12 @@ class TDTestCase:
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicFromStb1
......@@ -129,7 +129,7 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# time.sleep(3)
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("================= restart dnode ===========================")
......@@ -146,7 +146,7 @@ class TDTestCase:
tdSql.query(queryString)
totalRowsFromQury = tdSql.getRows()
tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury))
if not (totalConsumeRows == totalRowsFromQury):
tdLog.exit("tmq consume rows error!")
......@@ -157,7 +157,7 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
......@@ -182,7 +182,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
# tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
# tdLog.info("create stb")
......@@ -195,12 +195,12 @@ class TDTestCase:
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
consumerId = 1
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicFromStb1
......@@ -225,7 +225,7 @@ class TDTestCase:
paraDict["batchNum"] = 100
paraDict["ctbPrefix"] = 'newCtb'
tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
......@@ -235,7 +235,7 @@ class TDTestCase:
tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQuery))
if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error!")
......@@ -249,7 +249,7 @@ class TDTestCase:
# tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
self.tmqCase2()
self.tmqCase2()
def stop(self):
tdSql.close()
......
......@@ -20,11 +20,11 @@ class TDTestCase:
self.vgroups = 4
self.ctbNum = 1000
self.rowsPerTbl = 10
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
# drop some ntbs
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
......@@ -51,8 +51,8 @@ class TDTestCase:
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdLog.info("start create database....")
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
......@@ -60,11 +60,11 @@ class TDTestCase:
tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
tdLog.info("start insert data into normal tables....")
tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
tdLog.info("create topics from database")
topicFromDb = 'topic_dbt'
topicFromDb = 'topic_dbt'
tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
if self.snapshot == 0:
consumerId = 0
elif self.snapshot == 1:
......@@ -83,13 +83,13 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tmqCom.getStartConsumeNotifyFromTmqsim()
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("drop some ntables")
# drop 1/4 ctbls from half offset
paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2)
paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])
tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
......@@ -98,19 +98,19 @@ class TDTestCase:
totalConsumeRows += resultList[i]
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
tdLog.info("wait subscriptions exit ....")
tmqCom.waitSubscriptionExit(tdSql, topicFromDb)
tdSql.query("drop topic %s"%topicFromDb)
tdLog.info("success dorp topic: %s"%topicFromDb)
tdLog.printNoPrefix("======== test case 1 end ...... ")
# drop some ntbs and create some new ntbs
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
......@@ -137,8 +137,8 @@ class TDTestCase:
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdLog.info("start create database....")
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
......@@ -146,11 +146,11 @@ class TDTestCase:
tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
tdLog.info("start insert data into normal tables....")
tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
tdLog.info("create topics from database")
topicFromDb = 'topic_dbt'
topicFromDb = 'topic_dbt'
tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
if self.snapshot == 0:
consumerId = 2
elif self.snapshot == 1:
......@@ -169,20 +169,20 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tmqCom.getStartConsumeNotifyFromTmqsim()
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("drop some ntables")
# drop 1/4 ctbls from half offset
paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2)
paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])
tdLog.info("start create some new normal tables....")
paraDict["ctbPrefix"] = 'newCtb'
paraDict["ctbNum"] = self.ctbNum
tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
tdLog.info("start insert data into these new normal tables....")
tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
......@@ -191,24 +191,24 @@ class TDTestCase:
totalConsumeRows += resultList[i]
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
tdLog.info("wait subscriptions exit ....")
tmqCom.waitSubscriptionExit(tdSql, topicFromDb)
tdSql.query("drop topic %s"%topicFromDb)
tdLog.info("success dorp topic: %s"%topicFromDb)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
def run(self):
tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.snapshot = 0
self.tmqCase1()
self.tmqCase2()
self.tmqCase2()
# tdLog.printNoPrefix("====================================================================")
# tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
# self.snapshot = 1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册