From 54cf2f7c275e4b864c33be2218d85589cf907c70 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 11 Aug 2022 16:21:31 +0800 Subject: [PATCH] fix test cases --- tests/system-test/99-TDcase/TD-15517.py | 92 +++++++++--------- tests/system-test/99-TDcase/TD-15554.py | 122 ++++++++++++------------ tests/system-test/99-TDcase/TD-15557.py | 58 +++++------ tests/system-test/99-TDcase/TD-15563.py | 62 ++++++------ 4 files changed, 167 insertions(+), 167 deletions(-) diff --git a/tests/system-test/99-TDcase/TD-15517.py b/tests/system-test/99-TDcase/TD-15517.py index ebab6617c2..99ca02aab7 100644 --- a/tests/system-test/99-TDcase/TD-15517.py +++ b/tests/system-test/99-TDcase/TD-15517.py @@ -50,7 +50,7 @@ class TDTestCase: return cur def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): - tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) + tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("use %s" %dbName) tsql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) pre_create = "create table" @@ -63,7 +63,7 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - + tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) return @@ -90,7 +90,7 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - + def prepareEnv(self, **parameterDict): print ("input parameters:") print (parameterDict) @@ -109,9 +109,9 @@ class TDTestCase: parameterDict["ctbNum"],\ parameterDict["rowsPerTbl"],\ parameterDict["batchNum"],\ - parameterDict["startTs"]) - return - + parameterDict["startTs"]) + return + def run(self): tdSql.prepare() @@ -138,22 +138,22 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() time.sleep(2) - + # wait stb ready while 1: - tdSql.query("show %s.stables"%parameterDict['dbName']) - if tdSql.getRows() == 1: + tdSql.query("show %s.stables"%parameterDict['dbName']) + if tdSql.getRows() == 1: break else: time.sleep(1) tdLog.info("create topics from super table") topicFromStb = 'topic_stb_column' - topicFromCtb = 'topic_ctb_column' - + topicFromCtb = 'topic_ctb_column' + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) - + time.sleep(1) tdSql.query("show topics") #tdSql.checkRows(2) @@ -161,15 +161,15 @@ class TDTestCase: topic2 = tdSql.getData(1 , 0) print (topic1) print (topic2) - + print (topicFromStb) print (topicFromCtb) #tdLog.info("show topics: %s, %s"%topic1, topic2) #if topic1 != topicFromStb or topic1 != topicFromCtb: - # tdLog.exit("topic error1") + # tdLog.exit("topic error1") #if topic2 != topicFromStb or topic2 != topicFromCtb: - # tdLog.exit("topic error2") - + # tdLog.exit("topic error2") + tdLog.info("create consume info table and consume result table") cdbName = parameterDict["dbName"] tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)") @@ -187,7 +187,7 @@ class TDTestCase: sql = "insert into consumeinfo values " sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata) tdSql.query(sql) - + tdLog.info("check stb if there are data") while 1: tdSql.query("select count(*) from %s"%parameterDict["stbName"]) @@ -198,21 +198,21 @@ class TDTestCase: break else: time.sleep(1) - + tdLog.info("start consume processor") pollDelay = 5 showMsg = 1 showRow = 1 - + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) - os.system(shellCmd) + os.system(shellCmd) # wait for data ready prepareEnvThread.join() - + tdLog.info("insert process end, and start to check consume result") while 1: tdSql.query("select * from consumeresult") @@ -223,17 +223,17 @@ class TDTestCase: time.sleep(5) expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] - + tdSql.checkData(0 , 1, consumerId) tdSql.checkData(0 , 2, expectmsgcnt) tdSql.checkData(0 , 3, expectrowcnt) tdSql.query("drop topic %s"%topicFromStb) tdSql.query("drop topic %s"%topicFromCtb) - + # ============================================================================== tdLog.printNoPrefix("======== test scenario 2: add child table with consuming ") - tdLog.info(" clean database") + tdLog.info(" clean database") # create and start thread parameterDict = {'cfg': '', \ 'dbName': 'db2', \ @@ -247,21 +247,21 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - + # wait db ready while 1: tdSql.query("show databases") - if tdSql.getRows() == 4: - print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),) + if tdSql.getRows() == 4: + print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),) break else: time.sleep(1) - + tdSql.query("use %s"%parameterDict['dbName']) # wait stb ready while 1: tdSql.query("show %s.stables"%parameterDict['dbName']) - if tdSql.getRows() == 1: + if tdSql.getRows() == 1: break else: time.sleep(1) @@ -269,25 +269,25 @@ class TDTestCase: tdLog.info("create topics from super table") topicFromStb = 'topic_stb_column2' topicFromCtb = 'topic_ctb_column2' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) - + time.sleep(1) tdSql.query("show topics") topic1 = tdSql.getData(0 , 0) topic2 = tdSql.getData(1 , 0) print (topic1) print (topic2) - + print (topicFromStb) print (topicFromCtb) #tdLog.info("show topics: %s, %s"%topic1, topic2) #if topic1 != topicFromStb or topic1 != topicFromCtb: - # tdLog.exit("topic error1") + # tdLog.exit("topic error1") #if topic2 != topicFromStb or topic2 != topicFromCtb: - # tdLog.exit("topic error2") - + # tdLog.exit("topic error2") + tdLog.info("create consume info table and consume result table") cdbName = parameterDict["dbName"] tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) @@ -305,7 +305,7 @@ class TDTestCase: sql = "insert into consumeinfo values " sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata) tdSql.query(sql) - + tdLog.info("check stb if there are data") while 1: tdSql.query("select count(*) from %s"%parameterDict["stbName"]) @@ -316,17 +316,17 @@ class TDTestCase: break else: time.sleep(1) - + tdLog.info("start consume processor") pollDelay = 5 showMsg = 1 showRow = 1 - + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) - os.system(shellCmd) + os.system(shellCmd) # create new child table and insert data newCtbName = 'newctb' @@ -340,7 +340,7 @@ class TDTestCase: # wait for data ready prepareEnvThread.join() - + tdLog.info("insert process end, and start to check consume result") while 1: tdSql.query("select * from consumeresult") @@ -352,12 +352,12 @@ class TDTestCase: expectmsgcnt += rowsOfNewCtb expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb - + tdSql.checkData(0 , 1, consumerId) tdSql.checkData(0 , 2, expectmsgcnt) tdSql.checkData(0 , 3, expectrowcnt) - + # ============================================================================== tdLog.printNoPrefix("======== test scenario 3: ") diff --git a/tests/system-test/99-TDcase/TD-15554.py b/tests/system-test/99-TDcase/TD-15554.py index d97dd8c187..4012696dc7 100644 --- a/tests/system-test/99-TDcase/TD-15554.py +++ b/tests/system-test/99-TDcase/TD-15554.py @@ -49,7 +49,7 @@ class TDTestCase: return cur def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): - tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) + tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("use %s" %dbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) pre_create = "create table" @@ -62,7 +62,7 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - + tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) return @@ -89,7 +89,7 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - + def prepareEnv(self, **parameterDict): print ("input parameters:") print (parameterDict) @@ -108,7 +108,7 @@ class TDTestCase: parameterDict["ctbNum"],\ parameterDict["rowsPerTbl"],\ parameterDict["batchNum"],\ - parameterDict["startTs"]) + parameterDict["startTs"]) return @@ -128,34 +128,34 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() time.sleep(2) - + # wait stb ready while 1: - tdSql.query("show %s.stables"%parameterDict['dbName']) - if tdSql.getRows() == 1: + tdSql.query("show %s.stables"%parameterDict['dbName']) + if tdSql.getRows() == 1: break else: time.sleep(1) tdLog.info("create topics from super table") topicFromStb = 'topic_stb_column' - topicFromCtb = 'topic_ctb_column' - + topicFromCtb = 'topic_ctb_column' + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) - + time.sleep(1) tdSql.query("show topics") #tdSql.checkRows(2) topic1 = tdSql.getData(0 , 0) topic2 = tdSql.getData(1 , 0) - + tdLog.info("show topics: %s, %s"%(topic1, topic2)) if topic1 != topicFromStb and topic1 != topicFromCtb: - tdLog.exit("topic error1") + tdLog.exit("topic error1") if topic2 != topicFromStb and topic2 != topicFromCtb: - tdLog.exit("topic error2") - + tdLog.exit("topic error2") + tdLog.info("create consume info table and consume result table") cdbName = parameterDict["dbName"] tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)") @@ -172,7 +172,7 @@ class TDTestCase: sql = "insert into consumeinfo values " sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) tdSql.query(sql) - + tdLog.info("check stb if there are data") while 1: tdSql.query("select count(*) from %s"%parameterDict["stbName"]) @@ -183,21 +183,21 @@ class TDTestCase: break else: time.sleep(1) - + tdLog.info("start consume processor") pollDelay = 5 showMsg = 1 showRow = 1 - + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) - os.system(shellCmd) + os.system(shellCmd) # wait for data ready prepareEnvThread.join() - + tdLog.info("insert process end, and start to check consume result") while 1: tdSql.query("select * from consumeresult") @@ -217,7 +217,7 @@ class TDTestCase: tdSql.query("drop topic %s"%topicFromCtb) tdLog.printNoPrefix("======== test case 1 end ...... ") - + def tmqCase2(self, cfgPath, buildPath): tdLog.printNoPrefix("======== test case 2: add child table with consuming ") # create and start thread @@ -233,21 +233,21 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - + # wait db ready while 1: tdSql.query("show databases") - if tdSql.getRows() == 4: - print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),) + if tdSql.getRows() == 4: + print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),) break else: time.sleep(1) - + tdSql.query("use %s"%parameterDict['dbName']) # wait stb ready while 1: tdSql.query("show %s.stables"%parameterDict['dbName']) - if tdSql.getRows() == 1: + if tdSql.getRows() == 1: break else: time.sleep(1) @@ -255,20 +255,20 @@ class TDTestCase: tdLog.info("create topics from super table") topicFromStb = 'topic_stb_column2' topicFromCtb = 'topic_ctb_column2' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) - + time.sleep(1) tdSql.query("show topics") topic1 = tdSql.getData(0 , 0) topic2 = tdSql.getData(1 , 0) tdLog.info("show topics: %s, %s"%(topic1, topic2)) if topic1 != topicFromStb and topic1 != topicFromCtb: - tdLog.exit("topic error1") + tdLog.exit("topic error1") if topic2 != topicFromStb and topic2 != topicFromCtb: - tdLog.exit("topic error2") - + tdLog.exit("topic error2") + tdLog.info("create consume info table and consume result table") cdbName = parameterDict["dbName"] tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) @@ -286,7 +286,7 @@ class TDTestCase: sql = "insert into consumeinfo values " sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) tdSql.query(sql) - + tdLog.info("check stb if there are data") while 1: tdSql.query("select count(*) from %s"%parameterDict["stbName"]) @@ -297,17 +297,17 @@ class TDTestCase: break else: time.sleep(1) - + tdLog.info("start consume processor") pollDelay = 5 showMsg = 1 showRow = 1 - + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) - os.system(shellCmd) + os.system(shellCmd) # create new child table and insert data newCtbName = 'newctb' @@ -320,7 +320,7 @@ class TDTestCase: # wait for data ready prepareEnvThread.join() - + tdLog.info("insert process end, and start to check consume result") while 1: tdSql.query("select * from consumeresult") @@ -332,7 +332,7 @@ class TDTestCase: tdSql.checkData(0 , 1, consumerId) tdSql.checkData(0 , 3, expectrowcnt) - + tdSql.query("drop topic %s"%topicFromStb) tdSql.query("drop topic %s"%topicFromCtb) @@ -355,25 +355,25 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - + # wait db ready while 1: tdSql.query("show databases") - if tdSql.getRows() == 4: - print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),) + if tdSql.getRows() == 4: + print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),) break else: time.sleep(1) - + tdSql.query("use %s"%parameterDict['dbName']) # wait stb ready while 1: tdSql.query("show %s.stables"%parameterDict['dbName']) - if tdSql.getRows() == 1: + if tdSql.getRows() == 1: break else: time.sleep(1) - + tdLog.info("create stable2 for the seconde topic") parameterDict2 = {'cfg': '', \ 'dbName': 'db3', \ @@ -385,23 +385,23 @@ class TDTestCase: 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict2['cfg'] = cfgPath tdSql.execute("create stable if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict2['dbName'], parameterDict2['stbName'])) - + tdLog.info("create topics from super table") topicFromStb = 'topic_stb_column3' topicFromStb2 = 'topic_stb_column32' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb2, parameterDict2['dbName'], parameterDict2['stbName'])) - + tdSql.query("show topics") topic1 = tdSql.getData(0 , 0) topic2 = tdSql.getData(1 , 0) tdLog.info("show topics: %s, %s"%(topic1, topic2)) if topic1 != topicFromStb and topic1 != topicFromStb2: - tdLog.exit("topic error1") + tdLog.exit("topic error1") if topic2 != topicFromStb and topic2 != topicFromStb2: - tdLog.exit("topic error2") - + tdLog.exit("topic error2") + tdLog.info("create consume info table and consume result table") cdbName = parameterDict["dbName"] tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) @@ -418,7 +418,7 @@ class TDTestCase: sql = "insert into consumeinfo values " sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) tdSql.query(sql) - + tdLog.info("check stb if there are data") while 1: tdSql.query("select count(*) from %s"%parameterDict["stbName"]) @@ -429,17 +429,17 @@ class TDTestCase: break else: time.sleep(1) - + tdLog.info("start consume processor") pollDelay = 5 showMsg = 1 showRow = 1 - + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) - os.system(shellCmd) + os.system(shellCmd) # start the second thread to create new child table and insert data prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) @@ -448,7 +448,7 @@ class TDTestCase: # wait for data ready prepareEnvThread.join() prepareEnvThread2.join() - + tdLog.info("insert process end, and start to check consume result") while 1: tdSql.query("select * from consumeresult") @@ -460,7 +460,7 @@ class TDTestCase: tdSql.checkData(0 , 1, consumerId) tdSql.checkData(0 , 3, expectrowcnt) - + tdSql.query("drop topic %s"%topicFromStb) tdSql.query("drop topic %s"%topicFromStb2) @@ -478,7 +478,7 @@ class TDTestCase: tdLog.info("cfgPath: %s" % cfgPath) #self.tmqCase1(cfgPath, buildPath) - #self.tmqCase2(cfgPath, buildPath) + #self.tmqCase2(cfgPath, buildPath) self.tmqCase3(cfgPath, buildPath) def stop(self): diff --git a/tests/system-test/99-TDcase/TD-15557.py b/tests/system-test/99-TDcase/TD-15557.py index 7a282e3176..243a0a4d7e 100644 --- a/tests/system-test/99-TDcase/TD-15557.py +++ b/tests/system-test/99-TDcase/TD-15557.py @@ -55,15 +55,15 @@ class TDTestCase: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): - tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) + tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("use %s" %dbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) pre_create = "create table" @@ -76,8 +76,8 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - - event.set() + + event.set() tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) return @@ -104,7 +104,7 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - + def prepareEnv(self, **parameterDict): print ("input parameters:") print (parameterDict) @@ -123,7 +123,7 @@ class TDTestCase: parameterDict["ctbNum"],\ parameterDict["rowsPerTbl"],\ parameterDict["batchNum"],\ - parameterDict["startTs"]) + parameterDict["startTs"]) return def tmqCase1(self, cfgPath, buildPath): @@ -144,12 +144,12 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - + tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) - + tdLog.info("create consume info table and consume result table") cdbName = parameterDict["dbName"] tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) @@ -166,20 +166,20 @@ class TDTestCase: sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) tdSql.query(sql) - + event.wait() tdLog.info("start consume processor") pollDelay = 5 showMsg = 1 showRow = 1 - + valgrind = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind) # wait for data ready prepareEnvThread.join() - + tdLog.info("insert process end, and start to check consume result") while 1: tdSql.query("select * from %s.consumeresult"%cdbName) @@ -217,12 +217,12 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - + tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) - + tdLog.info("create consume info table and consume result table") cdbName = parameterDict["dbName"] tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) @@ -239,25 +239,25 @@ class TDTestCase: sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) tdSql.query(sql) - + consumerId = 1 sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) tdSql.query(sql) - + event.wait() tdLog.info("start consume processor") pollDelay = 5 showMsg = 1 showRow = 1 - + valgrind = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind) # wait for data ready prepareEnvThread.join() - + tdLog.info("insert process end, and start to check consume result") while 1: tdSql.query("select * from %s.consumeresult"%cdbName) @@ -317,9 +317,9 @@ class TDTestCase: tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) - + tdLog.info("create consume info table and consume result table") cdbName = parameterDict["dbName"] tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) @@ -336,25 +336,25 @@ class TDTestCase: sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) tdSql.query(sql) - + # consumerId = 1 # sql = "insert into %s.consumeinfo values "%cdbName # sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) # tdSql.query(sql) - + event.wait() tdLog.info("start consume processor") pollDelay = 5 showMsg = 1 - showRow = 1 + showRow = 1 valgrind = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind) # wait for data ready - prepareEnvThread.join() + prepareEnvThread.join() prepareEnvThread2.join() - + tdLog.info("insert process end, and start to check consume result") while 1: tdSql.query("select * from %s.consumeresult"%cdbName) @@ -392,7 +392,7 @@ class TDTestCase: tdLog.info("cfgPath: %s" % cfgPath) self.tmqCase1(cfgPath, buildPath) - #self.tmqCase2(cfgPath, buildPath) + #self.tmqCase2(cfgPath, buildPath) #self.tmqCase3(cfgPath, buildPath) def stop(self): diff --git a/tests/system-test/99-TDcase/TD-15563.py b/tests/system-test/99-TDcase/TD-15563.py index ca182820d5..74b5472d89 100644 --- a/tests/system-test/99-TDcase/TD-15563.py +++ b/tests/system-test/99-TDcase/TD-15563.py @@ -49,7 +49,7 @@ class TDTestCase: print(cur) return cur - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) @@ -58,7 +58,7 @@ class TDTestCase: tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -73,11 +73,11 @@ class TDTestCase: break else: time.sleep(5) - + for i in range(expectRows): tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): @@ -86,15 +86,15 @@ class TDTestCase: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): - tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) + tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("use %s" %dbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) pre_create = "create table" @@ -107,8 +107,8 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - - event.set() + + event.set() tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) return @@ -137,7 +137,7 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - + def prepareEnv(self, **parameterDict): print ("input parameters:") print (parameterDict) @@ -156,7 +156,7 @@ class TDTestCase: parameterDict["ctbNum"],\ parameterDict["rowsPerTbl"],\ parameterDict["batchNum"],\ - parameterDict["startTs"]) + parameterDict["startTs"]) return def tmqCase1(self, cfgPath, buildPath): @@ -179,10 +179,10 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - + tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -194,7 +194,7 @@ class TDTestCase: auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + event.wait() tdLog.info("start consume processor") @@ -205,14 +205,14 @@ class TDTestCase: # wait for data ready prepareEnvThread.join() - + tdLog.info("insert process end, and start to check consume result") expectRows = 1 resultList = self.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -241,12 +241,12 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - + tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) - + consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] topicList = topicName1 @@ -260,25 +260,25 @@ class TDTestCase: consumerId = 1 self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + event.wait() tdLog.info("start consume processor") pollDelay = 5 showMsg = 1 - showRow = 1 + showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) # wait for data ready prepareEnvThread.join() - + tdLog.info("insert process end, and start to check consume result") expectRows = 2 resultList = self.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -323,9 +323,9 @@ class TDTestCase: tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) - + consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] topicList = topicName1 @@ -336,10 +336,10 @@ class TDTestCase: auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + # consumerId = 1 # self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + event.wait() tdLog.info("start consume processor") @@ -349,16 +349,16 @@ class TDTestCase: self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) # wait for data ready - prepareEnvThread.join() + prepareEnvThread.join() prepareEnvThread2.join() - + tdLog.info("insert process end, and start to check consume result") expectRows = 1 resultList = self.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -379,7 +379,7 @@ class TDTestCase: tdLog.info("cfgPath: %s" % cfgPath) self.tmqCase1(cfgPath, buildPath) - #self.tmqCase2(cfgPath, buildPath) + #self.tmqCase2(cfgPath, buildPath) self.tmqCase3(cfgPath, buildPath) def stop(self): -- GitLab