diff --git a/tests/system-test/7-tmq/subscribeDb.py b/tests/system-test/7-tmq/subscribeDb.py index b2c569e31e3df633369af4331ec7b3b7cf392672..ba46f72695fb607ba14f48821db37cce1da1d441 100644 --- a/tests/system-test/7-tmq/subscribeDb.py +++ b/tests/system-test/7-tmq/subscribeDb.py @@ -49,7 +49,7 @@ class TDTestCase: print(cur) return cur - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) @@ -58,7 +58,7 @@ class TDTestCase: tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -73,11 +73,11 @@ class TDTestCase: break else: time.sleep(5) - + for i in range(expectRows): tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): @@ -85,20 +85,20 @@ class TDTestCase: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + if (platform.system().lower() == 'windows'): shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> nul 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" else: shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum): - tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) + tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("use %s" %dbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) pre_create = "create table" @@ -111,8 +111,8 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - - event.set() + + event.set() tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) return @@ -141,7 +141,7 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - + def prepareEnv(self, **parameterDict): print ("input parameters:") print (parameterDict) @@ -159,7 +159,7 @@ class TDTestCase: parameterDict["ctbNum"],\ parameterDict["rowsPerTbl"],\ parameterDict["batchNum"],\ - parameterDict["startTs"]) + parameterDict["startTs"]) return def tmqCase1(self, cfgPath, buildPath): @@ -182,10 +182,10 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - + tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -197,7 +197,7 @@ class TDTestCase: auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + event.wait() tdLog.info("start consume processor") @@ -208,14 +208,14 @@ class TDTestCase: # wait for data ready prepareEnvThread.join() - + tdLog.info("insert process end, and start to check consume result") expectRows = 1 resultList = self.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -226,7 +226,7 @@ class TDTestCase: self.initConsumerTable() tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -238,7 +238,7 @@ class TDTestCase: auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor") pollDelay = 20 showMsg = 1 @@ -250,7 +250,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -279,12 +279,12 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - + tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) - + consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] topicList = topicName1 @@ -298,25 +298,25 @@ class TDTestCase: consumerId = 1 self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + event.wait() tdLog.info("start consume processor") pollDelay = 20 showMsg = 1 - showRow = 1 + showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) # wait for data ready prepareEnvThread.join() - + tdLog.info("insert process end, and start to check consume result") expectRows = 2 resultList = self.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) if not (totalConsumeRows >= expectrowcnt): tdLog.exit("tmq consume rows error!") @@ -343,12 +343,12 @@ class TDTestCase: tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) tdSql.execute("create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict['dbName'], parameterDict['stbName'])) - + tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) - + consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] topicList = topicName1 @@ -366,11 +366,11 @@ class TDTestCase: auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor") pollDelay = 100 showMsg = 1 - showRow = 1 + showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) @@ -378,14 +378,14 @@ class TDTestCase: # wait for data ready prepareEnvThread.join() - + tdLog.info("insert process end, and start to check consume result") expectRows = 2 resultList = self.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt * 2: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2)) tdLog.exit("tmq consume rows error!") @@ -430,9 +430,9 @@ class TDTestCase: tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) - + consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] topicList = topicName1 @@ -443,10 +443,10 @@ class TDTestCase: auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + # consumerId = 1 # self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + event.wait() tdLog.info("start consume processor") @@ -456,16 +456,16 @@ class TDTestCase: self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) # wait for data ready - prepareEnvThread.join() + prepareEnvThread.join() prepareEnvThread2.join() - + tdLog.info("insert process end, and start to check consume result") expectRows = 1 resultList = self.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -486,10 +486,10 @@ class TDTestCase: tdLog.info("cfgPath: %s" % cfgPath) self.tmqCase1(cfgPath, buildPath) - self.tmqCase2(cfgPath, buildPath) - self.tmqCase2a(cfgPath, buildPath) + self.tmqCase2(cfgPath, buildPath) + self.tmqCase2a(cfgPath, buildPath) self.tmqCase3(cfgPath, buildPath) - + def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed") diff --git a/tests/system-test/7-tmq/subscribeDb2.py b/tests/system-test/7-tmq/subscribeDb2.py index 78aaa2634ca4cf24ed5a5c6790c4e6c786917c1b..4702aef035081abcf51d619fda2a26f666174d1b 100644 --- a/tests/system-test/7-tmq/subscribeDb2.py +++ b/tests/system-test/7-tmq/subscribeDb2.py @@ -50,7 +50,7 @@ class TDTestCase: print(cur) return cur - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) @@ -59,7 +59,7 @@ class TDTestCase: tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -73,12 +73,12 @@ class TDTestCase: if tdSql.getRows() == expectRows: break else: - time.sleep(5) + time.sleep(5) for i in range(expectRows): tdLog.info ("ts: %s, consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 0), tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): @@ -89,17 +89,17 @@ class TDTestCase: if (platform.system().lower() == 'windows'): shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> nul 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" else: shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): - tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) + tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("use %s" %dbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) pre_create = "create table" @@ -112,8 +112,8 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - - event.set() + + event.set() tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) return @@ -146,7 +146,7 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - + def prepareEnv(self, **parameterDict): print ("input parameters:") print (parameterDict) @@ -165,7 +165,7 @@ class TDTestCase: parameterDict["ctbNum"],\ parameterDict["rowsPerTbl"],\ parameterDict["batchNum"],\ - parameterDict["startTs"]) + parameterDict["startTs"]) return def tmqCase8(self, cfgPath, buildPath): @@ -188,10 +188,10 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - + tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) consumerId = 0 expectrowcnt = math.ceil(parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] / 2) @@ -203,7 +203,7 @@ class TDTestCase: auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + event.wait() tdLog.info("start consume processor") @@ -214,19 +214,19 @@ class TDTestCase: # wait for data ready prepareEnvThread.join() - + tdLog.info("insert process end, and start to check consume result") expectRows = 1 resultList = self.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if not (totalConsumeRows >= expectrowcnt): tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") - + tdLog.info("again start consume processer") self.initConsumerTable() expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -237,7 +237,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -266,10 +266,10 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - + tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) consumerId = 0 expectrowcnt = math.ceil(parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] / 2) @@ -281,7 +281,7 @@ class TDTestCase: auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + event.wait() tdLog.info("start consume processor") @@ -292,14 +292,14 @@ class TDTestCase: # wait for data ready prepareEnvThread.join() - + tdLog.info("insert process end, and start to check consume result") expectRows = 1 resultList = self.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + tdSql.query("select count(*) from %s.%s" %(parameterDict['dbName'], parameterDict['stbName'])) countOfStb = tdSql.getData(0,0) print ("====total rows of stb: %d"%countOfStb) @@ -307,7 +307,7 @@ class TDTestCase: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) if totalConsumeRows < expectrowcnt: tdLog.exit("tmq consume rows error!") - + tdLog.info("again start consume processer") self.initConsumerTable() expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -340,7 +340,7 @@ class TDTestCase: tdLog.info("cfgPath: %s" % cfgPath) self.tmqCase8(cfgPath, buildPath) - self.tmqCase9(cfgPath, buildPath) + self.tmqCase9(cfgPath, buildPath) def stop(self): tdSql.close() diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py index b576a0ea700c094c64b63a065056a00fda47e59e..e8e475456cfa56240e4c5df5654b5e68eca9324d 100644 --- a/tests/system-test/7-tmq/subscribeDb3.py +++ b/tests/system-test/7-tmq/subscribeDb3.py @@ -49,18 +49,18 @@ class TDTestCase: print(cur) return cur - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) - tdSql.query("drop table if exists %s.notifyinfo "%(cdbName)) + tdSql.query("drop table if exists %s.notifyinfo "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) tdSql.query("create table %s.notifyinfo (ts timestamp, cmdid int, consumerid int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -75,7 +75,7 @@ class TDTestCase: else: time.sleep(0.1) return - + def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'): while 1: tdSql.query("select * from %s.notifyinfo"%cdbName) @@ -95,12 +95,12 @@ class TDTestCase: if tdSql.getRows() == expectRows: break else: - time.sleep(1) + time.sleep(1) for i in range(expectRows): tdLog.info ("ts: %s, consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 0), tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): @@ -111,17 +111,17 @@ class TDTestCase: if (platform.system().lower() == 'windows'): shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> nul 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" else: shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): - tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) + tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("use %s" %dbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) pre_create = "create table" @@ -134,8 +134,8 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - - event.set() + + event.set() tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) return @@ -164,7 +164,7 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - + def prepareEnv(self, **parameterDict): print ("input parameters:") print (parameterDict) @@ -183,7 +183,7 @@ class TDTestCase: parameterDict["ctbNum"],\ parameterDict["rowsPerTbl"],\ parameterDict["batchNum"],\ - parameterDict["startTs"]) + parameterDict["startTs"]) return def tmqCase10(self, cfgPath, buildPath): @@ -206,10 +206,10 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - + tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -221,7 +221,7 @@ class TDTestCase: auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + event.wait() tdLog.info("start consume processor") @@ -242,18 +242,18 @@ class TDTestCase: resultList = self.selectConsumeResult(expectRows) # wait for data ready - prepareEnvThread.join() + prepareEnvThread.join() tdLog.info("insert process end, and start to check consume result") tdLog.info("again start consume processer") self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - + expectRows = 1 resultList = self.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -283,10 +283,10 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - + tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -298,7 +298,7 @@ class TDTestCase: auto.commit.interval.ms:1000,\ auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + event.wait() tdLog.info("start consume processor") @@ -307,7 +307,7 @@ class TDTestCase: showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - # time.sleep(6) + # time.sleep(6) tdLog.info("start to wait commit notify") self.getStartCommitNotifyFromTmqsim() @@ -320,18 +320,18 @@ class TDTestCase: # resultList = self.selectConsumeResult(expectRows) # wait for data ready - prepareEnvThread.join() + prepareEnvThread.join() tdLog.info("insert process end, and start to check consume result") tdLog.info("again start consume processer") self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - + expectRows = 1 resultList = self.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows >= expectrowcnt or totalConsumeRows <= 0: tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") diff --git a/tests/system-test/7-tmq/subscribeDb4.py b/tests/system-test/7-tmq/subscribeDb4.py index 145cbbbbf59604a4b828d3290df0bcfdb129038c..63a59c0dc336b6b26a1e3ffc17806f65be1a013a 100644 --- a/tests/system-test/7-tmq/subscribeDb4.py +++ b/tests/system-test/7-tmq/subscribeDb4.py @@ -47,7 +47,7 @@ class TDTestCase: pollDelay = 20 showMsg = 1 - showRow = 1 + showRow = 1 hostname = socket.gethostname() @@ -59,7 +59,7 @@ class TDTestCase: def tmqCase12(self): tdLog.printNoPrefix("======== test case 12: ") tdLog.info("step 1: create database, stb, ctb and insert data") - + tmqCom.initConsumerTable(self.cdbName) tdCom.create_database(tdSql,self.paraDict["dbName"],self.paraDict["dropFlag"]) @@ -76,20 +76,20 @@ class TDTestCase: tmqCom.insert_data_2(tdSql,self.paraDict["dbName"],self.paraDict["ctbPrefix"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"],self.paraDict["startTs"],self.paraDict["ctbStartIdx"]) tdLog.info("create topics from db") - topicName1 = 'topic_%s'%(self.paraDict['dbName']) + topicName1 = 'topic_%s'%(self.paraDict['dbName']) tdSql.execute("create topic %s as database %s" %(topicName1, self.paraDict['dbName'])) - + topicList = topicName1 keyList = '%s,%s,%s,%s'%(self.groupId,self.autoCommit,self.autoCommitInterval,self.autoOffset) self.expectrowcnt = self.paraDict["rowsPerTbl"] * self.paraDict["ctbNum"] * 2 tmqCom.insertConsumerInfo(self.consumerId, self.expectrowcnt,topicList,keyList,self.ifcheckdata,self.ifManualCommit) - - tdLog.info("start consume processor") + + tdLog.info("start consume processor") tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName) tdLog.info("After waiting for a period of time, drop one stable") - time.sleep(3) - tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName'])) + time.sleep(3) + tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName'])) tdLog.info("wait result from consumer, then check it") expectRows = 1 @@ -98,7 +98,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if not (totalConsumeRows >= self.expectrowcnt/2 and totalConsumeRows <= self.expectrowcnt): tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, self.expectrowcnt/2, self.expectrowcnt)) tdLog.exit("tmq consume rows error!") diff --git a/tests/system-test/7-tmq/subscribeStb0.py b/tests/system-test/7-tmq/subscribeStb0.py index 65eaab897deb41d6d46f040be89b7f338a719fb1..26e834ae2c4ce898ee57971a8eb0066d0ad24838 100644 --- a/tests/system-test/7-tmq/subscribeStb0.py +++ b/tests/system-test/7-tmq/subscribeStb0.py @@ -56,7 +56,7 @@ class TDTestCase: print(cur) return cur - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) @@ -65,12 +65,12 @@ class TDTestCase: tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - def initConsumerInfoTable(self,cdbName='cdb'): + def initConsumerInfoTable(self,cdbName='cdb'): tdLog.info("drop consumeinfo table") tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -85,11 +85,11 @@ class TDTestCase: break else: time.sleep(5) - + for i in range(expectRows): tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): @@ -97,14 +97,14 @@ class TDTestCase: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + if (platform.system().lower() == 'windows'): shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> nul 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" else: shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) @@ -134,7 +134,7 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) return @@ -149,7 +149,7 @@ class TDTestCase: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 + rowsOfSql = 0 for i in range(ctbNum): sql += " %s_%d values "%(stbName,i) for j in range(rowsPerTbl): @@ -168,8 +168,8 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - - def prepareEnv(self, **parameterDict): + + def prepareEnv(self, **parameterDict): # create new connector for my thread tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) @@ -188,8 +188,8 @@ class TDTestCase: return def tmqCase3(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 3: ") - + tdLog.printNoPrefix("======== test case 3: ") + self.initConsumerTable() # create and start thread @@ -213,7 +213,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -231,7 +231,7 @@ class TDTestCase: showMsg = 1 showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - + time.sleep(1.5) tdLog.info("drop som child table of stb1") dropTblNum = 4 @@ -246,9 +246,9 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + remaindrowcnt = parameterDict["rowsPerTbl"] * (parameterDict["ctbNum"] - dropTblNum) - + tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, remaindrowcnt, expectrowcnt)) if not (totalConsumeRows <= expectrowcnt and totalConsumeRows >= remaindrowcnt): tdLog.exit("tmq consume rows error!") @@ -259,7 +259,7 @@ class TDTestCase: def tmqCase4(self, cfgPath, buildPath): tdLog.printNoPrefix("======== test case 4: ") - + self.initConsumerTable() # create and start thread @@ -275,7 +275,7 @@ class TDTestCase: 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + self.create_database(tdSql, parameterDict["dbName"]) self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) @@ -288,7 +288,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -313,7 +313,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt/4: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4)) tdLog.exit("tmq consume rows error!") @@ -331,7 +331,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -342,7 +342,7 @@ class TDTestCase: def tmqCase5(self, cfgPath, buildPath): tdLog.printNoPrefix("======== test case 5: ") - + self.initConsumerTable() # create and start thread @@ -358,7 +358,7 @@ class TDTestCase: 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + self.create_database(tdSql, parameterDict["dbName"]) self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) @@ -371,7 +371,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -396,7 +396,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt/4: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4)) tdLog.exit("tmq consume rows error!") @@ -414,14 +414,14 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != (expectrowcnt * (1 + 1/4)): tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") tdSql.query("drop topic %s"%topicFromStb1) - tdLog.printNoPrefix("======== test case 5 end ...... ") + tdLog.printNoPrefix("======== test case 5 end ...... ") def run(self): tdSql.prepare() diff --git a/tests/system-test/7-tmq/subscribeStb1.py b/tests/system-test/7-tmq/subscribeStb1.py index 90d77dba0d66b2929877f656c57e160172e11908..0c49636b158b9b3dc4c810bf4bfd10fac0cc4473 100644 --- a/tests/system-test/7-tmq/subscribeStb1.py +++ b/tests/system-test/7-tmq/subscribeStb1.py @@ -56,7 +56,7 @@ class TDTestCase: print(cur) return cur - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) @@ -65,12 +65,12 @@ class TDTestCase: tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - def initConsumerInfoTable(self,cdbName='cdb'): + def initConsumerInfoTable(self,cdbName='cdb'): tdLog.info("drop consumeinfo table") tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -85,11 +85,11 @@ class TDTestCase: break else: time.sleep(5) - + for i in range(expectRows): tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): @@ -97,14 +97,14 @@ class TDTestCase: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + if (platform.system().lower() == 'windows'): shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> nul 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" else: shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) @@ -134,7 +134,7 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) return @@ -149,7 +149,7 @@ class TDTestCase: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 + rowsOfSql = 0 for i in range(ctbNum): sql += " %s_%d values "%(stbName,i) for j in range(rowsPerTbl): @@ -168,8 +168,8 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - - def prepareEnv(self, **parameterDict): + + def prepareEnv(self, **parameterDict): # create new connector for my thread tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) @@ -189,7 +189,7 @@ class TDTestCase: def tmqCase6(self, cfgPath, buildPath): tdLog.printNoPrefix("======== test case 6: ") - + self.initConsumerTable() # create and start thread @@ -205,7 +205,7 @@ class TDTestCase: 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + self.create_database(tdSql, parameterDict["dbName"]) self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) @@ -218,7 +218,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -243,7 +243,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt/4: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4)) tdLog.exit("tmq consume rows error!") @@ -265,7 +265,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -276,7 +276,7 @@ class TDTestCase: def tmqCase7(self, cfgPath, buildPath): tdLog.printNoPrefix("======== test case 7: ") - + self.initConsumerTable() # create and start thread @@ -292,7 +292,7 @@ class TDTestCase: 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + self.create_database(tdSql, parameterDict["dbName"]) self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) @@ -305,7 +305,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -330,7 +330,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != 0: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0)) tdLog.exit("tmq consume rows error!") @@ -348,7 +348,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != 0: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0)) tdLog.exit("tmq consume rows error!") diff --git a/tests/system-test/7-tmq/subscribeStb2.py b/tests/system-test/7-tmq/subscribeStb2.py index 74caa139f147b088904ec542aeb85c54b60fa779..2cbb16ec0027930c7f02ae7fc50a69117fb77f9f 100644 --- a/tests/system-test/7-tmq/subscribeStb2.py +++ b/tests/system-test/7-tmq/subscribeStb2.py @@ -56,7 +56,7 @@ class TDTestCase: print(cur) return cur - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) @@ -65,12 +65,12 @@ class TDTestCase: tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - def initConsumerInfoTable(self,cdbName='cdb'): + def initConsumerInfoTable(self,cdbName='cdb'): tdLog.info("drop consumeinfo table") tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -85,11 +85,11 @@ class TDTestCase: break else: time.sleep(5) - + for i in range(expectRows): tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): @@ -97,14 +97,14 @@ class TDTestCase: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + if (platform.system().lower() == 'windows'): shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> nul 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" else: shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) @@ -134,7 +134,7 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) return @@ -149,7 +149,7 @@ class TDTestCase: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 + rowsOfSql = 0 for i in range(ctbNum): sql += " %s_%d values "%(stbName,i) for j in range(rowsPerTbl): @@ -168,8 +168,8 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - - def prepareEnv(self, **parameterDict): + + def prepareEnv(self, **parameterDict): # create new connector for my thread tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) @@ -188,8 +188,8 @@ class TDTestCase: return def tmqCase8(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 8: ") - + tdLog.printNoPrefix("======== test case 8: ") + self.initConsumerTable() # create and start thread @@ -218,7 +218,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -243,7 +243,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != 0: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0)) tdLog.exit("tmq consume rows error!") @@ -263,7 +263,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -283,7 +283,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt*2: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2)) tdLog.exit("tmq consume rows error!") @@ -293,8 +293,8 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 8 end ...... ") def tmqCase9(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 9: ") - + tdLog.printNoPrefix("======== test case 9: ") + self.initConsumerTable() # create and start thread @@ -323,7 +323,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -348,7 +348,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != 0: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0)) tdLog.exit("tmq consume rows error!") @@ -372,7 +372,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -392,7 +392,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt*2: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2)) tdLog.exit("tmq consume rows error!")