提交 54cf2f7c 编写于 作者: G Ganlin Zhao

fix test cases

上级 c017e438
...@@ -50,7 +50,7 @@ class TDTestCase: ...@@ -50,7 +50,7 @@ class TDTestCase:
return cur return cur
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
tsql.execute("use %s" %dbName) tsql.execute("use %s" %dbName)
tsql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) tsql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
pre_create = "create table" pre_create = "create table"
...@@ -63,7 +63,7 @@ class TDTestCase: ...@@ -63,7 +63,7 @@ class TDTestCase:
sql = pre_create sql = pre_create
if sql != pre_create: if sql != pre_create:
tsql.execute(sql) tsql.execute(sql)
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
return return
...@@ -90,7 +90,7 @@ class TDTestCase: ...@@ -90,7 +90,7 @@ class TDTestCase:
tsql.execute(sql) tsql.execute(sql)
tdLog.debug("insert data ............ [OK]") tdLog.debug("insert data ............ [OK]")
return return
def prepareEnv(self, **parameterDict): def prepareEnv(self, **parameterDict):
print ("input parameters:") print ("input parameters:")
print (parameterDict) print (parameterDict)
...@@ -109,9 +109,9 @@ class TDTestCase: ...@@ -109,9 +109,9 @@ class TDTestCase:
parameterDict["ctbNum"],\ parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"],\ parameterDict["rowsPerTbl"],\
parameterDict["batchNum"],\ parameterDict["batchNum"],\
parameterDict["startTs"]) parameterDict["startTs"])
return return
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
...@@ -138,22 +138,22 @@ class TDTestCase: ...@@ -138,22 +138,22 @@ class TDTestCase:
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start() prepareEnvThread.start()
time.sleep(2) time.sleep(2)
# wait stb ready # wait stb ready
while 1: while 1:
tdSql.query("show %s.stables"%parameterDict['dbName']) tdSql.query("show %s.stables"%parameterDict['dbName'])
if tdSql.getRows() == 1: if tdSql.getRows() == 1:
break break
else: else:
time.sleep(1) time.sleep(1)
tdLog.info("create topics from super table") tdLog.info("create topics from super table")
topicFromStb = 'topic_stb_column' topicFromStb = 'topic_stb_column'
topicFromCtb = 'topic_ctb_column' topicFromCtb = 'topic_ctb_column'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
time.sleep(1) time.sleep(1)
tdSql.query("show topics") tdSql.query("show topics")
#tdSql.checkRows(2) #tdSql.checkRows(2)
...@@ -161,15 +161,15 @@ class TDTestCase: ...@@ -161,15 +161,15 @@ class TDTestCase:
topic2 = tdSql.getData(1 , 0) topic2 = tdSql.getData(1 , 0)
print (topic1) print (topic1)
print (topic2) print (topic2)
print (topicFromStb) print (topicFromStb)
print (topicFromCtb) print (topicFromCtb)
#tdLog.info("show topics: %s, %s"%topic1, topic2) #tdLog.info("show topics: %s, %s"%topic1, topic2)
#if topic1 != topicFromStb or topic1 != topicFromCtb: #if topic1 != topicFromStb or topic1 != topicFromCtb:
# tdLog.exit("topic error1") # tdLog.exit("topic error1")
#if topic2 != topicFromStb or topic2 != topicFromCtb: #if topic2 != topicFromStb or topic2 != topicFromCtb:
# tdLog.exit("topic error2") # tdLog.exit("topic error2")
tdLog.info("create consume info table and consume result table") tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"] cdbName = parameterDict["dbName"]
tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)") tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)")
...@@ -187,7 +187,7 @@ class TDTestCase: ...@@ -187,7 +187,7 @@ class TDTestCase:
sql = "insert into consumeinfo values " sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata) sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
tdSql.query(sql) tdSql.query(sql)
tdLog.info("check stb if there are data") tdLog.info("check stb if there are data")
while 1: while 1:
tdSql.query("select count(*) from %s"%parameterDict["stbName"]) tdSql.query("select count(*) from %s"%parameterDict["stbName"])
...@@ -198,21 +198,21 @@ class TDTestCase: ...@@ -198,21 +198,21 @@ class TDTestCase:
break break
else: else:
time.sleep(1) time.sleep(1)
tdLog.info("start consume processor") tdLog.info("start consume processor")
pollDelay = 5 pollDelay = 5
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &" shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
# wait for data ready # wait for data ready
prepareEnvThread.join() prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
while 1: while 1:
tdSql.query("select * from consumeresult") tdSql.query("select * from consumeresult")
...@@ -223,17 +223,17 @@ class TDTestCase: ...@@ -223,17 +223,17 @@ class TDTestCase:
time.sleep(5) time.sleep(5)
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
tdSql.checkData(0 , 1, consumerId) tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 2, expectmsgcnt) tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt) tdSql.checkData(0 , 3, expectrowcnt)
tdSql.query("drop topic %s"%topicFromStb) tdSql.query("drop topic %s"%topicFromStb)
tdSql.query("drop topic %s"%topicFromCtb) tdSql.query("drop topic %s"%topicFromCtb)
# ============================================================================== # ==============================================================================
tdLog.printNoPrefix("======== test scenario 2: add child table with consuming ") tdLog.printNoPrefix("======== test scenario 2: add child table with consuming ")
tdLog.info(" clean database") tdLog.info(" clean database")
# create and start thread # create and start thread
parameterDict = {'cfg': '', \ parameterDict = {'cfg': '', \
'dbName': 'db2', \ 'dbName': 'db2', \
...@@ -247,21 +247,21 @@ class TDTestCase: ...@@ -247,21 +247,21 @@ class TDTestCase:
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start() prepareEnvThread.start()
# wait db ready # wait db ready
while 1: while 1:
tdSql.query("show databases") tdSql.query("show databases")
if tdSql.getRows() == 4: if tdSql.getRows() == 4:
print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),) print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),)
break break
else: else:
time.sleep(1) time.sleep(1)
tdSql.query("use %s"%parameterDict['dbName']) tdSql.query("use %s"%parameterDict['dbName'])
# wait stb ready # wait stb ready
while 1: while 1:
tdSql.query("show %s.stables"%parameterDict['dbName']) tdSql.query("show %s.stables"%parameterDict['dbName'])
if tdSql.getRows() == 1: if tdSql.getRows() == 1:
break break
else: else:
time.sleep(1) time.sleep(1)
...@@ -269,25 +269,25 @@ class TDTestCase: ...@@ -269,25 +269,25 @@ class TDTestCase:
tdLog.info("create topics from super table") tdLog.info("create topics from super table")
topicFromStb = 'topic_stb_column2' topicFromStb = 'topic_stb_column2'
topicFromCtb = 'topic_ctb_column2' topicFromCtb = 'topic_ctb_column2'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
time.sleep(1) time.sleep(1)
tdSql.query("show topics") tdSql.query("show topics")
topic1 = tdSql.getData(0 , 0) topic1 = tdSql.getData(0 , 0)
topic2 = tdSql.getData(1 , 0) topic2 = tdSql.getData(1 , 0)
print (topic1) print (topic1)
print (topic2) print (topic2)
print (topicFromStb) print (topicFromStb)
print (topicFromCtb) print (topicFromCtb)
#tdLog.info("show topics: %s, %s"%topic1, topic2) #tdLog.info("show topics: %s, %s"%topic1, topic2)
#if topic1 != topicFromStb or topic1 != topicFromCtb: #if topic1 != topicFromStb or topic1 != topicFromCtb:
# tdLog.exit("topic error1") # tdLog.exit("topic error1")
#if topic2 != topicFromStb or topic2 != topicFromCtb: #if topic2 != topicFromStb or topic2 != topicFromCtb:
# tdLog.exit("topic error2") # tdLog.exit("topic error2")
tdLog.info("create consume info table and consume result table") tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"] cdbName = parameterDict["dbName"]
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
...@@ -305,7 +305,7 @@ class TDTestCase: ...@@ -305,7 +305,7 @@ class TDTestCase:
sql = "insert into consumeinfo values " sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata) sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
tdSql.query(sql) tdSql.query(sql)
tdLog.info("check stb if there are data") tdLog.info("check stb if there are data")
while 1: while 1:
tdSql.query("select count(*) from %s"%parameterDict["stbName"]) tdSql.query("select count(*) from %s"%parameterDict["stbName"])
...@@ -316,17 +316,17 @@ class TDTestCase: ...@@ -316,17 +316,17 @@ class TDTestCase:
break break
else: else:
time.sleep(1) time.sleep(1)
tdLog.info("start consume processor") tdLog.info("start consume processor")
pollDelay = 5 pollDelay = 5
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &" shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
# create new child table and insert data # create new child table and insert data
newCtbName = 'newctb' newCtbName = 'newctb'
...@@ -340,7 +340,7 @@ class TDTestCase: ...@@ -340,7 +340,7 @@ class TDTestCase:
# wait for data ready # wait for data ready
prepareEnvThread.join() prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
while 1: while 1:
tdSql.query("select * from consumeresult") tdSql.query("select * from consumeresult")
...@@ -352,12 +352,12 @@ class TDTestCase: ...@@ -352,12 +352,12 @@ class TDTestCase:
expectmsgcnt += rowsOfNewCtb expectmsgcnt += rowsOfNewCtb
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb
tdSql.checkData(0 , 1, consumerId) tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 2, expectmsgcnt) tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt) tdSql.checkData(0 , 3, expectrowcnt)
# ============================================================================== # ==============================================================================
tdLog.printNoPrefix("======== test scenario 3: ") tdLog.printNoPrefix("======== test scenario 3: ")
......
...@@ -49,7 +49,7 @@ class TDTestCase: ...@@ -49,7 +49,7 @@ class TDTestCase:
return cur return cur
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
tsql.execute("use %s" %dbName) tsql.execute("use %s" %dbName)
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
pre_create = "create table" pre_create = "create table"
...@@ -62,7 +62,7 @@ class TDTestCase: ...@@ -62,7 +62,7 @@ class TDTestCase:
sql = pre_create sql = pre_create
if sql != pre_create: if sql != pre_create:
tsql.execute(sql) tsql.execute(sql)
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
return return
...@@ -89,7 +89,7 @@ class TDTestCase: ...@@ -89,7 +89,7 @@ class TDTestCase:
tsql.execute(sql) tsql.execute(sql)
tdLog.debug("insert data ............ [OK]") tdLog.debug("insert data ............ [OK]")
return return
def prepareEnv(self, **parameterDict): def prepareEnv(self, **parameterDict):
print ("input parameters:") print ("input parameters:")
print (parameterDict) print (parameterDict)
...@@ -108,7 +108,7 @@ class TDTestCase: ...@@ -108,7 +108,7 @@ class TDTestCase:
parameterDict["ctbNum"],\ parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"],\ parameterDict["rowsPerTbl"],\
parameterDict["batchNum"],\ parameterDict["batchNum"],\
parameterDict["startTs"]) parameterDict["startTs"])
return return
...@@ -128,34 +128,34 @@ class TDTestCase: ...@@ -128,34 +128,34 @@ class TDTestCase:
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start() prepareEnvThread.start()
time.sleep(2) time.sleep(2)
# wait stb ready # wait stb ready
while 1: while 1:
tdSql.query("show %s.stables"%parameterDict['dbName']) tdSql.query("show %s.stables"%parameterDict['dbName'])
if tdSql.getRows() == 1: if tdSql.getRows() == 1:
break break
else: else:
time.sleep(1) time.sleep(1)
tdLog.info("create topics from super table") tdLog.info("create topics from super table")
topicFromStb = 'topic_stb_column' topicFromStb = 'topic_stb_column'
topicFromCtb = 'topic_ctb_column' topicFromCtb = 'topic_ctb_column'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
time.sleep(1) time.sleep(1)
tdSql.query("show topics") tdSql.query("show topics")
#tdSql.checkRows(2) #tdSql.checkRows(2)
topic1 = tdSql.getData(0 , 0) topic1 = tdSql.getData(0 , 0)
topic2 = tdSql.getData(1 , 0) topic2 = tdSql.getData(1 , 0)
tdLog.info("show topics: %s, %s"%(topic1, topic2)) tdLog.info("show topics: %s, %s"%(topic1, topic2))
if topic1 != topicFromStb and topic1 != topicFromCtb: if topic1 != topicFromStb and topic1 != topicFromCtb:
tdLog.exit("topic error1") tdLog.exit("topic error1")
if topic2 != topicFromStb and topic2 != topicFromCtb: if topic2 != topicFromStb and topic2 != topicFromCtb:
tdLog.exit("topic error2") tdLog.exit("topic error2")
tdLog.info("create consume info table and consume result table") tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"] cdbName = parameterDict["dbName"]
tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)") tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)")
...@@ -172,7 +172,7 @@ class TDTestCase: ...@@ -172,7 +172,7 @@ class TDTestCase:
sql = "insert into consumeinfo values " sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql) tdSql.query(sql)
tdLog.info("check stb if there are data") tdLog.info("check stb if there are data")
while 1: while 1:
tdSql.query("select count(*) from %s"%parameterDict["stbName"]) tdSql.query("select count(*) from %s"%parameterDict["stbName"])
...@@ -183,21 +183,21 @@ class TDTestCase: ...@@ -183,21 +183,21 @@ class TDTestCase:
break break
else: else:
time.sleep(1) time.sleep(1)
tdLog.info("start consume processor") tdLog.info("start consume processor")
pollDelay = 5 pollDelay = 5
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &" shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
# wait for data ready # wait for data ready
prepareEnvThread.join() prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
while 1: while 1:
tdSql.query("select * from consumeresult") tdSql.query("select * from consumeresult")
...@@ -217,7 +217,7 @@ class TDTestCase: ...@@ -217,7 +217,7 @@ class TDTestCase:
tdSql.query("drop topic %s"%topicFromCtb) tdSql.query("drop topic %s"%topicFromCtb)
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self, cfgPath, buildPath): def tmqCase2(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 2: add child table with consuming ") tdLog.printNoPrefix("======== test case 2: add child table with consuming ")
# create and start thread # create and start thread
...@@ -233,21 +233,21 @@ class TDTestCase: ...@@ -233,21 +233,21 @@ class TDTestCase:
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start() prepareEnvThread.start()
# wait db ready # wait db ready
while 1: while 1:
tdSql.query("show databases") tdSql.query("show databases")
if tdSql.getRows() == 4: if tdSql.getRows() == 4:
print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),) print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),)
break break
else: else:
time.sleep(1) time.sleep(1)
tdSql.query("use %s"%parameterDict['dbName']) tdSql.query("use %s"%parameterDict['dbName'])
# wait stb ready # wait stb ready
while 1: while 1:
tdSql.query("show %s.stables"%parameterDict['dbName']) tdSql.query("show %s.stables"%parameterDict['dbName'])
if tdSql.getRows() == 1: if tdSql.getRows() == 1:
break break
else: else:
time.sleep(1) time.sleep(1)
...@@ -255,20 +255,20 @@ class TDTestCase: ...@@ -255,20 +255,20 @@ class TDTestCase:
tdLog.info("create topics from super table") tdLog.info("create topics from super table")
topicFromStb = 'topic_stb_column2' topicFromStb = 'topic_stb_column2'
topicFromCtb = 'topic_ctb_column2' topicFromCtb = 'topic_ctb_column2'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
time.sleep(1) time.sleep(1)
tdSql.query("show topics") tdSql.query("show topics")
topic1 = tdSql.getData(0 , 0) topic1 = tdSql.getData(0 , 0)
topic2 = tdSql.getData(1 , 0) topic2 = tdSql.getData(1 , 0)
tdLog.info("show topics: %s, %s"%(topic1, topic2)) tdLog.info("show topics: %s, %s"%(topic1, topic2))
if topic1 != topicFromStb and topic1 != topicFromCtb: if topic1 != topicFromStb and topic1 != topicFromCtb:
tdLog.exit("topic error1") tdLog.exit("topic error1")
if topic2 != topicFromStb and topic2 != topicFromCtb: if topic2 != topicFromStb and topic2 != topicFromCtb:
tdLog.exit("topic error2") tdLog.exit("topic error2")
tdLog.info("create consume info table and consume result table") tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"] cdbName = parameterDict["dbName"]
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
...@@ -286,7 +286,7 @@ class TDTestCase: ...@@ -286,7 +286,7 @@ class TDTestCase:
sql = "insert into consumeinfo values " sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql) tdSql.query(sql)
tdLog.info("check stb if there are data") tdLog.info("check stb if there are data")
while 1: while 1:
tdSql.query("select count(*) from %s"%parameterDict["stbName"]) tdSql.query("select count(*) from %s"%parameterDict["stbName"])
...@@ -297,17 +297,17 @@ class TDTestCase: ...@@ -297,17 +297,17 @@ class TDTestCase:
break break
else: else:
time.sleep(1) time.sleep(1)
tdLog.info("start consume processor") tdLog.info("start consume processor")
pollDelay = 5 pollDelay = 5
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &" shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
# create new child table and insert data # create new child table and insert data
newCtbName = 'newctb' newCtbName = 'newctb'
...@@ -320,7 +320,7 @@ class TDTestCase: ...@@ -320,7 +320,7 @@ class TDTestCase:
# wait for data ready # wait for data ready
prepareEnvThread.join() prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
while 1: while 1:
tdSql.query("select * from consumeresult") tdSql.query("select * from consumeresult")
...@@ -332,7 +332,7 @@ class TDTestCase: ...@@ -332,7 +332,7 @@ class TDTestCase:
tdSql.checkData(0 , 1, consumerId) tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 3, expectrowcnt) tdSql.checkData(0 , 3, expectrowcnt)
tdSql.query("drop topic %s"%topicFromStb) tdSql.query("drop topic %s"%topicFromStb)
tdSql.query("drop topic %s"%topicFromCtb) tdSql.query("drop topic %s"%topicFromCtb)
...@@ -355,25 +355,25 @@ class TDTestCase: ...@@ -355,25 +355,25 @@ class TDTestCase:
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start() prepareEnvThread.start()
# wait db ready # wait db ready
while 1: while 1:
tdSql.query("show databases") tdSql.query("show databases")
if tdSql.getRows() == 4: if tdSql.getRows() == 4:
print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),) print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),)
break break
else: else:
time.sleep(1) time.sleep(1)
tdSql.query("use %s"%parameterDict['dbName']) tdSql.query("use %s"%parameterDict['dbName'])
# wait stb ready # wait stb ready
while 1: while 1:
tdSql.query("show %s.stables"%parameterDict['dbName']) tdSql.query("show %s.stables"%parameterDict['dbName'])
if tdSql.getRows() == 1: if tdSql.getRows() == 1:
break break
else: else:
time.sleep(1) time.sleep(1)
tdLog.info("create stable2 for the seconde topic") tdLog.info("create stable2 for the seconde topic")
parameterDict2 = {'cfg': '', \ parameterDict2 = {'cfg': '', \
'dbName': 'db3', \ 'dbName': 'db3', \
...@@ -385,23 +385,23 @@ class TDTestCase: ...@@ -385,23 +385,23 @@ class TDTestCase:
'startTs': 1640966400000} # 2022-01-01 00:00:00.000 'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict2['cfg'] = cfgPath parameterDict2['cfg'] = cfgPath
tdSql.execute("create stable if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict2['dbName'], parameterDict2['stbName'])) tdSql.execute("create stable if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict2['dbName'], parameterDict2['stbName']))
tdLog.info("create topics from super table") tdLog.info("create topics from super table")
topicFromStb = 'topic_stb_column3' topicFromStb = 'topic_stb_column3'
topicFromStb2 = 'topic_stb_column32' topicFromStb2 = 'topic_stb_column32'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb2, parameterDict2['dbName'], parameterDict2['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb2, parameterDict2['dbName'], parameterDict2['stbName']))
tdSql.query("show topics") tdSql.query("show topics")
topic1 = tdSql.getData(0 , 0) topic1 = tdSql.getData(0 , 0)
topic2 = tdSql.getData(1 , 0) topic2 = tdSql.getData(1 , 0)
tdLog.info("show topics: %s, %s"%(topic1, topic2)) tdLog.info("show topics: %s, %s"%(topic1, topic2))
if topic1 != topicFromStb and topic1 != topicFromStb2: if topic1 != topicFromStb and topic1 != topicFromStb2:
tdLog.exit("topic error1") tdLog.exit("topic error1")
if topic2 != topicFromStb and topic2 != topicFromStb2: if topic2 != topicFromStb and topic2 != topicFromStb2:
tdLog.exit("topic error2") tdLog.exit("topic error2")
tdLog.info("create consume info table and consume result table") tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"] cdbName = parameterDict["dbName"]
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
...@@ -418,7 +418,7 @@ class TDTestCase: ...@@ -418,7 +418,7 @@ class TDTestCase:
sql = "insert into consumeinfo values " sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql) tdSql.query(sql)
tdLog.info("check stb if there are data") tdLog.info("check stb if there are data")
while 1: while 1:
tdSql.query("select count(*) from %s"%parameterDict["stbName"]) tdSql.query("select count(*) from %s"%parameterDict["stbName"])
...@@ -429,17 +429,17 @@ class TDTestCase: ...@@ -429,17 +429,17 @@ class TDTestCase:
break break
else: else:
time.sleep(1) time.sleep(1)
tdLog.info("start consume processor") tdLog.info("start consume processor")
pollDelay = 5 pollDelay = 5
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &" shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
# start the second thread to create new child table and insert data # start the second thread to create new child table and insert data
prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2)
...@@ -448,7 +448,7 @@ class TDTestCase: ...@@ -448,7 +448,7 @@ class TDTestCase:
# wait for data ready # wait for data ready
prepareEnvThread.join() prepareEnvThread.join()
prepareEnvThread2.join() prepareEnvThread2.join()
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
while 1: while 1:
tdSql.query("select * from consumeresult") tdSql.query("select * from consumeresult")
...@@ -460,7 +460,7 @@ class TDTestCase: ...@@ -460,7 +460,7 @@ class TDTestCase:
tdSql.checkData(0 , 1, consumerId) tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 3, expectrowcnt) tdSql.checkData(0 , 3, expectrowcnt)
tdSql.query("drop topic %s"%topicFromStb) tdSql.query("drop topic %s"%topicFromStb)
tdSql.query("drop topic %s"%topicFromStb2) tdSql.query("drop topic %s"%topicFromStb2)
...@@ -478,7 +478,7 @@ class TDTestCase: ...@@ -478,7 +478,7 @@ class TDTestCase:
tdLog.info("cfgPath: %s" % cfgPath) tdLog.info("cfgPath: %s" % cfgPath)
#self.tmqCase1(cfgPath, buildPath) #self.tmqCase1(cfgPath, buildPath)
#self.tmqCase2(cfgPath, buildPath) #self.tmqCase2(cfgPath, buildPath)
self.tmqCase3(cfgPath, buildPath) self.tmqCase3(cfgPath, buildPath)
def stop(self): def stop(self):
......
...@@ -55,15 +55,15 @@ class TDTestCase: ...@@ -55,15 +55,15 @@ class TDTestCase:
logFile = cfgPath + '/../log/valgrind-tmq.log' logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &" shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
tsql.execute("use %s" %dbName) tsql.execute("use %s" %dbName)
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
pre_create = "create table" pre_create = "create table"
...@@ -76,8 +76,8 @@ class TDTestCase: ...@@ -76,8 +76,8 @@ class TDTestCase:
sql = pre_create sql = pre_create
if sql != pre_create: if sql != pre_create:
tsql.execute(sql) tsql.execute(sql)
event.set() event.set()
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
return return
...@@ -104,7 +104,7 @@ class TDTestCase: ...@@ -104,7 +104,7 @@ class TDTestCase:
tsql.execute(sql) tsql.execute(sql)
tdLog.debug("insert data ............ [OK]") tdLog.debug("insert data ............ [OK]")
return return
def prepareEnv(self, **parameterDict): def prepareEnv(self, **parameterDict):
print ("input parameters:") print ("input parameters:")
print (parameterDict) print (parameterDict)
...@@ -123,7 +123,7 @@ class TDTestCase: ...@@ -123,7 +123,7 @@ class TDTestCase:
parameterDict["ctbNum"],\ parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"],\ parameterDict["rowsPerTbl"],\
parameterDict["batchNum"],\ parameterDict["batchNum"],\
parameterDict["startTs"]) parameterDict["startTs"])
return return
def tmqCase1(self, cfgPath, buildPath): def tmqCase1(self, cfgPath, buildPath):
...@@ -144,12 +144,12 @@ class TDTestCase: ...@@ -144,12 +144,12 @@ class TDTestCase:
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start() prepareEnvThread.start()
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
tdLog.info("create consume info table and consume result table") tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"] cdbName = parameterDict["dbName"]
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
...@@ -166,20 +166,20 @@ class TDTestCase: ...@@ -166,20 +166,20 @@ class TDTestCase:
sql = "insert into %s.consumeinfo values "%cdbName sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql) tdSql.query(sql)
event.wait() event.wait()
tdLog.info("start consume processor") tdLog.info("start consume processor")
pollDelay = 5 pollDelay = 5
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
valgrind = 1 valgrind = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind) self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind)
# wait for data ready # wait for data ready
prepareEnvThread.join() prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
while 1: while 1:
tdSql.query("select * from %s.consumeresult"%cdbName) tdSql.query("select * from %s.consumeresult"%cdbName)
...@@ -217,12 +217,12 @@ class TDTestCase: ...@@ -217,12 +217,12 @@ class TDTestCase:
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start() prepareEnvThread.start()
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
tdLog.info("create consume info table and consume result table") tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"] cdbName = parameterDict["dbName"]
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
...@@ -239,25 +239,25 @@ class TDTestCase: ...@@ -239,25 +239,25 @@ class TDTestCase:
sql = "insert into %s.consumeinfo values "%cdbName sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql) tdSql.query(sql)
consumerId = 1 consumerId = 1
sql = "insert into %s.consumeinfo values "%cdbName sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql) tdSql.query(sql)
event.wait() event.wait()
tdLog.info("start consume processor") tdLog.info("start consume processor")
pollDelay = 5 pollDelay = 5
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
valgrind = 1 valgrind = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind) self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind)
# wait for data ready # wait for data ready
prepareEnvThread.join() prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
while 1: while 1:
tdSql.query("select * from %s.consumeresult"%cdbName) tdSql.query("select * from %s.consumeresult"%cdbName)
...@@ -317,9 +317,9 @@ class TDTestCase: ...@@ -317,9 +317,9 @@ class TDTestCase:
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
tdLog.info("create consume info table and consume result table") tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"] cdbName = parameterDict["dbName"]
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
...@@ -336,25 +336,25 @@ class TDTestCase: ...@@ -336,25 +336,25 @@ class TDTestCase:
sql = "insert into %s.consumeinfo values "%cdbName sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql) tdSql.query(sql)
# consumerId = 1 # consumerId = 1
# sql = "insert into %s.consumeinfo values "%cdbName # sql = "insert into %s.consumeinfo values "%cdbName
# sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) # sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
# tdSql.query(sql) # tdSql.query(sql)
event.wait() event.wait()
tdLog.info("start consume processor") tdLog.info("start consume processor")
pollDelay = 5 pollDelay = 5
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
valgrind = 1 valgrind = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind) self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind)
# wait for data ready # wait for data ready
prepareEnvThread.join() prepareEnvThread.join()
prepareEnvThread2.join() prepareEnvThread2.join()
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
while 1: while 1:
tdSql.query("select * from %s.consumeresult"%cdbName) tdSql.query("select * from %s.consumeresult"%cdbName)
...@@ -392,7 +392,7 @@ class TDTestCase: ...@@ -392,7 +392,7 @@ class TDTestCase:
tdLog.info("cfgPath: %s" % cfgPath) tdLog.info("cfgPath: %s" % cfgPath)
self.tmqCase1(cfgPath, buildPath) self.tmqCase1(cfgPath, buildPath)
#self.tmqCase2(cfgPath, buildPath) #self.tmqCase2(cfgPath, buildPath)
#self.tmqCase3(cfgPath, buildPath) #self.tmqCase3(cfgPath, buildPath)
def stop(self): def stop(self):
......
...@@ -49,7 +49,7 @@ class TDTestCase: ...@@ -49,7 +49,7 @@ class TDTestCase:
print(cur) print(cur)
return cur return cur
def initConsumerTable(self,cdbName='cdb'): def initConsumerTable(self,cdbName='cdb'):
tdLog.info("create consume database, and consume info table, and consume result table") tdLog.info("create consume database, and consume info table, and consume result table")
tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
...@@ -58,7 +58,7 @@ class TDTestCase: ...@@ -58,7 +58,7 @@ class TDTestCase:
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
sql = "insert into %s.consumeinfo values "%cdbName sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
tdLog.info("consume info sql: %s"%sql) tdLog.info("consume info sql: %s"%sql)
...@@ -73,11 +73,11 @@ class TDTestCase: ...@@ -73,11 +73,11 @@ class TDTestCase:
break break
else: else:
time.sleep(5) time.sleep(5)
for i in range(expectRows): for i in range(expectRows):
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
resultList.append(tdSql.getData(i , 3)) resultList.append(tdSql.getData(i , 3))
return resultList return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
...@@ -86,15 +86,15 @@ class TDTestCase: ...@@ -86,15 +86,15 @@ class TDTestCase:
logFile = cfgPath + '/../log/valgrind-tmq.log' logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &" shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
tsql.execute("use %s" %dbName) tsql.execute("use %s" %dbName)
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
pre_create = "create table" pre_create = "create table"
...@@ -107,8 +107,8 @@ class TDTestCase: ...@@ -107,8 +107,8 @@ class TDTestCase:
sql = pre_create sql = pre_create
if sql != pre_create: if sql != pre_create:
tsql.execute(sql) tsql.execute(sql)
event.set() event.set()
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
return return
...@@ -137,7 +137,7 @@ class TDTestCase: ...@@ -137,7 +137,7 @@ class TDTestCase:
tsql.execute(sql) tsql.execute(sql)
tdLog.debug("insert data ............ [OK]") tdLog.debug("insert data ............ [OK]")
return return
def prepareEnv(self, **parameterDict): def prepareEnv(self, **parameterDict):
print ("input parameters:") print ("input parameters:")
print (parameterDict) print (parameterDict)
...@@ -156,7 +156,7 @@ class TDTestCase: ...@@ -156,7 +156,7 @@ class TDTestCase:
parameterDict["ctbNum"],\ parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"],\ parameterDict["rowsPerTbl"],\
parameterDict["batchNum"],\ parameterDict["batchNum"],\
parameterDict["startTs"]) parameterDict["startTs"])
return return
def tmqCase1(self, cfgPath, buildPath): def tmqCase1(self, cfgPath, buildPath):
...@@ -179,10 +179,10 @@ class TDTestCase: ...@@ -179,10 +179,10 @@ class TDTestCase:
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start() prepareEnvThread.start()
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
...@@ -194,7 +194,7 @@ class TDTestCase: ...@@ -194,7 +194,7 @@ class TDTestCase:
auto.commit.interval.ms:6000,\ auto.commit.interval.ms:6000,\
auto.offset.reset:earliest' auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
event.wait() event.wait()
tdLog.info("start consume processor") tdLog.info("start consume processor")
...@@ -205,14 +205,14 @@ class TDTestCase: ...@@ -205,14 +205,14 @@ class TDTestCase:
# wait for data ready # wait for data ready
prepareEnvThread.join() prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
expectRows = 1 expectRows = 1
resultList = self.selectConsumeResult(expectRows) resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0 totalConsumeRows = 0
for i in range(expectRows): for i in range(expectRows):
totalConsumeRows += resultList[i] totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
...@@ -241,12 +241,12 @@ class TDTestCase: ...@@ -241,12 +241,12 @@ class TDTestCase:
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start() prepareEnvThread.start()
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicName1 topicList = topicName1
...@@ -260,25 +260,25 @@ class TDTestCase: ...@@ -260,25 +260,25 @@ class TDTestCase:
consumerId = 1 consumerId = 1
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
event.wait() event.wait()
tdLog.info("start consume processor") tdLog.info("start consume processor")
pollDelay = 5 pollDelay = 5
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
# wait for data ready # wait for data ready
prepareEnvThread.join() prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
expectRows = 2 expectRows = 2
resultList = self.selectConsumeResult(expectRows) resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0 totalConsumeRows = 0
for i in range(expectRows): for i in range(expectRows):
totalConsumeRows += resultList[i] totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
...@@ -323,9 +323,9 @@ class TDTestCase: ...@@ -323,9 +323,9 @@ class TDTestCase:
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
consumerId = 0 consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
topicList = topicName1 topicList = topicName1
...@@ -336,10 +336,10 @@ class TDTestCase: ...@@ -336,10 +336,10 @@ class TDTestCase:
auto.commit.interval.ms:6000,\ auto.commit.interval.ms:6000,\
auto.offset.reset:earliest' auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
# consumerId = 1 # consumerId = 1
# self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) # self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
event.wait() event.wait()
tdLog.info("start consume processor") tdLog.info("start consume processor")
...@@ -349,16 +349,16 @@ class TDTestCase: ...@@ -349,16 +349,16 @@ class TDTestCase:
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
# wait for data ready # wait for data ready
prepareEnvThread.join() prepareEnvThread.join()
prepareEnvThread2.join() prepareEnvThread2.join()
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
expectRows = 1 expectRows = 1
resultList = self.selectConsumeResult(expectRows) resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0 totalConsumeRows = 0
for i in range(expectRows): for i in range(expectRows):
totalConsumeRows += resultList[i] totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
...@@ -379,7 +379,7 @@ class TDTestCase: ...@@ -379,7 +379,7 @@ class TDTestCase:
tdLog.info("cfgPath: %s" % cfgPath) tdLog.info("cfgPath: %s" % cfgPath)
self.tmqCase1(cfgPath, buildPath) self.tmqCase1(cfgPath, buildPath)
#self.tmqCase2(cfgPath, buildPath) #self.tmqCase2(cfgPath, buildPath)
self.tmqCase3(cfgPath, buildPath) self.tmqCase3(cfgPath, buildPath)
def stop(self): def stop(self):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册