提交 5c1eef7a 编写于 作者: G Ganlin Zhao

fix test cases

上级 14946d8b
...@@ -20,7 +20,7 @@ class TDTestCase: ...@@ -20,7 +20,7 @@ class TDTestCase:
self.vgroups = 4 self.vgroups = 4
self.ctbNum = 1 self.ctbNum = 1
self.rowsPerTbl = 10000 self.rowsPerTbl = 10000
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False) tdSql.init(conn.cursor(), False)
...@@ -50,7 +50,7 @@ class TDTestCase: ...@@ -50,7 +50,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb") tdLog.info("create stb")
...@@ -58,12 +58,12 @@ class TDTestCase: ...@@ -58,12 +58,12 @@ class TDTestCase:
tdLog.info("create ctb") tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data") tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("flush db to let data falls into the disk") tdLog.info("flush db to let data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName'])) tdSql.query("flush database %s"%(paraDict['dbName']))
return return
...@@ -93,18 +93,18 @@ class TDTestCase: ...@@ -93,18 +93,18 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1'] topicNameList = ['topic1']
expectRowsList = [] expectRowsList = []
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter") tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString) sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
# tdSql.query(queryString) # tdSql.query(queryString)
# expectRowsList.append(tdSql.getRows()) # expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result # init consume info, and start tmq_sim, then check consume result
...@@ -121,29 +121,29 @@ class TDTestCase: ...@@ -121,29 +121,29 @@ class TDTestCase:
paraDict['batchNum'] = 100 paraDict['batchNum'] = 100
paraDict['startTs'] = paraDict['startTs'] + self.rowsPerTbl paraDict['startTs'] = paraDict['startTs'] + self.rowsPerTbl
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict) pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
pInsertThread.join() pInsertThread.join()
tdSql.query(queryString) tdSql.query(queryString)
expectRowsList.append(tdSql.getRows()) expectRowsList.append(tdSql.getRows())
tdLog.info("wait the consume result") tdLog.info("wait the consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
if expectRowsList[0] != resultList[0]: if expectRowsList[0] != resultList[0]:
tdLog.exit("%d tmq consume rows error!"%consumerId) tdLog.exit("%d tmq consume rows error!"%consumerId)
tmqCom.checkFileContent(consumerId, queryString) tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("flush database %s"%(paraDict['dbName'])) tdSql.query("flush database %s"%(paraDict['dbName']))
for i in range(len(topicNameList)): for i in range(len(topicNameList)):
tmqCom.waitSubscriptionExit(tdSql,topicNameList[i]) tmqCom.waitSubscriptionExit(tdSql,topicNameList[i])
tdSql.query("drop topic %s"%topicNameList[i]) tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
...@@ -173,18 +173,18 @@ class TDTestCase: ...@@ -173,18 +173,18 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1'] topicNameList = ['topic1']
expectRowsList = [] expectRowsList = []
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter") tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString) sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
tdSql.query(queryString) tdSql.query(queryString)
expectRowsList.append(tdSql.getRows()) expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0] totalRowsInserted = expectRowsList[0]
...@@ -200,36 +200,36 @@ class TDTestCase: ...@@ -200,36 +200,36 @@ class TDTestCase:
tdLog.info("start consume processor 0") tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result") tdLog.info("wait the consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeRows = resultList[0] actConsumeRows = resultList[0]
tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(actConsumeRows, expectrowcnt, totalRowsInserted)) tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(actConsumeRows, expectrowcnt, totalRowsInserted))
if not (expectrowcnt <= actConsumeRows and totalRowsInserted >= actConsumeRows): if not (expectrowcnt <= actConsumeRows and totalRowsInserted >= actConsumeRows):
tdLog.exit("%d tmq consume rows error!"%consumerId) tdLog.exit("%d tmq consume rows error!"%consumerId)
# reinit consume info, and start tmq_sim, then check consume result # reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
consumerId = 2 consumerId = 2
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3) expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1") tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result") tdLog.info("wait the consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeRows = resultList[0] actConsumeRows = resultList[0]
tdLog.info("act consume rows: %d, expect rows: %d, act insert rows: %d"%(actConsumeRows, expectrowcnt, totalRowsInserted)) tdLog.info("act consume rows: %d, expect rows: %d, act insert rows: %d"%(actConsumeRows, expectrowcnt, totalRowsInserted))
if not ((actConsumeRows >= expectrowcnt) and (totalRowsInserted > actConsumeRows)): if not ((actConsumeRows >= expectrowcnt) and (totalRowsInserted > actConsumeRows)):
tdLog.exit("%d tmq consume rows error!"%consumerId) tdLog.exit("%d tmq consume rows error!"%consumerId)
for i in range(len(topicNameList)): for i in range(len(topicNameList)):
tmqCom.waitSubscriptionExit(tdSql,topicNameList[i]) tmqCom.waitSubscriptionExit(tdSql,topicNameList[i])
tdSql.query("drop topic %s"%topicNameList[i]) tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 2 end ...... ") tdLog.printNoPrefix("======== test case 2 end ...... ")
......
...@@ -56,12 +56,12 @@ class TDTestCase: ...@@ -56,12 +56,12 @@ class TDTestCase:
print(cur) print(cur)
return cur return cur
def initConsumerTable(self,cdbName='cdb'): def initConsumerTable(self,cdbName='cdb'):
tdLog.info("create consume database, and consume info table, and consume result table") tdLog.info("create consume database, and consume info table, and consume result table")
tdSql.query("drop database if exists %s "%(cdbName)) tdSql.query("drop database if exists %s "%(cdbName))
tdSql.query("create database %s vgroups 1"%(cdbName)) tdSql.query("create database %s vgroups 1"%(cdbName))
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
...@@ -75,7 +75,7 @@ class TDTestCase: ...@@ -75,7 +75,7 @@ class TDTestCase:
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
sql = "insert into %s.consumeinfo values "%cdbName sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
tdLog.info("consume info sql: %s"%sql) tdLog.info("consume info sql: %s"%sql)
...@@ -90,11 +90,11 @@ class TDTestCase: ...@@ -90,11 +90,11 @@ class TDTestCase:
break break
else: else:
time.sleep(5) time.sleep(5)
for i in range(expectRows): for i in range(expectRows):
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
resultList.append(tdSql.getData(i , 3)) resultList.append(tdSql.getData(i , 3))
return resultList return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
...@@ -102,14 +102,14 @@ class TDTestCase: ...@@ -102,14 +102,14 @@ class TDTestCase:
logFile = cfgPath + '/../log/valgrind-tmq.log' logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
if (platform.system().lower() == 'windows'): if (platform.system().lower() == 'windows'):
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &" shellCmd += "> nul 2>&1 &"
else: else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &" shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
...@@ -139,7 +139,7 @@ class TDTestCase: ...@@ -139,7 +139,7 @@ class TDTestCase:
sql = pre_create sql = pre_create
if sql != pre_create: if sql != pre_create:
tsql.execute(sql) tsql.execute(sql)
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
return return
...@@ -158,7 +158,7 @@ class TDTestCase: ...@@ -158,7 +158,7 @@ class TDTestCase:
ctbDict[i] = 0 ctbDict[i] = 0
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfCtb = 0 rowsOfCtb = 0
while rowsOfCtb < rowsPerTbl: while rowsOfCtb < rowsPerTbl:
for i in range(ctbNum): for i in range(ctbNum):
sql += " %s.%s_%d values "%(dbName,ctbPrefix,i) sql += " %s.%s_%d values "%(dbName,ctbPrefix,i)
...@@ -185,7 +185,7 @@ class TDTestCase: ...@@ -185,7 +185,7 @@ class TDTestCase:
startTs = int(round(t * 1000)) startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfSql = 0 rowsOfSql = 0
for i in range(ctbNum): for i in range(ctbNum):
sql += " %s_%d values "%(ctbPrefix,i) sql += " %s_%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl): for j in range(rowsPerTbl):
...@@ -216,7 +216,7 @@ class TDTestCase: ...@@ -216,7 +216,7 @@ class TDTestCase:
startTs = int(round(t * 1000)) startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfSql = 0 rowsOfSql = 0
for i in range(ctbNum): for i in range(ctbNum):
sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i,dbName,stbName,i) sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i,dbName,stbName,i)
for j in range(rowsPerTbl): for j in range(rowsPerTbl):
...@@ -235,8 +235,8 @@ class TDTestCase: ...@@ -235,8 +235,8 @@ class TDTestCase:
tsql.execute(sql) tsql.execute(sql)
tdLog.debug("insert data ............ [OK]") tdLog.debug("insert data ............ [OK]")
return return
def prepareEnv(self, **parameterDict): def prepareEnv(self, **parameterDict):
# create new connector for my thread # create new connector for my thread
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
...@@ -255,7 +255,7 @@ class TDTestCase: ...@@ -255,7 +255,7 @@ class TDTestCase:
return return
def tmqCase1(self, cfgPath, buildPath): def tmqCase1(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 1: ") tdLog.printNoPrefix("======== test case 1: ")
''' '''
subscribe one db, multi normal table which have not same schema, and include rows of all tables in one insert sql subscribe one db, multi normal table which have not same schema, and include rows of all tables in one insert sql
''' '''
...@@ -274,11 +274,11 @@ class TDTestCase: ...@@ -274,11 +274,11 @@ class TDTestCase:
'batchNum': 100, \ 'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000 'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"]) self.create_database(tdSql, parameterDict["dbName"])
tdSql.execute("create table %s.ntb0 (ts timestamp, c1 int)"%(parameterDict["dbName"])) tdSql.execute("create table %s.ntb0 (ts timestamp, c1 int)"%(parameterDict["dbName"]))
tdSql.execute("create table %s.ntb1 (ts timestamp, c1 int, c2 float)"%(parameterDict["dbName"])) tdSql.execute("create table %s.ntb1 (ts timestamp, c1 int, c2 float)"%(parameterDict["dbName"]))
tdSql.execute("create table %s.ntb2 (ts timestamp, c1 int, c2 float, c3 binary(32))"%(parameterDict["dbName"])) tdSql.execute("create table %s.ntb2 (ts timestamp, c1 int, c2 float, c3 binary(32))"%(parameterDict["dbName"]))
tdSql.execute("create table %s.ntb3 (ts timestamp, c1 int, c2 float, c3 binary(32), c4 timestamp)"%(parameterDict["dbName"])) tdSql.execute("create table %s.ntb3 (ts timestamp, c1 int, c2 float, c3 binary(32), c4 timestamp)"%(parameterDict["dbName"]))
tdSql.execute("insert into %s.ntb0 values(now, 1) %s.ntb1 values(now, 1, 1) %s.ntb2 values(now, 1, 1, '1') %s.ntb3 values(now, 1, 1, '1', now)"%(parameterDict["dbName"],parameterDict["dbName"],parameterDict["dbName"],parameterDict["dbName"])) tdSql.execute("insert into %s.ntb0 values(now, 1) %s.ntb1 values(now, 1, 1) %s.ntb2 values(now, 1, 1, '1') %s.ntb3 values(now, 1, 1, '1', now)"%(parameterDict["dbName"],parameterDict["dbName"],parameterDict["dbName"],parameterDict["dbName"]))
...@@ -301,7 +301,7 @@ class TDTestCase: ...@@ -301,7 +301,7 @@ class TDTestCase:
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicFromDb = 'topic_db_mulit_tbl' topicFromDb = 'topic_db_mulit_tbl'
tdSql.execute("create topic %s as database %s" %(topicFromDb, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicFromDb, parameterDict['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = numOfNtb * rowsOfPerNtb expectrowcnt = numOfNtb * rowsOfPerNtb
...@@ -324,7 +324,7 @@ class TDTestCase: ...@@ -324,7 +324,7 @@ class TDTestCase:
totalConsumeRows = 0 totalConsumeRows = 0
for i in range(expectRows): for i in range(expectRows):
totalConsumeRows += resultList[i] totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
...@@ -334,7 +334,7 @@ class TDTestCase: ...@@ -334,7 +334,7 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self, cfgPath, buildPath): def tmqCase2(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 2: ") tdLog.printNoPrefix("======== test case 2: ")
''' '''
subscribe one stb, multi child talbe and normal table which have not same schema, and include rows of all tables in one insert sql subscribe one stb, multi child talbe and normal table which have not same schema, and include rows of all tables in one insert sql
''' '''
...@@ -355,7 +355,7 @@ class TDTestCase: ...@@ -355,7 +355,7 @@ class TDTestCase:
parameterDict['cfg'] = cfgPath parameterDict['cfg'] = cfgPath
dbName = parameterDict["dbName"] dbName = parameterDict["dbName"]
self.create_database(tdSql, dbName) self.create_database(tdSql, dbName)
tdSql.execute("create stable %s.stb (ts timestamp, s1 bigint, s2 binary(32), s3 double) tags (t1 int, t2 binary(32))"%(dbName)) tdSql.execute("create stable %s.stb (ts timestamp, s1 bigint, s2 binary(32), s3 double) tags (t1 int, t2 binary(32))"%(dbName))
...@@ -364,7 +364,7 @@ class TDTestCase: ...@@ -364,7 +364,7 @@ class TDTestCase:
tdSql.execute("create table %s.ntb0 (ts timestamp, c1 binary(32))"%(dbName)) tdSql.execute("create table %s.ntb0 (ts timestamp, c1 binary(32))"%(dbName))
tdSql.execute("create table %s.ntb1 (ts timestamp, c1 binary(32), c2 float)"%(dbName)) tdSql.execute("create table %s.ntb1 (ts timestamp, c1 binary(32), c2 float)"%(dbName))
tdSql.execute("create table %s.ntb2 (ts timestamp, c1 int, c2 float, c3 binary(32))"%(dbName)) tdSql.execute("create table %s.ntb2 (ts timestamp, c1 int, c2 float, c3 binary(32))"%(dbName))
tdSql.execute("create table %s.ntb3 (ts timestamp, c1 int, c2 float, c3 binary(32), c4 timestamp)"%(dbName)) tdSql.execute("create table %s.ntb3 (ts timestamp, c1 int, c2 float, c3 binary(32), c4 timestamp)"%(dbName))
tdSql.execute("insert into %s.ntb0 values(now, 'ntb0-11') \ tdSql.execute("insert into %s.ntb0 values(now, 'ntb0-11') \
...@@ -401,7 +401,7 @@ class TDTestCase: ...@@ -401,7 +401,7 @@ class TDTestCase:
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicFromStb = 'topic_stb_mulit_tbl' topicFromStb = 'topic_stb_mulit_tbl'
tdSql.execute("create topic %s as stable %s.stb" %(topicFromStb, dbName)) tdSql.execute("create topic %s as stable %s.stb" %(topicFromStb, dbName))
consumerId = 0 consumerId = 0
expectrowcnt = numOfCtb * rowsOfPerNtb expectrowcnt = numOfCtb * rowsOfPerNtb
...@@ -424,7 +424,7 @@ class TDTestCase: ...@@ -424,7 +424,7 @@ class TDTestCase:
totalConsumeRows = 0 totalConsumeRows = 0
for i in range(expectRows): for i in range(expectRows):
totalConsumeRows += resultList[i] totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
...@@ -445,7 +445,7 @@ class TDTestCase: ...@@ -445,7 +445,7 @@ class TDTestCase:
tdLog.info("cfgPath: %s" % cfgPath) tdLog.info("cfgPath: %s" % cfgPath)
self.tmqCase1(cfgPath, buildPath) self.tmqCase1(cfgPath, buildPath)
self.tmqCase2(cfgPath, buildPath) self.tmqCase2(cfgPath, buildPath)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -38,20 +38,20 @@ class TDTestCase: ...@@ -38,20 +38,20 @@ class TDTestCase:
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile) cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr) tdLog.info(cmdStr)
os.system(cmdStr) os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId) consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile)) tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r') consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r') queryFile = open(dstFile, mode='r')
# skip first line for it is schema # skip first line for it is schema
queryFile.readline() queryFile.readline()
while True: while True:
dst = queryFile.readline() dst = queryFile.readline()
src = consumeFile.readline() src = consumeFile.readline()
if dst: if dst:
if dst != src: if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId) tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
...@@ -84,7 +84,7 @@ class TDTestCase: ...@@ -84,7 +84,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=self.replica) tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=self.replica)
tdLog.info("create stb") tdLog.info("create stb")
...@@ -101,13 +101,13 @@ class TDTestCase: ...@@ -101,13 +101,13 @@ class TDTestCase:
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.asyncInsertDataByInterlace(paraDict) # tmqCom.asyncInsertDataByInterlace(paraDict)
tmqCom.create_ntable(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=1) tmqCom.create_ntable(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=1)
tmqCom.insert_rows_into_ntbl(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"], startTs=paraDict["startTs"], tblNum=1, rows=2) # tdLog.info("restart taosd to ensure that the data falls into the disk") tmqCom.insert_rows_into_ntbl(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"], startTs=paraDict["startTs"], tblNum=1, rows=2) # tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("drop database %s"%paraDict["dbName"]) tdSql.query("drop database %s"%paraDict["dbName"])
return return
def tmqCase1(self): def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ") tdLog.printNoPrefix("======== test case 1: ")
# create and start thread # create and start thread
paraDict = {'dbName': 'dbt', paraDict = {'dbName': 'dbt',
'dropFlag': 1, 'dropFlag': 1,
...@@ -132,14 +132,14 @@ class TDTestCase: ...@@ -132,14 +132,14 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1' topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString) sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
consumerId = 0 consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicFromStb1 topicList = topicFromStb1
...@@ -166,13 +166,13 @@ class TDTestCase: ...@@ -166,13 +166,13 @@ class TDTestCase:
tdSql.query(queryString) tdSql.query(queryString)
totalRowsInserted = tdSql.getRows() totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
tmqCom.waitSubscriptionExit(tdSql, topicFromStb1) tmqCom.waitSubscriptionExit(tdSql, topicFromStb1)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
......
此差异已折叠。
...@@ -79,27 +79,27 @@ class TDTestCase: ...@@ -79,27 +79,27 @@ class TDTestCase:
topicNameList = ['topic1', 'topic2', 'topic3'] topicNameList = ['topic1', 'topic2', 'topic3']
expectRowsList = [] expectRowsList = []
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter") tdLog.info("create topics from stb with filter")
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 4 == 0" %(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 4 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString) sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
tdSql.query(queryString) tdSql.query(queryString)
expectRowsList.append(tdSql.getRows()) expectRowsList.append(tdSql.getRows())
queryString = "select ts, log(c1), cos(c1) from %s.%s where c1 > 5000" %(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, log(c1), cos(c1) from %s.%s where c1 > 5000" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[1], queryString) sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
tdSql.query(queryString) tdSql.query(queryString)
expectRowsList.append(tdSql.getRows()) expectRowsList.append(tdSql.getRows())
queryString = "select ts, log(c1), atan(c1) from %s.%s where ts >= %d" %(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+9000) queryString = "select ts, log(c1), atan(c1) from %s.%s where ts >= %d" %(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+9000)
sqlString = "create topic %s as %s" %(topicNameList[2], queryString) sqlString = "create topic %s as %s" %(topicNameList[2], queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
tdSql.query(queryString) tdSql.query(queryString)
expectRowsList.append(tdSql.getRows()) expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result # init consume info, and start tmq_sim, then check consume result
...@@ -115,10 +115,10 @@ class TDTestCase: ...@@ -115,10 +115,10 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result") tdLog.info("wait the consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]: if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!") tdLog.exit("0 tmq consume rows error!")
...@@ -132,7 +132,7 @@ class TDTestCase: ...@@ -132,7 +132,7 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result") tdLog.info("wait the consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[1] != resultList[0]: if expectRowsList[1] != resultList[0]:
...@@ -148,14 +148,14 @@ class TDTestCase: ...@@ -148,14 +148,14 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result") tdLog.info("wait the consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[2] != resultList[0]: if expectRowsList[2] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0]))
tdLog.exit("2 tmq consume rows error!") tdLog.exit("2 tmq consume rows error!")
time.sleep(10) time.sleep(10)
for i in range(len(topicNameList)): for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i]) tdSql.query("drop topic %s"%topicNameList[i])
...@@ -193,7 +193,7 @@ class TDTestCase: ...@@ -193,7 +193,7 @@ class TDTestCase:
sqlString = "create topic %s as %s" %(topicNameList[0], queryString) sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
tdSql.query(queryString) tdSql.query(queryString)
expectRowsList.append(tdSql.getRows()) expectRowsList.append(tdSql.getRows())
queryString = "select ts, sin(c1), pow(c2,3) from %s.%s where sin(c2) >= 0" %(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, sin(c1), pow(c2,3) from %s.%s where sin(c2) >= 0" %(paraDict['dbName'], paraDict['stbName'])
...@@ -209,7 +209,7 @@ class TDTestCase: ...@@ -209,7 +209,7 @@ class TDTestCase:
tdSql.execute(sqlString) tdSql.execute(sqlString)
tdSql.query(queryString) tdSql.query(queryString)
expectRowsList.append(tdSql.getRows()) expectRowsList.append(tdSql.getRows())
# start tmq consume processor # start tmq consume processor
tdLog.info("insert consume info to consume processor") tdLog.info("insert consume info to consume processor")
consumerId = 0 consumerId = 0
...@@ -223,10 +223,10 @@ class TDTestCase: ...@@ -223,10 +223,10 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result") tdLog.info("wait the consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]: if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!") tdLog.exit("0 tmq consume rows error!")
...@@ -240,7 +240,7 @@ class TDTestCase: ...@@ -240,7 +240,7 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result") tdLog.info("wait the consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[1] != resultList[0]: if expectRowsList[1] != resultList[0]:
...@@ -256,14 +256,14 @@ class TDTestCase: ...@@ -256,14 +256,14 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result") tdLog.info("wait the consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[2] != resultList[0]: if expectRowsList[2] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0]))
tdLog.exit("2 tmq consume rows error!") tdLog.exit("2 tmq consume rows error!")
# time.sleep(10) # time.sleep(10)
# for i in range(len(topicNameList)): # for i in range(len(topicNameList)):
# tdSql.query("drop topic %s"%topicNameList[i]) # tdSql.query("drop topic %s"%topicNameList[i])
......
...@@ -20,7 +20,7 @@ class TDTestCase: ...@@ -20,7 +20,7 @@ class TDTestCase:
self.vgroups = 4 self.vgroups = 4
self.ctbNum = 1 self.ctbNum = 1
self.rowsPerTbl = 10000 self.rowsPerTbl = 10000
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False) tdSql.init(conn.cursor(), False)
...@@ -50,7 +50,7 @@ class TDTestCase: ...@@ -50,7 +50,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb") tdLog.info("create stb")
...@@ -65,11 +65,11 @@ class TDTestCase: ...@@ -65,11 +65,11 @@ class TDTestCase:
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk") # tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName'])) # tdSql.query("flush database %s"%(paraDict['dbName']))
return return
def tmqCase1(self): def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ") tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt', paraDict = {'dbName': 'dbt',
...@@ -95,7 +95,7 @@ class TDTestCase: ...@@ -95,7 +95,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
# update to half tables # update to half tables
# paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) # paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
...@@ -103,16 +103,16 @@ class TDTestCase: ...@@ -103,16 +103,16 @@ class TDTestCase:
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_UpperCase_stb1' topicFromStb1 = 'topic_UpperCase_stb1'
# queryString = "select ts, c1, c2 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) # queryString = "select ts, c1, c2 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString) sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum # paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 0 consumerId = 0
...@@ -139,18 +139,18 @@ class TDTestCase: ...@@ -139,18 +139,18 @@ class TDTestCase:
tdLog.info("run select sql from db") tdLog.info("run select sql from db")
tdSql.query(queryString) tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows() totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQuery)) tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQuery))
if totalConsumeRows != totalRowsFromQuery: if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
tmqCom.checkFileContent(consumerId, queryString) tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self): def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ") tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt', paraDict = {'dbName': 'dbt',
'dropFlag': 1, 'dropFlag': 1,
'event': '', 'event': '',
...@@ -170,15 +170,15 @@ class TDTestCase: ...@@ -170,15 +170,15 @@ class TDTestCase:
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 0} 'snapshot': 0}
paraDict['snapshot'] = self.snapshot paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
# tdLog.info("restart taosd to ensure that the data falls into the disk") # tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName'])) # tdSql.query("flush database %s"%(paraDict['dbName']))
# update to half tables # update to half tables
# paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2) # paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2)
# paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) # paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
...@@ -187,17 +187,17 @@ class TDTestCase: ...@@ -187,17 +187,17 @@ class TDTestCase:
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_UpperCase_stb1' topicFromStb1 = 'topic_UpperCase_stb1'
queryString = "select ts, c1, c2 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
# queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) # queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString) sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum # paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 1 consumerId = 1
...@@ -213,13 +213,13 @@ class TDTestCase: ...@@ -213,13 +213,13 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2) paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2)
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"], tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
...@@ -229,7 +229,7 @@ class TDTestCase: ...@@ -229,7 +229,7 @@ class TDTestCase:
tdSql.query(queryString) tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows() totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt)) tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt))
if self.snapshot == 0: if self.snapshot == 0:
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
...@@ -237,8 +237,8 @@ class TDTestCase: ...@@ -237,8 +237,8 @@ class TDTestCase:
elif self.snapshot == 1: elif self.snapshot == 1:
if totalConsumeRows != totalRowsFromQuery: if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
...@@ -251,14 +251,14 @@ class TDTestCase: ...@@ -251,14 +251,14 @@ class TDTestCase:
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.tmqCase1() self.tmqCase1()
self.tmqCase2() self.tmqCase2()
self.prepareTestEnv() self.prepareTestEnv()
tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1 self.snapshot = 1
self.tmqCase1() self.tmqCase1()
self.tmqCase2() self.tmqCase2()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -20,7 +20,7 @@ class TDTestCase: ...@@ -20,7 +20,7 @@ class TDTestCase:
self.vgroups = 4 self.vgroups = 4
self.ctbNum = 100 self.ctbNum = 100
self.rowsPerTbl = 1000 self.rowsPerTbl = 1000
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False) tdSql.init(conn.cursor(), False)
...@@ -50,7 +50,7 @@ class TDTestCase: ...@@ -50,7 +50,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb") tdLog.info("create stb")
...@@ -65,11 +65,11 @@ class TDTestCase: ...@@ -65,11 +65,11 @@ class TDTestCase:
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk") # tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName'])) # tdSql.query("flush database %s"%(paraDict['dbName']))
return return
def tmqCase1(self): def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ") tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt', paraDict = {'dbName': 'dbt',
...@@ -95,7 +95,7 @@ class TDTestCase: ...@@ -95,7 +95,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
# update to half tables # update to half tables
# paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) # paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
...@@ -103,16 +103,16 @@ class TDTestCase: ...@@ -103,16 +103,16 @@ class TDTestCase:
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_UpperCase_stb1' topicFromStb1 = 'topic_UpperCase_stb1'
queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
# queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'beijing' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) # queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'beijing' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString) sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum # paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 0 consumerId = 0
...@@ -139,18 +139,18 @@ class TDTestCase: ...@@ -139,18 +139,18 @@ class TDTestCase:
tdLog.info("run select sql from db") tdLog.info("run select sql from db")
tdSql.query(queryString) tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows() totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQuery)) tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQuery))
if totalConsumeRows != totalRowsFromQuery: if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self): def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ") tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt', paraDict = {'dbName': 'dbt',
'dropFlag': 1, 'dropFlag': 1,
'event': '', 'event': '',
...@@ -170,15 +170,15 @@ class TDTestCase: ...@@ -170,15 +170,15 @@ class TDTestCase:
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 0} 'snapshot': 0}
paraDict['snapshot'] = self.snapshot paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
# tdLog.info("restart taosd to ensure that the data falls into the disk") # tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName'])) # tdSql.query("flush database %s"%(paraDict['dbName']))
# update to half tables # update to half tables
# paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2) # paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2)
# paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) # paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
...@@ -187,17 +187,17 @@ class TDTestCase: ...@@ -187,17 +187,17 @@ class TDTestCase:
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_UpperCase_stb1' topicFromStb1 = 'topic_UpperCase_stb1'
# queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) # queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'beijing' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'beijing' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString) sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum # paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 1 consumerId = 1
...@@ -213,7 +213,7 @@ class TDTestCase: ...@@ -213,7 +213,7 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl * 7/10) paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl * 7/10)
paraDict['ctbStartIdx'] = int(paraDict['ctbNum'] * 7/10) paraDict['ctbStartIdx'] = int(paraDict['ctbNum'] * 7/10)
# paraDict["rowsPerTbl"] = 100 # paraDict["rowsPerTbl"] = 100
...@@ -221,7 +221,7 @@ class TDTestCase: ...@@ -221,7 +221,7 @@ class TDTestCase:
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"], tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
...@@ -231,7 +231,7 @@ class TDTestCase: ...@@ -231,7 +231,7 @@ class TDTestCase:
tdSql.query(queryString) tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows() totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt)) tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt))
if self.snapshot == 0: if self.snapshot == 0:
if totalConsumeRows != expectrowcnt / 2: if totalConsumeRows != expectrowcnt / 2:
...@@ -239,8 +239,8 @@ class TDTestCase: ...@@ -239,8 +239,8 @@ class TDTestCase:
elif self.snapshot == 1: elif self.snapshot == 1:
if totalConsumeRows != totalRowsFromQuery: if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error when snapshot is 1!") tdLog.exit("tmq consume rows error when snapshot is 1!")
# tmqCom.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
...@@ -253,13 +253,13 @@ class TDTestCase: ...@@ -253,13 +253,13 @@ class TDTestCase:
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.tmqCase1() self.tmqCase1()
self.tmqCase2() self.tmqCase2()
self.prepareTestEnv() self.prepareTestEnv()
tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1 self.snapshot = 1
self.tmqCase1() self.tmqCase1()
self.tmqCase2() self.tmqCase2()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册