提交 2910e7fb 编写于 作者: G Ganlin Zhao

fix test cases

上级 589c6d5f
......@@ -56,7 +56,7 @@ class TDTestCase:
print(cur)
return cur
def initConsumerTable(self,cdbName='cdb'):
def initConsumerTable(self,cdbName='cdb'):
tdLog.info("create consume database, and consume info table, and consume result table")
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
......@@ -65,12 +65,12 @@ class TDTestCase:
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
def initConsumerInfoTable(self,cdbName='cdb'):
def initConsumerInfoTable(self,cdbName='cdb'):
tdLog.info("drop consumeinfo table")
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
tdLog.info("consume info sql: %s"%sql)
......@@ -85,11 +85,11 @@ class TDTestCase:
break
else:
time.sleep(5)
for i in range(expectRows):
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
resultList.append(tdSql.getData(i , 3))
return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
......@@ -97,14 +97,14 @@ class TDTestCase:
logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
if (platform.system().lower() == 'windows'):
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
......@@ -134,7 +134,7 @@ class TDTestCase:
sql = pre_create
if sql != pre_create:
tsql.execute(sql)
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
return
......@@ -149,7 +149,7 @@ class TDTestCase:
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfSql = 0
rowsOfSql = 0
for i in range(ctbNum):
sql += " %s_%d values "%(stbName,i)
for j in range(rowsPerTbl):
......@@ -168,8 +168,8 @@ class TDTestCase:
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def prepareEnv(self, **parameterDict):
def prepareEnv(self, **parameterDict):
# create new connector for my thread
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
......@@ -188,8 +188,8 @@ class TDTestCase:
return
def tmqCase1(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 1: ")
tdLog.printNoPrefix("======== test case 1: ")
self.initConsumerTable()
# create and start thread
......@@ -205,13 +205,13 @@ class TDTestCase:
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
......@@ -245,7 +245,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
......@@ -255,8 +255,8 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 2: ")
tdLog.printNoPrefix("======== test case 2: ")
self.initConsumerTable()
# create and start thread
......@@ -292,7 +292,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
......@@ -341,7 +341,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
......@@ -362,7 +362,7 @@ class TDTestCase:
tdLog.info("cfgPath: %s" % cfgPath)
self.tmqCase1(cfgPath, buildPath)
self.tmqCase2(cfgPath, buildPath)
self.tmqCase2(cfgPath, buildPath)
def stop(self):
tdSql.close()
......
......@@ -36,7 +36,7 @@ class TDTestCase:
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkDnodesStatusAndCreateMnode(self,dnodeNumbers):
count=0
while count < dnodeNumbers:
......@@ -44,7 +44,7 @@ class TDTestCase:
# tdLog.debug(tdSql.queryResult)
dCnt = 0
for i in range(dnodeNumbers):
if tdSql.queryResult[i][self.dnodeStatusIndex] != "ready":
if tdSql.queryResult[i][self.dnodeStatusIndex] != "ready":
break
else:
dCnt += 1
......@@ -64,7 +64,7 @@ class TDTestCase:
while count < self.mnodeCheckCnt:
time.sleep(1)
tdSql.query("show mnodes;")
if tdSql.checkRows(self.mnodes) :
if tdSql.checkRows(self.mnodes) :
tdLog.debug("mnode is three nodes")
else:
tdLog.exit("mnode number is correct")
......@@ -78,17 +78,17 @@ class TDTestCase:
break
elif roleOfMnode0=='follower' and roleOfMnode1=='leader' and roleOfMnode2 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[1][self.idIndex]
break
break
elif roleOfMnode0=='follower' and roleOfMnode1=='follower' and roleOfMnode2 == 'leader' :
self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex]
break
else:
break
else:
count+=1
else:
tdLog.exit("three mnodes is not ready in 10s ")
tdSql.query("show mnodes;")
tdSql.checkRows(self.mnodes)
tdSql.query("show mnodes;")
tdSql.checkRows(self.mnodes)
tdSql.checkData(0,self.mnodeEpIndex,'%s:%d'%(self.host,self.startPort))
tdSql.checkData(0,self.mnodeStatusIndex,'ready')
tdSql.checkData(1,self.mnodeEpIndex,'%s:%d'%(self.host,self.startPort+self.portStep))
......@@ -101,8 +101,8 @@ class TDTestCase:
while count < self.mnodeCheckCnt:
time.sleep(1)
tdSql.query("show mnodes")
tdLog.debug(tdSql.queryResult)
# if tdSql.checkRows(self.mnodes) :
tdLog.debug(tdSql.queryResult)
# if tdSql.checkRows(self.mnodes) :
# tdLog.debug("mnode is three nodes")
# else:
# tdLog.exit("mnode number is correct")
......@@ -117,21 +117,21 @@ class TDTestCase:
break
elif roleOfMnode1=='follower' and roleOfMnode2 == 'leader' :
self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex]
break
break
elif roleOfMnode1=='offline' :
if roleOfMnode0=='leader' and roleOfMnode2 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[0][self.idIndex]
break
elif roleOfMnode0=='follower' and roleOfMnode2 == 'leader' :
self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex]
break
break
elif roleOfMnode2=='offline' :
if roleOfMnode0=='leader' and roleOfMnode1 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[0][self.idIndex]
break
elif roleOfMnode0=='follower' and roleOfMnode1 == 'leader' :
self.dnodeOfLeader = tdSql.queryResult[1][self.idIndex]
break
break
count+=1
else:
......@@ -144,27 +144,27 @@ class TDTestCase:
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
......@@ -195,7 +195,7 @@ class TDTestCase:
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("async insert data")
pThread = tmqCom.asyncInsertData(paraDict)
tdLog.info("create topics from stb with filter")
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
......@@ -234,22 +234,22 @@ class TDTestCase:
tdDnodes[1].stoptaosd()
time.sleep(10)
self.check3mnode1off()
tdLog.info("switch end and wait insert data end ................")
pThread.join()
tdLog.info("check the consume result")
tdSql.query(queryString)
tdLog.info("switch end and wait insert data end ................")
pThread.join()
tdLog.info("check the consume result")
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
self.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
......
......@@ -36,7 +36,7 @@ class TDTestCase:
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: topic: select * from stb, while consume, add column int-A/bianry-B/float-C, and then modify B, drop C")
tdLog.printNoPrefix("add tag int-A/bianry-B/float-C, and then rename A, modify B, drop C, set t2")
......@@ -61,7 +61,7 @@ class TDTestCase:
topicNameList = ['topic1']
expectRowsList = []
queryStringList = []
queryStringList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdLog.info("create stb")
......@@ -71,15 +71,15 @@ class TDTestCase:
# tdLog.info("async insert data")
# pThread = tmqCom.asyncInsertData(paraDict)
tmqCom.insert_data_2(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"],paraDict["ctbStartIdx"])
tdLog.info("create topics from stb with filter")
queryStringList.append("select * from %s.%s" %(paraDict['dbName'], paraDict['stbName']))
sqlString = "create topic %s as %s" %(topicNameList[0], queryStringList[0])
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryStringList[0])
expectRowsList.append(tdSql.getRows())
tdSql.query(queryStringList[0])
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
......@@ -91,14 +91,14 @@ class TDTestCase:
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
dstFile = tmqCom.getResultFileByTaosShell(consumerId, queryStringList[0])
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the notify info of start consume, then alter schema")
tmqCom.getStartConsumeNotifyFromTmqsim()
# add column double-A/bianry-B/double-C, and then modify B, drop C
# add column double-A/bianry-B/double-C, and then modify B, drop C
sqlString = "alter table %s.%s add column newc1 double"%(paraDict["dbName"],paraDict['stbName'])
tdSql.execute(sqlString)
sqlString = "alter table %s.%s add column newc2 binary(16)"%(paraDict["dbName"],paraDict['stbName'])
......@@ -108,8 +108,8 @@ class TDTestCase:
sqlString = "alter table %s.%s modify column newc2 binary(32)"%(paraDict["dbName"],paraDict['stbName'])
tdSql.execute(sqlString)
sqlString = "alter table %s.%s drop column newc3"%(paraDict["dbName"],paraDict['stbName'])
tdSql.execute(sqlString)
# add tag double-A/bianry-B/double-C, and then rename A, modify B, drop C, set t1
tdSql.execute(sqlString)
# add tag double-A/bianry-B/double-C, and then rename A, modify B, drop C, set t1
sqlString = "alter table %s.%s add tag newt1 double"%(paraDict["dbName"],paraDict['stbName'])
tdSql.execute(sqlString)
sqlString = "alter table %s.%s add tag newt2 binary(16)"%(paraDict["dbName"],paraDict['stbName'])
......@@ -125,27 +125,27 @@ class TDTestCase:
sqlString = "alter table %s.%s0 set tag newt2='new tag'"%(paraDict["dbName"],paraDict['ctbPrefix'])
tdSql.execute(sqlString)
tdLog.info("check the consume result")
tdSql.query(queryStringList[0])
tdLog.info("check the consume result")
tdSql.query(queryStringList[0])
expectRowsList.append(tdSql.getRows())
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
tdLog.info("expect consume rows: %d"%(expectRowsList[0]))
tdLog.info("act consume rows: %d"%(resultList[0]))
if expectRowsList[0] != resultList[0]:
tdLog.exit("0 tmq consume rows error!")
tmqCom.checkTmqConsumeFileContent(consumerId, dstFile)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: topic: select * from ntb, while consume, add column int-A/bianry-B/float-C, and then rename A, modify B, drop C")
paraDict = {'dbName': 'db1',
......@@ -166,12 +166,12 @@ class TDTestCase:
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
ntbName = 'ntb'
ntbName = 'ntb'
topicNameList = ['topic1']
expectRowsList = []
queryStringList = []
queryStringList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdLog.info("create stb")
......@@ -182,15 +182,15 @@ class TDTestCase:
# pThread = tmqCom.asyncInsertData(paraDict)
tdCom.insert_rows(tdSql, dbname=paraDict["dbName"], tbname=ntbName, column_ele_list=paraDict['colSchema'], start_ts_value=paraDict["startTs"], count=paraDict["rowsPerTbl"])
tdLog.info("insert data end")
tdLog.info("create topics from ntb with filter")
queryStringList.append("select * from %s.%s" %(paraDict['dbName'], ntbName))
sqlString = "create topic %s as %s" %(topicNameList[0], queryStringList[0])
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryStringList[0])
expectRowsList.append(tdSql.getRows())
tdSql.query(queryStringList[0])
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
......@@ -202,13 +202,13 @@ class TDTestCase:
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
dstFile = tmqCom.getResultFileByTaosShell(consumerId, queryStringList[0])
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the notify info of start consume, then alter schema")
tmqCom.getStartConsumeNotifyFromTmqsim()
# add column double-A/bianry-B/double-C, and then rename A, modify B, drop C
sqlString = "alter table %s.%s add column newc1 double"%(paraDict["dbName"],ntbName)
tdSql.execute(sqlString)
......@@ -223,21 +223,21 @@ class TDTestCase:
sqlString = "alter table %s.%s drop column newc3"%(paraDict["dbName"],ntbName)
tdSql.execute(sqlString)
tdLog.info("check the consume result")
tdSql.query(queryStringList[0])
tdLog.info("check the consume result")
tdSql.query(queryStringList[0])
expectRowsList.append(tdSql.getRows())
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
tdLog.info("expect consume rows: %d"%(expectRowsList[0]))
tdLog.info("act consume rows: %d"%(resultList[0]))
if expectRowsList[0] != resultList[0]:
tdLog.exit("0 tmq consume rows error!")
tmqCom.checkTmqConsumeFileContent(consumerId, dstFile)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
......
......@@ -20,7 +20,7 @@ class TDTestCase:
self.vgroups = 4
self.ctbNum = 500
self.rowsPerTbl = 1000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
......@@ -50,7 +50,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
......@@ -62,10 +62,10 @@ class TDTestCase:
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctbx",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
......@@ -96,7 +96,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
# tmqCom.initConsumerTable()
# tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
# tdLog.info("create stb")
......@@ -105,12 +105,12 @@ class TDTestCase:
# tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.execute(sqlString)
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicFromStb1
......@@ -134,9 +134,9 @@ class TDTestCase:
tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQuery))
if totalConsumeRows != totalRowsFromQuery:
if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1)
......@@ -144,7 +144,7 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
......@@ -169,7 +169,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
# tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
# tdLog.info("create stb")
......@@ -182,13 +182,13 @@ class TDTestCase:
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
topicFromStb1 = 'topic_stb1'
# queryString = "select ts, c1, c2 from %s.%s "%(paraDict['dbName'], paraDict['stbName'])
queryString = "select ts, c1, c2 from %s.%s where t4 == 'shanghai' and t5 == 'shanghai' "%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
consumerId = 1
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicFromStb1
......@@ -205,7 +205,7 @@ class TDTestCase:
tdLog.info("create some new child table and insert data ")
tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctby",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
......@@ -215,9 +215,9 @@ class TDTestCase:
tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQuery))
if totalConsumeRows != totalRowsFromQuery:
if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1)
......@@ -231,14 +231,14 @@ class TDTestCase:
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.tmqCase1()
self.tmqCase2()
self.prepareTestEnv()
tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1
self.tmqCase1()
self.tmqCase2()
def stop(self):
tdSql.close()
......
......@@ -27,26 +27,26 @@ class TDTestCase:
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
......@@ -78,13 +78,13 @@ class TDTestCase:
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("insert data")
tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
......@@ -100,15 +100,15 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result")
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
self.checkFileContent(consumerId, queryString)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
......@@ -117,7 +117,7 @@ class TDTestCase:
sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
consumerId = 1
......@@ -127,7 +127,7 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result")
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[1] != resultList[0]:
......@@ -143,8 +143,8 @@ class TDTestCase:
sqlString = "create topic %s as %s" %(topicNameList[2], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
consumerId = 2
topicList = topicNameList[2]
......@@ -153,7 +153,7 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result")
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
# if expectRowsList[2] != resultList[0]:
......@@ -162,7 +162,7 @@ class TDTestCase:
# self.checkFileContent(consumerId, queryString)
time.sleep(10)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
......
......@@ -27,26 +27,26 @@ class TDTestCase:
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
......@@ -78,13 +78,13 @@ class TDTestCase:
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("insert data")
tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
queryString = "select ts,c1,c2 from %s.%s" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as stable %s.%s" %(topicNameList[0], paraDict["dbName"],paraDict["stbName"])
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
......@@ -100,15 +100,15 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result")
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
self.checkFileContent(consumerId, queryString)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
......@@ -116,7 +116,7 @@ class TDTestCase:
sqlString = "create topic %s as database %s" %(topicNameList[1], paraDict['dbName'])
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
consumerId = 1
......@@ -126,7 +126,7 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result")
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[1] != resultList[0]:
......@@ -141,7 +141,7 @@ class TDTestCase:
sqlString = "create topic %s as %s" %(topicNameList[2], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
consumerId = 2
......@@ -151,7 +151,7 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result")
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[2] != resultList[0]:
......@@ -160,7 +160,7 @@ class TDTestCase:
self.checkFileContent(consumerId, queryString)
time.sleep(10)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
......
......@@ -41,23 +41,23 @@ class TMQCom:
tdSql.init(conn.cursor())
# tdSql.init(conn.cursor(), logSql) # output sql.txt file
def initConsumerTable(self,cdbName='cdb'):
def initConsumerTable(self,cdbName='cdb'):
tdLog.info("create consume database, and consume info table, and consume result table")
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
tdSql.query("drop table if exists %s.notifyinfo "%(cdbName))
tdSql.query("drop table if exists %s.notifyinfo "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
tdSql.query("create table %s.notifyinfo (ts timestamp, cmdid int, consumerid int)"%cdbName)
def initConsumerInfoTable(self,cdbName='cdb'):
def initConsumerInfoTable(self,cdbName='cdb'):
tdLog.info("drop consumeinfo table")
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
tdLog.info("consume info sql: %s"%sql)
......@@ -72,13 +72,13 @@ class TMQCom:
break
else:
time.sleep(5)
for i in range(expectRows):
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
resultList.append(tdSql.getData(i , 3))
return resultList
def selectConsumeMsgResult(self,expectRows,cdbName='cdb'):
resultList=[]
while 1:
......@@ -88,11 +88,11 @@ class TMQCom:
break
else:
time.sleep(5)
for i in range(expectRows):
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
resultList.append(tdSql.getData(i , 2))
return resultList
def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0,alias=0,snapshot=0):
......@@ -102,7 +102,7 @@ class TMQCom:
logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
if (platform.system().lower() == 'windows'):
processorName = buildPath + '\\build\\bin\\tmq_sim.exe'
if alias != 0:
......@@ -111,8 +111,8 @@ class TMQCom:
os.system(shellCmd)
processorName = processorNameNew
shellCmd = 'mintty -h never ' + processorName + ' -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s -e %d "%(pollDelay, dbName, showMsg, showRow, cdbName, snapshot)
shellCmd += "> nul 2>&1 &"
shellCmd += " -y %d -d %s -g %d -r %d -w %s -e %d "%(pollDelay, dbName, showMsg, showRow, cdbName, snapshot)
shellCmd += "> nul 2>&1 &"
else:
processorName = buildPath + '/build/bin/tmq_sim'
if alias != 0:
......@@ -121,10 +121,10 @@ class TMQCom:
os.system(shellCmd)
processorName = processorNameNew
shellCmd = 'nohup ' + processorName + ' -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s -e %d "%(pollDelay, dbName, showMsg, showRow, cdbName, snapshot)
shellCmd += " -y %d -d %s -g %d -r %d -w %s -e %d "%(pollDelay, dbName, showMsg, showRow, cdbName, snapshot)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
os.system(shellCmd)
def stopTmqSimProcess(self, processorName):
psCmd = "ps -ef|grep -w %s|grep -v grep | awk '{print $2}'"%(processorName)
......@@ -149,7 +149,7 @@ class TMQCom:
for i in range(actRows):
if tdSql.getData(i, 1) == 0:
loopFlag = 0
break
break
time.sleep(0.1)
return
......@@ -163,7 +163,7 @@ class TMQCom:
for i in range(actRows):
if tdSql.getData(i, 1) == 1:
loopFlag = 0
break
break
time.sleep(0.1)
return
......@@ -196,7 +196,7 @@ class TMQCom:
tagBinaryValue = 'shanghai'
elif (i % 3 == 0):
tagBinaryValue = 'changsha'
sql += " %s.%s%d using %s.%s tags(%d, %d, %d, '%s', '%s')"%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
tblBatched += 1
if (i == ctbNum-1 ) or (tblBatched == batchNum):
......@@ -206,9 +206,9 @@ class TMQCom:
if sql != pre_create:
tsql.execute(sql)
tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
return
return
def drop_ctable(self, tsql, dbname=None, count=1, default_ctbname_prefix="ctb",ctbStartIdx=0):
for _ in range(count):
......@@ -246,7 +246,7 @@ class TMQCom:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
return
# schema: (ts timestamp, c1 int, c2 int, c3 binary(16))
def insert_data_1(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs):
......@@ -373,16 +373,16 @@ class TMQCom:
if startTs == 0:
t = time.time()
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsBatched = 0
rowsBatched = 0
for i in range(ctbNum):
tagBinaryValue = 'beijing'
if (i % 2 == 0):
tagBinaryValue = 'shanghai'
elif (i % 3 == 0):
tagBinaryValue = 'changsha'
sql += " %s.%s%d using %s.%s tags (%d, %d, %d, '%s', '%s') values "%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
for j in range(rowsPerTbl):
sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+j, j,j, j,i+ctbStartIdx,rowsBatched)
......@@ -413,7 +413,7 @@ class TMQCom:
for i in range(ctbNum):
tbName = '%s%s'%(ctbPrefix,i)
tdCom.insert_rows(tsql,dbname=paraDict["dbName"],tbname=tbName,start_ts_value=paraDict['startTs'],count=paraDict['rowsPerTbl'])
return
return
def threadFunction(self, **paraDict):
# create new connector for new tdSql instance in my thread
......@@ -447,20 +447,20 @@ class TMQCom:
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
# skip offset for consumer
for i in range(0,skipRowsOfCons):
consumeFile.readline()
consumeFile.readline()
lines = 0
while True:
dst = queryFile.readline()
......@@ -473,7 +473,7 @@ class TMQCom:
tdLog.exit("consumerId %d consume rows[%d] is not match the rows by direct query"%(consumerId, lines))
else:
break
return
return
def getResultFileByTaosShell(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
......@@ -483,15 +483,15 @@ class TMQCom:
tdLog.info(cmdStr)
os.system(cmdStr)
return dstFile
def checkTmqConsumeFileContent(self, consumerId, dstFile):
cfgPath = tdCom.getClientCfgPath()
def checkTmqConsumeFileContent(self, consumerId, dstFile):
cfgPath = tdCom.getClientCfgPath()
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
lines = 0
......@@ -506,7 +506,7 @@ class TMQCom:
tdLog.exit("consumerId %d consume rows[%d] is not match the rows by direct query"%(consumerId, lines))
else:
break
return
return
def create_ntable(self, tsql, dbname=None, tbname_prefix="ntb", tbname_index_start_num = 1, column_elm_list=None, colPrefix='c', tblNum=1, **kwargs):
tb_params = ""
......@@ -538,7 +538,7 @@ class TMQCom:
column_value_str = column_value_str.rstrip()[:-1]
insert_sql = f'insert into {dbname}.{tbname_prefix}{tblIdx+tbname_index_start_num} values ({column_value_str});'
tsql.execute(insert_sql)
def waitSubscriptionExit(self, tsql, topicName):
wait_cnt = 0
while True:
......@@ -548,7 +548,7 @@ class TMQCom:
for idx in range (rows):
if tsql.getData(idx, 0) != topicName:
continue
if tsql.getData(idx, 3) == None:
continue
else:
......@@ -556,10 +556,10 @@ class TMQCom:
wait_cnt += 1
exit_flag = 0
break
if exit_flag == 1:
break
tsql.query("show subscriptions")
tdLog.info("show subscriptions:")
tdLog.info(tsql.queryResult)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册