diff --git a/tests/system-test/7-tmq/subscribeStb.py b/tests/system-test/7-tmq/subscribeStb.py index 4f70340b5a6df4daebcda5869117d9e0008d9a48..2757e590a3904dfcd584ae8e35c5e861dcbbb9db 100644 --- a/tests/system-test/7-tmq/subscribeStb.py +++ b/tests/system-test/7-tmq/subscribeStb.py @@ -56,7 +56,7 @@ class TDTestCase: print(cur) return cur - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) @@ -65,12 +65,12 @@ class TDTestCase: tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - def initConsumerInfoTable(self,cdbName='cdb'): + def initConsumerInfoTable(self,cdbName='cdb'): tdLog.info("drop consumeinfo table") tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -85,11 +85,11 @@ class TDTestCase: break else: time.sleep(5) - + for i in range(expectRows): tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): @@ -97,14 +97,14 @@ class TDTestCase: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + if (platform.system().lower() == 'windows'): shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> nul 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" else: shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) @@ -134,7 +134,7 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) return @@ -149,7 +149,7 @@ class TDTestCase: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 + rowsOfSql = 0 for i in range(ctbNum): sql += " %s_%d values "%(stbName,i) for j in range(rowsPerTbl): @@ -168,8 +168,8 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - - def prepareEnv(self, **parameterDict): + + def prepareEnv(self, **parameterDict): # create new connector for my thread tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) @@ -188,8 +188,8 @@ class TDTestCase: return def tmqCase1(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 1: ") - + tdLog.printNoPrefix("======== test case 1: ") + self.initConsumerTable() # create and start thread @@ -205,13 +205,13 @@ class TDTestCase: 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + self.create_database(tdSql, parameterDict["dbName"]) self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -245,7 +245,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -255,8 +255,8 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 2: ") - + tdLog.printNoPrefix("======== test case 2: ") + self.initConsumerTable() # create and start thread @@ -292,7 +292,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -341,7 +341,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -362,7 +362,7 @@ class TDTestCase: tdLog.info("cfgPath: %s" % cfgPath) self.tmqCase1(cfgPath, buildPath) - self.tmqCase2(cfgPath, buildPath) + self.tmqCase2(cfgPath, buildPath) def stop(self): tdSql.close() diff --git a/tests/system-test/7-tmq/tmq3mnodeSwitch.py b/tests/system-test/7-tmq/tmq3mnodeSwitch.py index 2769c3867bbf152269b55fe57d183d97cfd7d58a..305a93128e2a15a53bbc46e2a53bfef8eb41e2c3 100644 --- a/tests/system-test/7-tmq/tmq3mnodeSwitch.py +++ b/tests/system-test/7-tmq/tmq3mnodeSwitch.py @@ -36,7 +36,7 @@ class TDTestCase: tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) #tdSql.init(conn.cursor(), logSql) # output sql.txt file - + def checkDnodesStatusAndCreateMnode(self,dnodeNumbers): count=0 while count < dnodeNumbers: @@ -44,7 +44,7 @@ class TDTestCase: # tdLog.debug(tdSql.queryResult) dCnt = 0 for i in range(dnodeNumbers): - if tdSql.queryResult[i][self.dnodeStatusIndex] != "ready": + if tdSql.queryResult[i][self.dnodeStatusIndex] != "ready": break else: dCnt += 1 @@ -64,7 +64,7 @@ class TDTestCase: while count < self.mnodeCheckCnt: time.sleep(1) tdSql.query("show mnodes;") - if tdSql.checkRows(self.mnodes) : + if tdSql.checkRows(self.mnodes) : tdLog.debug("mnode is three nodes") else: tdLog.exit("mnode number is correct") @@ -78,17 +78,17 @@ class TDTestCase: break elif roleOfMnode0=='follower' and roleOfMnode1=='leader' and roleOfMnode2 == 'follower' : self.dnodeOfLeader = tdSql.queryResult[1][self.idIndex] - break + break elif roleOfMnode0=='follower' and roleOfMnode1=='follower' and roleOfMnode2 == 'leader' : self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex] - break - else: + break + else: count+=1 else: tdLog.exit("three mnodes is not ready in 10s ") - tdSql.query("show mnodes;") - tdSql.checkRows(self.mnodes) + tdSql.query("show mnodes;") + tdSql.checkRows(self.mnodes) tdSql.checkData(0,self.mnodeEpIndex,'%s:%d'%(self.host,self.startPort)) tdSql.checkData(0,self.mnodeStatusIndex,'ready') tdSql.checkData(1,self.mnodeEpIndex,'%s:%d'%(self.host,self.startPort+self.portStep)) @@ -101,8 +101,8 @@ class TDTestCase: while count < self.mnodeCheckCnt: time.sleep(1) tdSql.query("show mnodes") - tdLog.debug(tdSql.queryResult) - # if tdSql.checkRows(self.mnodes) : + tdLog.debug(tdSql.queryResult) + # if tdSql.checkRows(self.mnodes) : # tdLog.debug("mnode is three nodes") # else: # tdLog.exit("mnode number is correct") @@ -117,21 +117,21 @@ class TDTestCase: break elif roleOfMnode1=='follower' and roleOfMnode2 == 'leader' : self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex] - break + break elif roleOfMnode1=='offline' : if roleOfMnode0=='leader' and roleOfMnode2 == 'follower' : self.dnodeOfLeader = tdSql.queryResult[0][self.idIndex] break elif roleOfMnode0=='follower' and roleOfMnode2 == 'leader' : self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex] - break + break elif roleOfMnode2=='offline' : if roleOfMnode0=='leader' and roleOfMnode1 == 'follower' : self.dnodeOfLeader = tdSql.queryResult[0][self.idIndex] break elif roleOfMnode0=='follower' and roleOfMnode1 == 'leader' : self.dnodeOfLeader = tdSql.queryResult[1][self.idIndex] - break + break count+=1 else: @@ -144,27 +144,27 @@ class TDTestCase: cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile) tdLog.info(cmdStr) os.system(cmdStr) - + consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId) tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile)) consumeFile = open(consumeRowsFile, mode='r') queryFile = open(dstFile, mode='r') - + # skip first line for it is schema queryFile.readline() while True: dst = queryFile.readline() src = consumeFile.readline() - + if dst: if dst != src: tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId) else: break - return - + return + def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") paraDict = {'dbName': 'db1', @@ -195,7 +195,7 @@ class TDTestCase: tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) tdLog.info("async insert data") pThread = tmqCom.asyncInsertData(paraDict) - + tdLog.info("create topics from stb with filter") queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) @@ -234,22 +234,22 @@ class TDTestCase: tdDnodes[1].stoptaosd() time.sleep(10) self.check3mnode1off() - - tdLog.info("switch end and wait insert data end ................") - pThread.join() - tdLog.info("check the consume result") - tdSql.query(queryString) + tdLog.info("switch end and wait insert data end ................") + pThread.join() + + tdLog.info("check the consume result") + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if expectRowsList[0] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("0 tmq consume rows error!") - self.checkFileContent(consumerId, queryString) + self.checkFileContent(consumerId, queryString) time.sleep(10) for i in range(len(topicNameList)): diff --git a/tests/system-test/7-tmq/tmqAlterSchema.py b/tests/system-test/7-tmq/tmqAlterSchema.py index a2e20990d9ca43fe62db04c6072a805356f5d514..232a1e11faaf0474ebd12cb8fa83e15bc94bf6da 100644 --- a/tests/system-test/7-tmq/tmqAlterSchema.py +++ b/tests/system-test/7-tmq/tmqAlterSchema.py @@ -36,7 +36,7 @@ class TDTestCase: tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) #tdSql.init(conn.cursor(), logSql) # output sql.txt file - + def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: topic: select * from stb, while consume, add column int-A/bianry-B/float-C, and then modify B, drop C") tdLog.printNoPrefix("add tag int-A/bianry-B/float-C, and then rename A, modify B, drop C, set t2") @@ -61,7 +61,7 @@ class TDTestCase: topicNameList = ['topic1'] expectRowsList = [] - queryStringList = [] + queryStringList = [] tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1) tdLog.info("create stb") @@ -71,15 +71,15 @@ class TDTestCase: # tdLog.info("async insert data") # pThread = tmqCom.asyncInsertData(paraDict) tmqCom.insert_data_2(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"],paraDict["ctbStartIdx"]) - + tdLog.info("create topics from stb with filter") queryStringList.append("select * from %s.%s" %(paraDict['dbName'], paraDict['stbName'])) sqlString = "create topic %s as %s" %(topicNameList[0], queryStringList[0]) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryStringList[0]) - expectRowsList.append(tdSql.getRows()) - + tdSql.query(queryStringList[0]) + expectRowsList.append(tdSql.getRows()) + # init consume info, and start tmq_sim, then check consume result tdLog.info("insert consume info to consume processor") consumerId = 0 @@ -91,14 +91,14 @@ class TDTestCase: tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) dstFile = tmqCom.getResultFileByTaosShell(consumerId, queryStringList[0]) - + tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) tdLog.info("wait the notify info of start consume, then alter schema") tmqCom.getStartConsumeNotifyFromTmqsim() - - # add column double-A/bianry-B/double-C, and then modify B, drop C + + # add column double-A/bianry-B/double-C, and then modify B, drop C sqlString = "alter table %s.%s add column newc1 double"%(paraDict["dbName"],paraDict['stbName']) tdSql.execute(sqlString) sqlString = "alter table %s.%s add column newc2 binary(16)"%(paraDict["dbName"],paraDict['stbName']) @@ -108,8 +108,8 @@ class TDTestCase: sqlString = "alter table %s.%s modify column newc2 binary(32)"%(paraDict["dbName"],paraDict['stbName']) tdSql.execute(sqlString) sqlString = "alter table %s.%s drop column newc3"%(paraDict["dbName"],paraDict['stbName']) - tdSql.execute(sqlString) - # add tag double-A/bianry-B/double-C, and then rename A, modify B, drop C, set t1 + tdSql.execute(sqlString) + # add tag double-A/bianry-B/double-C, and then rename A, modify B, drop C, set t1 sqlString = "alter table %s.%s add tag newt1 double"%(paraDict["dbName"],paraDict['stbName']) tdSql.execute(sqlString) sqlString = "alter table %s.%s add tag newt2 binary(16)"%(paraDict["dbName"],paraDict['stbName']) @@ -125,27 +125,27 @@ class TDTestCase: sqlString = "alter table %s.%s0 set tag newt2='new tag'"%(paraDict["dbName"],paraDict['ctbPrefix']) tdSql.execute(sqlString) - tdLog.info("check the consume result") - tdSql.query(queryStringList[0]) + tdLog.info("check the consume result") + tdSql.query(queryStringList[0]) expectRowsList.append(tdSql.getRows()) expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + tdLog.info("expect consume rows: %d"%(expectRowsList[0])) tdLog.info("act consume rows: %d"%(resultList[0])) - + if expectRowsList[0] != resultList[0]: tdLog.exit("0 tmq consume rows error!") tmqCom.checkTmqConsumeFileContent(consumerId, dstFile) - + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) tdLog.printNoPrefix("======== test case 1 end ...... ") - + def tmqCase2(self): tdLog.printNoPrefix("======== test case 2: topic: select * from ntb, while consume, add column int-A/bianry-B/float-C, and then rename A, modify B, drop C") paraDict = {'dbName': 'db1', @@ -166,12 +166,12 @@ class TDTestCase: 'pollDelay': 10, 'showMsg': 1, 'showRow': 1} - - ntbName = 'ntb' + + ntbName = 'ntb' topicNameList = ['topic1'] expectRowsList = [] - queryStringList = [] + queryStringList = [] tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1) tdLog.info("create stb") @@ -182,15 +182,15 @@ class TDTestCase: # pThread = tmqCom.asyncInsertData(paraDict) tdCom.insert_rows(tdSql, dbname=paraDict["dbName"], tbname=ntbName, column_ele_list=paraDict['colSchema'], start_ts_value=paraDict["startTs"], count=paraDict["rowsPerTbl"]) tdLog.info("insert data end") - + tdLog.info("create topics from ntb with filter") queryStringList.append("select * from %s.%s" %(paraDict['dbName'], ntbName)) sqlString = "create topic %s as %s" %(topicNameList[0], queryStringList[0]) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryStringList[0]) - expectRowsList.append(tdSql.getRows()) - + tdSql.query(queryStringList[0]) + expectRowsList.append(tdSql.getRows()) + # init consume info, and start tmq_sim, then check consume result tdLog.info("insert consume info to consume processor") consumerId = 0 @@ -202,13 +202,13 @@ class TDTestCase: tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) dstFile = tmqCom.getResultFileByTaosShell(consumerId, queryStringList[0]) - + tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) tdLog.info("wait the notify info of start consume, then alter schema") tmqCom.getStartConsumeNotifyFromTmqsim() - + # add column double-A/bianry-B/double-C, and then rename A, modify B, drop C sqlString = "alter table %s.%s add column newc1 double"%(paraDict["dbName"],ntbName) tdSql.execute(sqlString) @@ -223,21 +223,21 @@ class TDTestCase: sqlString = "alter table %s.%s drop column newc3"%(paraDict["dbName"],ntbName) tdSql.execute(sqlString) - tdLog.info("check the consume result") - tdSql.query(queryStringList[0]) + tdLog.info("check the consume result") + tdSql.query(queryStringList[0]) expectRowsList.append(tdSql.getRows()) expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + tdLog.info("expect consume rows: %d"%(expectRowsList[0])) tdLog.info("act consume rows: %d"%(resultList[0])) - + if expectRowsList[0] != resultList[0]: tdLog.exit("0 tmq consume rows error!") tmqCom.checkTmqConsumeFileContent(consumerId, dstFile) - + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/7-tmq/tmqAutoCreateTbl.py b/tests/system-test/7-tmq/tmqAutoCreateTbl.py index 277fdf7afb226a49de00312b3240c70106ae4a4d..a613f11267ef769ef9162543a5818a0881f23abd 100644 --- a/tests/system-test/7-tmq/tmqAutoCreateTbl.py +++ b/tests/system-test/7-tmq/tmqAutoCreateTbl.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 500 self.rowsPerTbl = 1000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -62,10 +62,10 @@ class TDTestCase: # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctbx",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) - - # tdLog.info("restart taosd to ensure that the data falls into the disk") + + # tdLog.info("restart taosd to ensure that the data falls into the disk") # tdSql.query("flush database %s"%(paraDict['dbName'])) return @@ -96,7 +96,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + # tmqCom.initConsumerTable() # tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) # tdLog.info("create stb") @@ -105,12 +105,12 @@ class TDTestCase: # tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - + tdSql.execute(sqlString) + consumerId = 0 expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] topicList = topicFromStb1 @@ -134,9 +134,9 @@ class TDTestCase: tdSql.query(queryString) totalRowsFromQuery = tdSql.getRows() - + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQuery)) - if totalConsumeRows != totalRowsFromQuery: + if totalConsumeRows != totalRowsFromQuery: tdLog.exit("tmq consume rows error!") tdSql.query("drop topic %s"%topicFromStb1) @@ -144,7 +144,7 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self): - tdLog.printNoPrefix("======== test case 2: ") + tdLog.printNoPrefix("======== test case 2: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', @@ -169,7 +169,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() # tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) # tdLog.info("create stb") @@ -182,13 +182,13 @@ class TDTestCase: # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' # queryString = "select ts, c1, c2 from %s.%s "%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2 from %s.%s where t4 == 'shanghai' and t5 == 'shanghai' "%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - + consumerId = 1 expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 topicList = topicFromStb1 @@ -205,7 +205,7 @@ class TDTestCase: tdLog.info("create some new child table and insert data ") tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctby",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) - + tdLog.info("insert process end, and start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -215,9 +215,9 @@ class TDTestCase: tdSql.query(queryString) totalRowsFromQuery = tdSql.getRows() - + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQuery)) - if totalConsumeRows != totalRowsFromQuery: + if totalConsumeRows != totalRowsFromQuery: tdLog.exit("tmq consume rows error!") tdSql.query("drop topic %s"%topicFromStb1) @@ -231,14 +231,14 @@ class TDTestCase: tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.tmqCase1() self.tmqCase2() - + self.prepareTestEnv() tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 self.tmqCase1() self.tmqCase2() - + def stop(self): tdSql.close() diff --git a/tests/system-test/7-tmq/tmqCheckData.py b/tests/system-test/7-tmq/tmqCheckData.py index 0e55dfa19d33a14703b9d985446bf532537b4925..9338debfa6f5f9161cfa700dd2c3fdbdaa2fa8b7 100644 --- a/tests/system-test/7-tmq/tmqCheckData.py +++ b/tests/system-test/7-tmq/tmqCheckData.py @@ -27,26 +27,26 @@ class TDTestCase: cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile) tdLog.info(cmdStr) os.system(cmdStr) - + consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId) tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile)) consumeFile = open(consumeRowsFile, mode='r') queryFile = open(dstFile, mode='r') - + # skip first line for it is schema queryFile.readline() while True: dst = queryFile.readline() src = consumeFile.readline() - + if dst: if dst != src: tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId) else: break - return + return def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") @@ -78,13 +78,13 @@ class TDTestCase: tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) tdLog.info("insert data") tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) - + tdLog.info("create topics from stb with filter") queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result @@ -100,15 +100,15 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if expectRowsList[0] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("0 tmq consume rows error!") - self.checkFileContent(consumerId, queryString) + self.checkFileContent(consumerId, queryString) # reinit consume info, and start tmq_sim, then check consume result tmqCom.initConsumerTable() @@ -117,7 +117,7 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[1], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) consumerId = 1 @@ -127,7 +127,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) if expectRowsList[1] != resultList[0]: @@ -143,8 +143,8 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[2], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) - expectRowsList.append(tdSql.getRows()) + tdSql.query(queryString) + expectRowsList.append(tdSql.getRows()) consumerId = 2 topicList = topicNameList[2] @@ -153,7 +153,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) # if expectRowsList[2] != resultList[0]: @@ -162,7 +162,7 @@ class TDTestCase: # self.checkFileContent(consumerId, queryString) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/7-tmq/tmqCheckData1.py b/tests/system-test/7-tmq/tmqCheckData1.py index 6cf849d1b989a9b99ea19a7fa77b9483254bd7f8..7c236bbe8bbfb6f0a1c66f3c7e1340081304a9f5 100644 --- a/tests/system-test/7-tmq/tmqCheckData1.py +++ b/tests/system-test/7-tmq/tmqCheckData1.py @@ -27,26 +27,26 @@ class TDTestCase: cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile) tdLog.info(cmdStr) os.system(cmdStr) - + consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId) tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile)) consumeFile = open(consumeRowsFile, mode='r') queryFile = open(dstFile, mode='r') - + # skip first line for it is schema queryFile.readline() while True: dst = queryFile.readline() src = consumeFile.readline() - + if dst: if dst != src: tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId) else: break - return + return def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") @@ -78,13 +78,13 @@ class TDTestCase: tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) tdLog.info("insert data") tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) - + tdLog.info("create topics from stb with filter") queryString = "select ts,c1,c2 from %s.%s" %(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as stable %s.%s" %(topicNameList[0], paraDict["dbName"],paraDict["stbName"]) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result @@ -100,15 +100,15 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if expectRowsList[0] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("0 tmq consume rows error!") - self.checkFileContent(consumerId, queryString) + self.checkFileContent(consumerId, queryString) # reinit consume info, and start tmq_sim, then check consume result tmqCom.initConsumerTable() @@ -116,7 +116,7 @@ class TDTestCase: sqlString = "create topic %s as database %s" %(topicNameList[1], paraDict['dbName']) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) consumerId = 1 @@ -126,7 +126,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) if expectRowsList[1] != resultList[0]: @@ -141,7 +141,7 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[2], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) consumerId = 2 @@ -151,7 +151,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) if expectRowsList[2] != resultList[0]: @@ -160,7 +160,7 @@ class TDTestCase: self.checkFileContent(consumerId, queryString) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index 20cffed486a5ad592c92a4b1e259170764f46c03..b1455ebe48d03e314a5421fbb8fa3bd4d41ab9d9 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -41,23 +41,23 @@ class TMQCom: tdSql.init(conn.cursor()) # tdSql.init(conn.cursor(), logSql) # output sql.txt file - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) - tdSql.query("drop table if exists %s.notifyinfo "%(cdbName)) + tdSql.query("drop table if exists %s.notifyinfo "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) tdSql.query("create table %s.notifyinfo (ts timestamp, cmdid int, consumerid int)"%cdbName) - def initConsumerInfoTable(self,cdbName='cdb'): + def initConsumerInfoTable(self,cdbName='cdb'): tdLog.info("drop consumeinfo table") tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -72,13 +72,13 @@ class TMQCom: break else: time.sleep(5) - + for i in range(expectRows): tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList - + def selectConsumeMsgResult(self,expectRows,cdbName='cdb'): resultList=[] while 1: @@ -88,11 +88,11 @@ class TMQCom: break else: time.sleep(5) - + for i in range(expectRows): tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 2)) - + return resultList def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0,alias=0,snapshot=0): @@ -102,7 +102,7 @@ class TMQCom: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + if (platform.system().lower() == 'windows'): processorName = buildPath + '\\build\\bin\\tmq_sim.exe' if alias != 0: @@ -111,8 +111,8 @@ class TMQCom: os.system(shellCmd) processorName = processorNameNew shellCmd = 'mintty -h never ' + processorName + ' -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s -e %d "%(pollDelay, dbName, showMsg, showRow, cdbName, snapshot) - shellCmd += "> nul 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s -e %d "%(pollDelay, dbName, showMsg, showRow, cdbName, snapshot) + shellCmd += "> nul 2>&1 &" else: processorName = buildPath + '/build/bin/tmq_sim' if alias != 0: @@ -121,10 +121,10 @@ class TMQCom: os.system(shellCmd) processorName = processorNameNew shellCmd = 'nohup ' + processorName + ' -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s -e %d "%(pollDelay, dbName, showMsg, showRow, cdbName, snapshot) + shellCmd += " -y %d -d %s -g %d -r %d -w %s -e %d "%(pollDelay, dbName, showMsg, showRow, cdbName, snapshot) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) - os.system(shellCmd) + os.system(shellCmd) def stopTmqSimProcess(self, processorName): psCmd = "ps -ef|grep -w %s|grep -v grep | awk '{print $2}'"%(processorName) @@ -149,7 +149,7 @@ class TMQCom: for i in range(actRows): if tdSql.getData(i, 1) == 0: loopFlag = 0 - break + break time.sleep(0.1) return @@ -163,7 +163,7 @@ class TMQCom: for i in range(actRows): if tdSql.getData(i, 1) == 1: loopFlag = 0 - break + break time.sleep(0.1) return @@ -196,7 +196,7 @@ class TMQCom: tagBinaryValue = 'shanghai' elif (i % 3 == 0): tagBinaryValue = 'changsha' - + sql += " %s.%s%d using %s.%s tags(%d, %d, %d, '%s', '%s')"%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue) tblBatched += 1 if (i == ctbNum-1 ) or (tblBatched == batchNum): @@ -206,9 +206,9 @@ class TMQCom: if sql != pre_create: tsql.execute(sql) - + tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName)) - return + return def drop_ctable(self, tsql, dbname=None, count=1, default_ctbname_prefix="ctb",ctbStartIdx=0): for _ in range(count): @@ -246,7 +246,7 @@ class TMQCom: #print("insert sql:%s"%sql) tsql.execute(sql) tdLog.debug("insert data ............ [OK]") - return + return # schema: (ts timestamp, c1 int, c2 int, c3 binary(16)) def insert_data_1(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs): @@ -373,16 +373,16 @@ class TMQCom: if startTs == 0: t = time.time() startTs = int(round(t * 1000)) - + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsBatched = 0 + rowsBatched = 0 for i in range(ctbNum): tagBinaryValue = 'beijing' if (i % 2 == 0): tagBinaryValue = 'shanghai' elif (i % 3 == 0): tagBinaryValue = 'changsha' - + sql += " %s.%s%d using %s.%s tags (%d, %d, %d, '%s', '%s') values "%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue) for j in range(rowsPerTbl): sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+j, j,j, j,i+ctbStartIdx,rowsBatched) @@ -413,7 +413,7 @@ class TMQCom: for i in range(ctbNum): tbName = '%s%s'%(ctbPrefix,i) tdCom.insert_rows(tsql,dbname=paraDict["dbName"],tbname=tbName,start_ts_value=paraDict['startTs'],count=paraDict['rowsPerTbl']) - return + return def threadFunction(self, **paraDict): # create new connector for new tdSql instance in my thread @@ -447,20 +447,20 @@ class TMQCom: cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile) tdLog.info(cmdStr) os.system(cmdStr) - + consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId) tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile)) consumeFile = open(consumeRowsFile, mode='r') queryFile = open(dstFile, mode='r') - + # skip first line for it is schema queryFile.readline() - + # skip offset for consumer for i in range(0,skipRowsOfCons): - consumeFile.readline() - + consumeFile.readline() + lines = 0 while True: dst = queryFile.readline() @@ -473,7 +473,7 @@ class TMQCom: tdLog.exit("consumerId %d consume rows[%d] is not match the rows by direct query"%(consumerId, lines)) else: break - return + return def getResultFileByTaosShell(self, consumerId, queryString): buildPath = tdCom.getBuildPath() @@ -483,15 +483,15 @@ class TMQCom: tdLog.info(cmdStr) os.system(cmdStr) return dstFile - - def checkTmqConsumeFileContent(self, consumerId, dstFile): - cfgPath = tdCom.getClientCfgPath() + + def checkTmqConsumeFileContent(self, consumerId, dstFile): + cfgPath = tdCom.getClientCfgPath() consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId) tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile)) consumeFile = open(consumeRowsFile, mode='r') queryFile = open(dstFile, mode='r') - + # skip first line for it is schema queryFile.readline() lines = 0 @@ -506,7 +506,7 @@ class TMQCom: tdLog.exit("consumerId %d consume rows[%d] is not match the rows by direct query"%(consumerId, lines)) else: break - return + return def create_ntable(self, tsql, dbname=None, tbname_prefix="ntb", tbname_index_start_num = 1, column_elm_list=None, colPrefix='c', tblNum=1, **kwargs): tb_params = "" @@ -538,7 +538,7 @@ class TMQCom: column_value_str = column_value_str.rstrip()[:-1] insert_sql = f'insert into {dbname}.{tbname_prefix}{tblIdx+tbname_index_start_num} values ({column_value_str});' tsql.execute(insert_sql) - + def waitSubscriptionExit(self, tsql, topicName): wait_cnt = 0 while True: @@ -548,7 +548,7 @@ class TMQCom: for idx in range (rows): if tsql.getData(idx, 0) != topicName: continue - + if tsql.getData(idx, 3) == None: continue else: @@ -556,10 +556,10 @@ class TMQCom: wait_cnt += 1 exit_flag = 0 break - + if exit_flag == 1: break - + tsql.query("show subscriptions") tdLog.info("show subscriptions:") tdLog.info(tsql.queryResult)