diff --git a/tests/system-test/7-tmq/dataFromTsdbNWal.py b/tests/system-test/7-tmq/dataFromTsdbNWal.py index faa70f482077d66cff9861eaf313a0f3698f5c96..950c8fdcf6218fb6a0cc54573009bef41ffc0ffd 100644 --- a/tests/system-test/7-tmq/dataFromTsdbNWal.py +++ b/tests/system-test/7-tmq/dataFromTsdbNWal.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 1 self.rowsPerTbl = 10000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -58,12 +58,12 @@ class TDTestCase: tdLog.info("create ctb") tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("insert data") tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("flush db to let data falls into the disk") tdSql.query("flush database %s"%(paraDict['dbName'])) return @@ -93,18 +93,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - # tdSql.query(queryString) + # tdSql.query(queryString) # expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result @@ -121,29 +121,29 @@ class TDTestCase: paraDict['batchNum'] = 100 paraDict['startTs'] = paraDict['startTs'] + self.rowsPerTbl pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict) - + tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) pInsertThread.join() - - tdSql.query(queryString) + + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) - - tdLog.info("wait the consume result") + + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) if expectRowsList[0] != resultList[0]: tdLog.exit("%d tmq consume rows error!"%consumerId) - tmqCom.checkFileContent(consumerId, queryString) + tmqCom.checkFileContent(consumerId, queryString) tdSql.query("flush database %s"%(paraDict['dbName'])) for i in range(len(topicNameList)): - tmqCom.waitSubscriptionExit(tdSql,topicNameList[i]) + tmqCom.waitSubscriptionExit(tdSql,topicNameList[i]) tdSql.query("drop topic %s"%topicNameList[i]) tdLog.printNoPrefix("======== test case 1 end ...... ") @@ -173,18 +173,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -200,36 +200,36 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) actConsumeRows = resultList[0] - + tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(actConsumeRows, expectrowcnt, totalRowsInserted)) if not (expectrowcnt <= actConsumeRows and totalRowsInserted >= actConsumeRows): tdLog.exit("%d tmq consume rows error!"%consumerId) - + # reinit consume info, and start tmq_sim, then check consume result tmqCom.initConsumerTable() consumerId = 2 expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3) tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + actConsumeRows = resultList[0] - tdLog.info("act consume rows: %d, expect rows: %d, act insert rows: %d"%(actConsumeRows, expectrowcnt, totalRowsInserted)) + tdLog.info("act consume rows: %d, expect rows: %d, act insert rows: %d"%(actConsumeRows, expectrowcnt, totalRowsInserted)) if not ((actConsumeRows >= expectrowcnt) and (totalRowsInserted > actConsumeRows)): tdLog.exit("%d tmq consume rows error!"%consumerId) for i in range(len(topicNameList)): - tmqCom.waitSubscriptionExit(tdSql,topicNameList[i]) + tmqCom.waitSubscriptionExit(tdSql,topicNameList[i]) tdSql.query("drop topic %s"%topicNameList[i]) tdLog.printNoPrefix("======== test case 2 end ...... ") diff --git a/tests/system-test/7-tmq/db.py b/tests/system-test/7-tmq/db.py index 1fd0638d172fe6dfb6f19c1617e1eebc38ccac41..da5d7fefd2b2ca2f06ceaf81d6ceb116e8bbdfcd 100644 --- a/tests/system-test/7-tmq/db.py +++ b/tests/system-test/7-tmq/db.py @@ -56,12 +56,12 @@ class TDTestCase: print(cur) return cur - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("drop database if exists %s "%(cdbName)) tdSql.query("create database %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) - tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) + tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) @@ -75,7 +75,7 @@ class TDTestCase: tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -90,11 +90,11 @@ class TDTestCase: break else: time.sleep(5) - + for i in range(expectRows): tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): @@ -102,14 +102,14 @@ class TDTestCase: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + if (platform.system().lower() == 'windows'): shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> nul 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" else: shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) @@ -139,7 +139,7 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) return @@ -158,7 +158,7 @@ class TDTestCase: ctbDict[i] = 0 #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfCtb = 0 + rowsOfCtb = 0 while rowsOfCtb < rowsPerTbl: for i in range(ctbNum): sql += " %s.%s_%d values "%(dbName,ctbPrefix,i) @@ -185,7 +185,7 @@ class TDTestCase: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 + rowsOfSql = 0 for i in range(ctbNum): sql += " %s_%d values "%(ctbPrefix,i) for j in range(rowsPerTbl): @@ -216,7 +216,7 @@ class TDTestCase: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 + rowsOfSql = 0 for i in range(ctbNum): sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i,dbName,stbName,i) for j in range(rowsPerTbl): @@ -235,8 +235,8 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - - def prepareEnv(self, **parameterDict): + + def prepareEnv(self, **parameterDict): # create new connector for my thread tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) @@ -255,7 +255,7 @@ class TDTestCase: return def tmqCase1(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 1: ") + tdLog.printNoPrefix("======== test case 1: ") ''' subscribe one db, multi normal table which have not same schema, and include rows of all tables in one insert sql ''' @@ -274,11 +274,11 @@ class TDTestCase: 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + self.create_database(tdSql, parameterDict["dbName"]) tdSql.execute("create table %s.ntb0 (ts timestamp, c1 int)"%(parameterDict["dbName"])) tdSql.execute("create table %s.ntb1 (ts timestamp, c1 int, c2 float)"%(parameterDict["dbName"])) - tdSql.execute("create table %s.ntb2 (ts timestamp, c1 int, c2 float, c3 binary(32))"%(parameterDict["dbName"])) + tdSql.execute("create table %s.ntb2 (ts timestamp, c1 int, c2 float, c3 binary(32))"%(parameterDict["dbName"])) tdSql.execute("create table %s.ntb3 (ts timestamp, c1 int, c2 float, c3 binary(32), c4 timestamp)"%(parameterDict["dbName"])) tdSql.execute("insert into %s.ntb0 values(now, 1) %s.ntb1 values(now, 1, 1) %s.ntb2 values(now, 1, 1, '1') %s.ntb3 values(now, 1, 1, '1', now)"%(parameterDict["dbName"],parameterDict["dbName"],parameterDict["dbName"],parameterDict["dbName"])) @@ -301,7 +301,7 @@ class TDTestCase: tdLog.info("create topics from db") topicFromDb = 'topic_db_mulit_tbl' - + tdSql.execute("create topic %s as database %s" %(topicFromDb, parameterDict['dbName'])) consumerId = 0 expectrowcnt = numOfNtb * rowsOfPerNtb @@ -324,7 +324,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -334,7 +334,7 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 2: ") + tdLog.printNoPrefix("======== test case 2: ") ''' subscribe one stb, multi child talbe and normal table which have not same schema, and include rows of all tables in one insert sql ''' @@ -355,7 +355,7 @@ class TDTestCase: parameterDict['cfg'] = cfgPath dbName = parameterDict["dbName"] - + self.create_database(tdSql, dbName) tdSql.execute("create stable %s.stb (ts timestamp, s1 bigint, s2 binary(32), s3 double) tags (t1 int, t2 binary(32))"%(dbName)) @@ -364,7 +364,7 @@ class TDTestCase: tdSql.execute("create table %s.ntb0 (ts timestamp, c1 binary(32))"%(dbName)) tdSql.execute("create table %s.ntb1 (ts timestamp, c1 binary(32), c2 float)"%(dbName)) - tdSql.execute("create table %s.ntb2 (ts timestamp, c1 int, c2 float, c3 binary(32))"%(dbName)) + tdSql.execute("create table %s.ntb2 (ts timestamp, c1 int, c2 float, c3 binary(32))"%(dbName)) tdSql.execute("create table %s.ntb3 (ts timestamp, c1 int, c2 float, c3 binary(32), c4 timestamp)"%(dbName)) tdSql.execute("insert into %s.ntb0 values(now, 'ntb0-11') \ @@ -401,7 +401,7 @@ class TDTestCase: tdLog.info("create topics from db") topicFromStb = 'topic_stb_mulit_tbl' - + tdSql.execute("create topic %s as stable %s.stb" %(topicFromStb, dbName)) consumerId = 0 expectrowcnt = numOfCtb * rowsOfPerNtb @@ -424,7 +424,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -445,7 +445,7 @@ class TDTestCase: tdLog.info("cfgPath: %s" % cfgPath) self.tmqCase1(cfgPath, buildPath) - self.tmqCase2(cfgPath, buildPath) + self.tmqCase2(cfgPath, buildPath) def stop(self): tdSql.close() diff --git a/tests/system-test/7-tmq/dropDbR3ConflictTransaction.py b/tests/system-test/7-tmq/dropDbR3ConflictTransaction.py index 4dac872fdea51f284c3edde04db7416721a70936..fc4fdcecf9f31dc341f1a26d49b4d7efda5deae3 100644 --- a/tests/system-test/7-tmq/dropDbR3ConflictTransaction.py +++ b/tests/system-test/7-tmq/dropDbR3ConflictTransaction.py @@ -38,20 +38,20 @@ class TDTestCase: cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile) tdLog.info(cmdStr) os.system(cmdStr) - + consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId) tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile)) consumeFile = open(consumeRowsFile, mode='r') queryFile = open(dstFile, mode='r') - + # skip first line for it is schema queryFile.readline() while True: dst = queryFile.readline() src = consumeFile.readline() - + if dst: if dst != src: tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId) @@ -84,7 +84,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=self.replica) tdLog.info("create stb") @@ -101,13 +101,13 @@ class TDTestCase: # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # tmqCom.asyncInsertDataByInterlace(paraDict) tmqCom.create_ntable(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=1) - tmqCom.insert_rows_into_ntbl(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"], startTs=paraDict["startTs"], tblNum=1, rows=2) # tdLog.info("restart taosd to ensure that the data falls into the disk") + tmqCom.insert_rows_into_ntbl(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"], startTs=paraDict["startTs"], tblNum=1, rows=2) # tdLog.info("restart taosd to ensure that the data falls into the disk") tdSql.query("drop database %s"%paraDict["dbName"]) return - def tmqCase1(self): - tdLog.printNoPrefix("======== test case 1: ") - + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + # create and start thread paraDict = {'dbName': 'dbt', 'dropFlag': 1, @@ -132,14 +132,14 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - + consumerId = 0 expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] topicList = topicFromStb1 @@ -166,13 +166,13 @@ class TDTestCase: tdSql.query(queryString) totalRowsInserted = tdSql.getRows() - + tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) - + if totalConsumeRows != expectrowcnt: tdLog.exit("tmq consume rows error!") - - # tmqCom.checkFileContent(consumerId, queryString) + + # tmqCom.checkFileContent(consumerId, queryString) tmqCom.waitSubscriptionExit(tdSql, topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1) diff --git a/tests/system-test/7-tmq/schema.py b/tests/system-test/7-tmq/schema.py index 699c252c3181f2988b00da7a609a7b006f2a7ee5..34d36e57927ca8605b11deeac7a1c4165bbf9905 100644 --- a/tests/system-test/7-tmq/schema.py +++ b/tests/system-test/7-tmq/schema.py @@ -56,12 +56,12 @@ class TDTestCase: print(cur) return cur - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("drop database if exists %s "%(cdbName)) tdSql.query("create database %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) - tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) + tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) @@ -75,7 +75,7 @@ class TDTestCase: tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -90,11 +90,11 @@ class TDTestCase: break else: time.sleep(5) - + for i in range(expectRows): tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): @@ -103,9 +103,9 @@ class TDTestCase: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) @@ -135,7 +135,7 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) return @@ -154,7 +154,7 @@ class TDTestCase: ctbDict[i] = 0 #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfCtb = 0 + rowsOfCtb = 0 while rowsOfCtb < rowsPerTbl: for i in range(ctbNum): sql += " %s.%s_%d values "%(dbName,ctbPrefix,i) @@ -181,7 +181,7 @@ class TDTestCase: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 + rowsOfSql = 0 for i in range(ctbNum): sql += " %s_%d values "%(ctbPrefix,i) for j in range(rowsPerTbl): @@ -212,7 +212,7 @@ class TDTestCase: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 + rowsOfSql = 0 for i in range(ctbNum): sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i,dbName,stbName,i) for j in range(rowsPerTbl): @@ -231,8 +231,8 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - - def prepareEnv(self, **parameterDict): + + def prepareEnv(self, **parameterDict): # create new connector for my thread tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) @@ -265,7 +265,7 @@ class TDTestCase: 'batchNum': 23, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + tdLog.info("create database, super table, child table, normal table") ntbName = 'ntb1' self.create_database(tdSql, parameterDict["dbName"]) @@ -278,10 +278,10 @@ class TDTestCase: tdLog.info("create topics from super table and normal table") columnTopicFromStb = 'column_topic_from_stb1' columnTopicFromNtb = 'column_topic_from_ntb1' - + tdSql.execute("create topic %s as select ts, c1, c2, t1, t2 from %s.%s" %(columnTopicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(columnTopicFromNtb, parameterDict['dbName'], ntbName)) - + tdLog.info("======== super table test:") # alter actions prohibited: drop column/tag, modify column/tag type, rename column/tag included in topic tdSql.error("alter table %s.%s drop column c1"%(parameterDict['dbName'], parameterDict['stbName'])) @@ -341,12 +341,12 @@ class TDTestCase: tdLog.info("======== child table test:") parameterDict['stbName'] = 'stb12' ctbName = 'stb12_0' - tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10)) tags (t1 int, t2 binary(32), t3 double, t4 binary(32), t5 nchar(10))"%(parameterDict["dbName"],parameterDict['stbName'])) + tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10)) tags (t1 int, t2 binary(32), t3 double, t4 binary(32), t5 nchar(10))"%(parameterDict["dbName"],parameterDict['stbName'])) tdSql.query("create table %s.%s using %s.%s tags (1, '2', 3, '4', '5')"%(parameterDict["dbName"],ctbName,parameterDict["dbName"],parameterDict['stbName'])) tdLog.info("create topics from child table") columnTopicFromCtb = 'column_topic_from_ctb1' - + tdSql.execute("create topic %s as select ts, c1, c2, t1, t2 from %s.%s" %(columnTopicFromCtb,parameterDict['dbName'],ctbName)) # alter actions prohibited: drop column/tag, modify column/tag type, rename column/tag included in topic @@ -388,7 +388,7 @@ class TDTestCase: tdSql.query("alter table %s.%s add column c4 float"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s add tag t3 int"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s add tag t4 float"%(parameterDict['dbName'], parameterDict['stbName'])) - + tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self, cfgPath, buildPath): @@ -406,7 +406,7 @@ class TDTestCase: 'batchNum': 23, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + tdLog.info("create database, super table, child table, normal table") self.create_database(tdSql, parameterDict["dbName"]) ntbName = 'ntb2' @@ -416,18 +416,18 @@ class TDTestCase: tdLog.info("create topics from super table and normal table") columnTopicFromStb = 'column_topic_from_stb2' columnTopicFromNtb = 'column_topic_from_ntb2' - + tdSql.execute("create topic %s as select ts, c1, c2, t1, t2 from %s.%s where c3 > 3 and c4 like 'abc' and t3 = 5 and t4 = 'beijing'" %(columnTopicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s where c3 > 3 and c4 like 'abc'" %(columnTopicFromNtb, parameterDict['dbName'], ntbName)) - + tdLog.info("======== super table test:") # alter actions prohibited: drop column/tag, modify column/tag type, rename column/tag included in topic tdSql.error("alter table %s.%s drop column c1"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s drop column c2"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s drop column c3"%(parameterDict['dbName'], parameterDict['stbName'])) - tdSql.error("alter table %s.%s drop column c4"%(parameterDict['dbName'], parameterDict['stbName'])) + tdSql.error("alter table %s.%s drop column c4"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s drop tag t1"%(parameterDict['dbName'], parameterDict['stbName'])) - tdSql.error("alter table %s.%s drop tag t2"%(parameterDict['dbName'], parameterDict['stbName'])) + tdSql.error("alter table %s.%s drop tag t2"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s drop tag t3"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s drop tag t4"%(parameterDict['dbName'], parameterDict['stbName'])) @@ -485,12 +485,12 @@ class TDTestCase: tdLog.info("======== child table test:") parameterDict['stbName'] = 'stb21' ctbName = 'stb21_0' - tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10)) tags (t1 int, t2 binary(32), t3 double, t4 binary(32), t5 nchar(10))"%(parameterDict["dbName"],parameterDict['stbName'])) + tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10)) tags (t1 int, t2 binary(32), t3 double, t4 binary(32), t5 nchar(10))"%(parameterDict["dbName"],parameterDict['stbName'])) tdSql.query("create table %s.%s using %s.%s tags (1, '2', 3, '4', '5')"%(parameterDict["dbName"],ctbName,parameterDict["dbName"],parameterDict['stbName'])) tdLog.info("create topics from child table") columnTopicFromCtb = 'column_topic_from_ctb2' - + tdSql.execute("create topic %s as select ts, c1, c2, t1, t2 from %s.%s where c3 > 3 and c4 like 'abc' and t3 = 5 and t4 = 'beijing'" %(columnTopicFromCtb,parameterDict['dbName'],ctbName)) # alter actions prohibited: drop column/tag, modify column/tag type, rename column/tag included in topic @@ -536,11 +536,11 @@ class TDTestCase: tdSql.query("alter table %s.%s add column c5 float"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s add tag t5 float"%(parameterDict['dbName'], parameterDict['stbName'])) - + tdLog.printNoPrefix("======== test case 2 end ...... ") def tmqCase3(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 3: ") + tdLog.printNoPrefix("======== test case 3: ") parameterDict = {'cfg': '', \ 'actionType': 0, \ 'dbName': 'db3', \ @@ -554,7 +554,7 @@ class TDTestCase: 'batchNum': 23, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + tdLog.info("create database, super table, child table, normal table") self.create_database(tdSql, parameterDict["dbName"]) ntbName = 'ntb3' @@ -564,19 +564,19 @@ class TDTestCase: tdLog.info("create topics from super table and normal table") columnTopicFromStb = 'star_topic_from_stb3' columnTopicFromNtb = 'star_topic_from_ntb3' - + tdSql.execute("create topic %s as select * from %s.%s" %(columnTopicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select * from %s.%s " %(columnTopicFromNtb, parameterDict['dbName'], ntbName)) - + tdLog.info("======== super table test:") # alter actions prohibited: drop column/tag, modify column/tag type, rename column/tag included in topic tdSql.error("alter table %s.%s drop column c1"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s drop column c2"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s drop column c3"%(parameterDict['dbName'], parameterDict['stbName'])) - tdSql.error("alter table %s.%s drop column c4"%(parameterDict['dbName'], parameterDict['stbName'])) - tdSql.error("alter table %s.%s drop column c5"%(parameterDict['dbName'], parameterDict['stbName'])) + tdSql.error("alter table %s.%s drop column c4"%(parameterDict['dbName'], parameterDict['stbName'])) + tdSql.error("alter table %s.%s drop column c5"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s drop tag t1"%(parameterDict['dbName'], parameterDict['stbName'])) - tdSql.error("alter table %s.%s drop tag t2"%(parameterDict['dbName'], parameterDict['stbName'])) + tdSql.error("alter table %s.%s drop tag t2"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s drop tag t3"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s drop tag t4"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s drop tag t5"%(parameterDict['dbName'], parameterDict['stbName'])) @@ -627,12 +627,12 @@ class TDTestCase: tdLog.info("======== child table test:") parameterDict['stbName'] = 'stb31' ctbName = 'stb31_0' - tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10)) tags (t1 int, t2 binary(32), t3 double, t4 binary(32), t5 nchar(10))"%(parameterDict["dbName"],parameterDict['stbName'])) + tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10)) tags (t1 int, t2 binary(32), t3 double, t4 binary(32), t5 nchar(10))"%(parameterDict["dbName"],parameterDict['stbName'])) tdSql.query("create table %s.%s using %s.%s tags (10, '10', 10, '10', '10')"%(parameterDict["dbName"],ctbName,parameterDict["dbName"],parameterDict['stbName'])) tdLog.info("create topics from child table") columnTopicFromCtb = 'column_topic_from_ctb3' - + tdSql.execute("create topic %s as select * from %s.%s " %(columnTopicFromCtb,parameterDict['dbName'],ctbName)) tdSql.error("alter table %s.%s modify column c2 binary(40)"%(parameterDict['dbName'], parameterDict['stbName'])) @@ -647,7 +647,7 @@ class TDTestCase: tdSql.query("alter table %s.%s set tag t3=20"%(parameterDict['dbName'], ctbName)) tdSql.query("alter table %s.%s set tag t4='20'"%(parameterDict['dbName'], ctbName)) tdSql.query("alter table %s.%s set tag t5='20'"%(parameterDict['dbName'], ctbName)) - + tdSql.error("alter table %s.%s rename column c1 c1new"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s rename column c2 c2new"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s rename column c3 c3new"%(parameterDict['dbName'], parameterDict['stbName'])) @@ -662,7 +662,7 @@ class TDTestCase: # alter actions allowed: drop column/tag, modify column/tag type, rename column/tag not included in topic tdSql.query("alter table %s.%s add column c6 float"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s add tag t6 float"%(parameterDict['dbName'], parameterDict['stbName'])) - + # alter actions prohibited: drop column/tag, modify column/tag type, rename column/tag included in topic tdSql.error("alter table %s.%s drop column c1"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s drop column c2"%(parameterDict['dbName'], parameterDict['stbName'])) @@ -679,7 +679,7 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 3 end ...... ") def tmqCase4(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 4: ") + tdLog.printNoPrefix("======== test case 4: ") parameterDict = {'cfg': '', \ 'actionType': 0, \ 'dbName': 'db4', \ @@ -695,7 +695,7 @@ class TDTestCase: parameterDict['cfg'] = cfgPath ctbName = 'stb4_0' - + tdLog.info("create database, super table, child table, normal table") self.create_database(tdSql, parameterDict["dbName"]) tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10)) tags (t1 int, t2 binary(32), t3 double, t4 binary(32), t5 nchar(10))"%(parameterDict["dbName"],parameterDict["stbName"])) @@ -703,7 +703,7 @@ class TDTestCase: tdLog.info("create topics from super table") columnTopicFromStb = 'star_topic_from_stb4' - + tdSql.execute("create topic %s as stable %s.%s" %(columnTopicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdLog.info("======== child table test:") @@ -739,10 +739,10 @@ class TDTestCase: tdSql.query("alter table %s.%s drop column c1"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s drop column c2"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s drop column c3"%(parameterDict['dbName'], parameterDict['stbName'])) - tdSql.query("alter table %s.%s drop column c4"%(parameterDict['dbName'], parameterDict['stbName'])) - tdSql.query("alter table %s.%s drop column c5"%(parameterDict['dbName'], parameterDict['stbName'])) + tdSql.query("alter table %s.%s drop column c4"%(parameterDict['dbName'], parameterDict['stbName'])) + tdSql.query("alter table %s.%s drop column c5"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s drop tag t1new"%(parameterDict['dbName'], parameterDict['stbName'])) - tdSql.query("alter table %s.%s drop tag t2new"%(parameterDict['dbName'], parameterDict['stbName'])) + tdSql.query("alter table %s.%s drop tag t2new"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s drop tag t3new"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s drop tag t4new"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s drop tag t5new"%(parameterDict['dbName'], parameterDict['stbName'])) @@ -750,7 +750,7 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 4 end ...... ") def tmqCase5(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 5: ") + tdLog.printNoPrefix("======== test case 5: ") parameterDict = {'cfg': '', \ 'actionType': 0, \ 'dbName': 'db5', \ @@ -766,7 +766,7 @@ class TDTestCase: parameterDict['cfg'] = cfgPath ctbName = 'stb5_0' - + tdLog.info("create database, super table, child table, normal table") self.create_database(tdSql, parameterDict["dbName"]) tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10)) tags (t1 int, t2 binary(32), t3 double, t4 binary(32), t5 nchar(10))"%(parameterDict["dbName"],parameterDict["stbName"])) @@ -774,7 +774,7 @@ class TDTestCase: tdLog.info("create topics from super table") columnTopicFromStb = 'star_topic_from_db5' - + tdSql.execute("create topic %s as database %s" %(columnTopicFromStb, parameterDict['dbName'])) tdLog.info("======== child table test:") @@ -810,10 +810,10 @@ class TDTestCase: tdSql.query("alter table %s.%s drop column c1"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s drop column c2"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s drop column c3"%(parameterDict['dbName'], parameterDict['stbName'])) - tdSql.query("alter table %s.%s drop column c4"%(parameterDict['dbName'], parameterDict['stbName'])) - tdSql.query("alter table %s.%s drop column c5"%(parameterDict['dbName'], parameterDict['stbName'])) + tdSql.query("alter table %s.%s drop column c4"%(parameterDict['dbName'], parameterDict['stbName'])) + tdSql.query("alter table %s.%s drop column c5"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s drop tag t1new"%(parameterDict['dbName'], parameterDict['stbName'])) - tdSql.query("alter table %s.%s drop tag t2new"%(parameterDict['dbName'], parameterDict['stbName'])) + tdSql.query("alter table %s.%s drop tag t2new"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s drop tag t3new"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s drop tag t4new"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s drop tag t5new"%(parameterDict['dbName'], parameterDict['stbName'])) @@ -821,7 +821,7 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 5 end ...... ") def tmqCase6(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 6: ") + tdLog.printNoPrefix("======== test case 6: ") parameterDict = {'cfg': '', \ 'actionType': 0, \ 'dbName': 'db6', \ @@ -835,18 +835,18 @@ class TDTestCase: 'batchNum': 23, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + tdLog.info("create database, super table, child table, normal table") self.create_database(tdSql, parameterDict["dbName"]) tdLog.info("======== child table test:") parameterDict['stbName'] = 'stb6' ctbName = 'stb6_0' - tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10)) tags (t1 int, t2 binary(32), t3 double, t4 binary(32), t5 nchar(10))"%(parameterDict["dbName"],parameterDict['stbName'])) + tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10)) tags (t1 int, t2 binary(32), t3 double, t4 binary(32), t5 nchar(10))"%(parameterDict["dbName"],parameterDict['stbName'])) tdSql.query("create table %s.%s using %s.%s tags (10, '10', 10, '10', '10')"%(parameterDict["dbName"],ctbName,parameterDict["dbName"],parameterDict['stbName'])) tdLog.info("create topics from child table") columnTopicFromCtb = 'column_topic_from_ctb6' - + tdSql.execute("create topic %s as select c1, c2, c3 from %s.%s where t1 > 10 and t2 = 'beijign' and sin(t3) < 0" %(columnTopicFromCtb,parameterDict['dbName'],ctbName)) tdSql.error("alter table %s.%s modify column c1 binary(40)"%(parameterDict['dbName'], parameterDict['stbName'])) @@ -861,7 +861,7 @@ class TDTestCase: tdSql.error("alter table %s.%s set tag t3=20"%(parameterDict['dbName'], ctbName)) tdSql.query("alter table %s.%s set tag t4='20'"%(parameterDict['dbName'], ctbName)) tdSql.query("alter table %s.%s set tag t5='20'"%(parameterDict['dbName'], ctbName)) - + tdSql.error("alter table %s.%s rename column c1 c1new"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s rename column c2 c2new"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s rename column c3 c3new"%(parameterDict['dbName'], parameterDict['stbName'])) @@ -876,7 +876,7 @@ class TDTestCase: # alter actions allowed: drop column/tag, modify column/tag type, rename column/tag not included in topic tdSql.query("alter table %s.%s add column c6 float"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.query("alter table %s.%s add tag t6 float"%(parameterDict['dbName'], parameterDict['stbName'])) - + # alter actions prohibited: drop column/tag, modify column/tag type, rename column/tag included in topic tdSql.error("alter table %s.%s drop column c1"%(parameterDict['dbName'], parameterDict['stbName'])) tdSql.error("alter table %s.%s drop column c2"%(parameterDict['dbName'], parameterDict['stbName'])) @@ -903,7 +903,7 @@ class TDTestCase: tdLog.info("cfgPath: %s" % cfgPath) self.tmqCase1(cfgPath, buildPath) - self.tmqCase2(cfgPath, buildPath) + self.tmqCase2(cfgPath, buildPath) self.tmqCase3(cfgPath, buildPath) self.tmqCase4(cfgPath, buildPath) self.tmqCase5(cfgPath, buildPath) diff --git a/tests/system-test/7-tmq/stbFilter.py b/tests/system-test/7-tmq/stbFilter.py index 7ad3cc99e78c56803f76f864fa718a18f32547f0..4942a39db458f0e307649903e7449295d8909cc3 100644 --- a/tests/system-test/7-tmq/stbFilter.py +++ b/tests/system-test/7-tmq/stbFilter.py @@ -79,27 +79,27 @@ class TDTestCase: topicNameList = ['topic1', 'topic2', 'topic3'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 4 == 0" %(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) queryString = "select ts, log(c1), cos(c1) from %s.%s where c1 > 5000" %(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[1], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) queryString = "select ts, log(c1), atan(c1) from %s.%s where ts >= %d" %(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+9000) sqlString = "create topic %s as %s" %(topicNameList[2], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result @@ -115,10 +115,10 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if expectRowsList[0] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("0 tmq consume rows error!") @@ -132,7 +132,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) if expectRowsList[1] != resultList[0]: @@ -148,14 +148,14 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) if expectRowsList[2] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0])) tdLog.exit("2 tmq consume rows error!") - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) @@ -193,7 +193,7 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) queryString = "select ts, sin(c1), pow(c2,3) from %s.%s where sin(c2) >= 0" %(paraDict['dbName'], paraDict['stbName']) @@ -209,7 +209,7 @@ class TDTestCase: tdSql.execute(sqlString) tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) - + # start tmq consume processor tdLog.info("insert consume info to consume processor") consumerId = 0 @@ -223,10 +223,10 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if expectRowsList[0] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("0 tmq consume rows error!") @@ -240,7 +240,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) if expectRowsList[1] != resultList[0]: @@ -256,14 +256,14 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) if expectRowsList[2] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0])) tdLog.exit("2 tmq consume rows error!") - # time.sleep(10) + # time.sleep(10) # for i in range(len(topicNameList)): # tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/7-tmq/stbTagFilter-1ctb.py b/tests/system-test/7-tmq/stbTagFilter-1ctb.py index 003dd9a47d5698efafda6a112868d78854e8a6ef..6a26d2ce1f38774b2d63031c518883641c23f864 100644 --- a/tests/system-test/7-tmq/stbTagFilter-1ctb.py +++ b/tests/system-test/7-tmq/stbTagFilter-1ctb.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 1 self.rowsPerTbl = 10000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -65,11 +65,11 @@ class TDTestCase: # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - # tdLog.info("restart taosd to ensure that the data falls into the disk") + + # tdLog.info("restart taosd to ensure that the data falls into the disk") # tdSql.query("flush database %s"%(paraDict['dbName'])) return - + def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") paraDict = {'dbName': 'dbt', @@ -95,7 +95,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + # update to half tables # paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", @@ -103,16 +103,16 @@ class TDTestCase: # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_UpperCase_stb1' + topicFromStb1 = 'topic_UpperCase_stb1' # queryString = "select ts, c1, c2 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - + tdSql.execute(sqlString) + # paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl consumerId = 0 @@ -139,18 +139,18 @@ class TDTestCase: tdLog.info("run select sql from db") tdSql.query(queryString) totalRowsFromQuery = tdSql.getRows() - + tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQuery)) if totalConsumeRows != totalRowsFromQuery: tdLog.exit("tmq consume rows error!") - - tmqCom.checkFileContent(consumerId, queryString) + + tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self): - tdLog.printNoPrefix("======== test case 2: ") + tdLog.printNoPrefix("======== test case 2: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', @@ -170,15 +170,15 @@ class TDTestCase: 'showMsg': 1, 'showRow': 1, 'snapshot': 0} - + paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - - # tdLog.info("restart taosd to ensure that the data falls into the disk") + + # tdLog.info("restart taosd to ensure that the data falls into the disk") # tdSql.query("flush database %s"%(paraDict['dbName'])) - + # update to half tables # paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2) # paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) @@ -187,17 +187,17 @@ class TDTestCase: # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tmqCom.initConsumerTable() tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_UpperCase_stb1' + topicFromStb1 = 'topic_UpperCase_stb1' queryString = "select ts, c1, c2 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) # queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - + # paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl consumerId = 1 @@ -213,13 +213,13 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("insert process end, and start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -229,7 +229,7 @@ class TDTestCase: tdSql.query(queryString) totalRowsFromQuery = tdSql.getRows() - + tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt)) if self.snapshot == 0: if totalConsumeRows != expectrowcnt: @@ -237,8 +237,8 @@ class TDTestCase: elif self.snapshot == 1: if totalConsumeRows != totalRowsFromQuery: tdLog.exit("tmq consume rows error!") - - # tmqCom.checkFileContent(consumerId, queryString) + + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) @@ -251,14 +251,14 @@ class TDTestCase: tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.tmqCase1() self.tmqCase2() - + self.prepareTestEnv() tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 self.tmqCase1() self.tmqCase2() - + def stop(self): tdSql.close() diff --git a/tests/system-test/7-tmq/stbTagFilter-multiCtb.py b/tests/system-test/7-tmq/stbTagFilter-multiCtb.py index 1ea23fe376aa928c892c4bd49b85eddc83f7158d..9053bf26206f73950018f41e05e247fe9e36c181 100644 --- a/tests/system-test/7-tmq/stbTagFilter-multiCtb.py +++ b/tests/system-test/7-tmq/stbTagFilter-multiCtb.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 100 self.rowsPerTbl = 1000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -65,11 +65,11 @@ class TDTestCase: # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - # tdLog.info("restart taosd to ensure that the data falls into the disk") + + # tdLog.info("restart taosd to ensure that the data falls into the disk") # tdSql.query("flush database %s"%(paraDict['dbName'])) return - + def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") paraDict = {'dbName': 'dbt', @@ -95,7 +95,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + # update to half tables # paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", @@ -103,16 +103,16 @@ class TDTestCase: # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_UpperCase_stb1' + topicFromStb1 = 'topic_UpperCase_stb1' queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) # queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'beijing' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - + tdSql.execute(sqlString) + # paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl consumerId = 0 @@ -139,18 +139,18 @@ class TDTestCase: tdLog.info("run select sql from db") tdSql.query(queryString) totalRowsFromQuery = tdSql.getRows() - + tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQuery)) if totalConsumeRows != totalRowsFromQuery: tdLog.exit("tmq consume rows error!") - - # tmqCom.checkFileContent(consumerId, queryString) + + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self): - tdLog.printNoPrefix("======== test case 2: ") + tdLog.printNoPrefix("======== test case 2: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', @@ -170,15 +170,15 @@ class TDTestCase: 'showMsg': 1, 'showRow': 1, 'snapshot': 0} - + paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - - # tdLog.info("restart taosd to ensure that the data falls into the disk") + + # tdLog.info("restart taosd to ensure that the data falls into the disk") # tdSql.query("flush database %s"%(paraDict['dbName'])) - + # update to half tables # paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2) # paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) @@ -187,17 +187,17 @@ class TDTestCase: # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tmqCom.initConsumerTable() tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_UpperCase_stb1' + topicFromStb1 = 'topic_UpperCase_stb1' # queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'beijing' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - + # paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl consumerId = 1 @@ -213,7 +213,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl * 7/10) paraDict['ctbStartIdx'] = int(paraDict['ctbNum'] * 7/10) # paraDict["rowsPerTbl"] = 100 @@ -221,7 +221,7 @@ class TDTestCase: tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("insert process end, and start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -231,7 +231,7 @@ class TDTestCase: tdSql.query(queryString) totalRowsFromQuery = tdSql.getRows() - + tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt)) if self.snapshot == 0: if totalConsumeRows != expectrowcnt / 2: @@ -239,8 +239,8 @@ class TDTestCase: elif self.snapshot == 1: if totalConsumeRows != totalRowsFromQuery: tdLog.exit("tmq consume rows error when snapshot is 1!") - - # tmqCom.checkFileContent(consumerId, queryString) + + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) @@ -253,13 +253,13 @@ class TDTestCase: tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.tmqCase1() self.tmqCase2() - + self.prepareTestEnv() tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 self.tmqCase1() - self.tmqCase2() + self.tmqCase2() def stop(self): tdSql.close()