提交 eba47b9e 编写于 作者: P plum-lihui

[test: add valgrind run tmq]

上级 914164f4
...@@ -49,6 +49,19 @@ class TDTestCase: ...@@ -49,6 +49,19 @@ class TDTestCase:
print(cur) print(cur)
return cur return cur
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg,showRow,cdbName,valgrind):
shellCmd = 'nohup '
if valgrind == 1:
logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
tsql.execute("use %s" %dbName) tsql.execute("use %s" %dbName)
...@@ -113,9 +126,8 @@ class TDTestCase: ...@@ -113,9 +126,8 @@ class TDTestCase:
parameterDict["startTs"]) parameterDict["startTs"])
return return
def tmqCase1(self, cfgPath, buildPath): def tmqCase1(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 1: Produce while consume to subscribe one db") tdLog.printNoPrefix("======== test case 1: Produce while one consume to subscribe one db")
tdLog.info("step 1: create database, stb, ctb and insert data") tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread # create and start thread
parameterDict = {'cfg': '', \ parameterDict = {'cfg': '', \
...@@ -153,7 +165,7 @@ class TDTestCase: ...@@ -153,7 +165,7 @@ class TDTestCase:
auto.offset.reset:earliest' auto.offset.reset:earliest'
sql = "insert into %s.consumeinfo values "%cdbName sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql) tdSql.query(sql)
event.wait() event.wait()
...@@ -162,11 +174,8 @@ class TDTestCase: ...@@ -162,11 +174,8 @@ class TDTestCase:
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath valgrind = 1
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
# wait for data ready # wait for data ready
prepareEnvThread.join() prepareEnvThread.join()
...@@ -190,6 +199,187 @@ class TDTestCase: ...@@ -190,6 +199,187 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 2: Produce while two consumers to subscribe one db")
tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread
parameterDict = {'cfg': '', \
'dbName': 'db2', \
'vgroups': 4, \
'stbName': 'stb', \
'ctbNum': 10, \
'rowsPerTbl': 100000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start()
tdLog.info("create topics from db")
topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"]
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicName1
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql)
consumerId = 1
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql)
event.wait()
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
valgrind = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind)
# wait for data ready
prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result")
while 1:
tdSql.query("select * from %s.consumeresult"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 2:
break
else:
time.sleep(5)
consumerId0 = tdSql.getData(0 , 1)
consumerId1 = tdSql.getData(1 , 1)
actConsumeRows0 = tdSql.getData(0 , 3)
actConsumeRows1 = tdSql.getData(1 , 3)
tdLog.info("consumer %d rows: %d"%(consumerId0, actConsumeRows0))
tdLog.info("consumer %d rows: %d"%(consumerId1, actConsumeRows1))
totalConsumeRows = actConsumeRows0 + actConsumeRows1
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicName1)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def tmqCase3(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 3: Produce while one consumers to subscribe one db, include 2 stb")
tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread
parameterDict = {'cfg': '', \
'dbName': 'db3', \
'vgroups': 4, \
'stbName': 'stb', \
'ctbNum': 10, \
'rowsPerTbl': 100000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start()
parameterDict2 = {'cfg': '', \
'dbName': 'db3', \
'vgroups': 4, \
'stbName': 'stb2', \
'ctbNum': 10, \
'rowsPerTbl': 100000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2)
prepareEnvThread2.start()
tdLog.info("create topics from db")
topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"]
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
topicList = topicName1
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql)
# consumerId = 1
# sql = "insert into %s.consumeinfo values "%cdbName
# sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
# tdSql.query(sql)
event.wait()
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
valgrind = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind)
# wait for data ready
prepareEnvThread.join()
prepareEnvThread2.join()
tdLog.info("insert process end, and start to check consume result")
while 1:
tdSql.query("select * from %s.consumeresult"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 1:
break
else:
time.sleep(5)
consumerId0 = tdSql.getData(0 , 1)
#consumerId1 = tdSql.getData(1 , 1)
actConsumeRows0 = tdSql.getData(0 , 3)
#actConsumeRows1 = tdSql.getData(1 , 3)
tdLog.info("consumer %d rows: %d"%(consumerId0, actConsumeRows0))
#tdLog.info("consumer %d rows: %d"%(consumerId1, actConsumeRows1))
#totalConsumeRows = actConsumeRows0 + actConsumeRows1
if actConsumeRows0 != expectrowcnt:
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicName1)
tdLog.printNoPrefix("======== test case 3 end ...... ")
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册