diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb-1ctb-funcNFilter.py b/tests/system-test/7-tmq/tmqConsFromTsdb-1ctb-funcNFilter.py index d22d183e863d26a0c0498e37861b2f526fe4262d..20b0c65c714cda7709ba80d6289c628d0de2922e 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb-1ctb-funcNFilter.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb-1ctb-funcNFilter.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 1 self.ctbNum = 1 self.rowsPerTbl = 100000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -62,7 +62,7 @@ class TDTestCase: tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("restart taosd to ensure that the data falls into the disk") # tdDnodes.stop(1) # tdDnodes.start(1) @@ -94,18 +94,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+9379) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result @@ -120,18 +120,18 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if expectRowsList[0] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) - tmqCom.checkFileContent(consumerId, queryString) + tmqCom.checkFileContent(consumerId, queryString) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) @@ -162,11 +162,11 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+9379) # queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) @@ -174,7 +174,7 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -190,60 +190,60 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) firstConsumeRows = resultList[0] - + if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]): tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted)) - tdLog.exit("%d tmq consume rows error!"%consumerId) + tdLog.exit("%d tmq consume rows error!"%consumerId) # reinit consume info, and start tmq_sim, then check consume result tmqCom.initConsumerTable() consumerId = 2 expectrowcnt = math.ceil(totalRowsInserted/3) tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 - resultList = tmqCom.selectConsumeResult(expectRows) + resultList = tmqCom.selectConsumeResult(expectRows) secondConsumeRows = resultList[0] - + if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]): tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - + # reinit consume info, and start tmq_sim, then check consume result tmqCom.initConsumerTable() consumerId = 3 expectrowcnt = math.ceil(totalRowsInserted/3) tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 - resultList = tmqCom.selectConsumeResult(expectRows) + resultList = tmqCom.selectConsumeResult(expectRows) thirdConsumeRows = resultList[0] - + if not (totalRowsInserted >= resultList[0]): tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - - # total consume + + # total consume actConsumeTotalRows = firstConsumeRows + secondConsumeRows + thirdConsumeRows - + if not (totalRowsInserted == actConsumeTotalRows): tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb-1ctb.py b/tests/system-test/7-tmq/tmqConsFromTsdb-1ctb.py index 951a747069a2ad279dec56fd64781609ec2aa0f4..494952ecd5538b47d2bf59167cb6f4d934049f13 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb-1ctb.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb-1ctb.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 1 self.ctbNum = 1 self.rowsPerTbl = 100000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -62,7 +62,7 @@ class TDTestCase: tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("restart taosd to ensure that the data falls into the disk") # tdDnodes.stop(1) # tdDnodes.start(1) @@ -94,18 +94,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result @@ -120,18 +120,18 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if expectRowsList[0] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) - tmqCom.checkFileContent(consumerId, queryString) + tmqCom.checkFileContent(consumerId, queryString) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) @@ -162,18 +162,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -189,60 +189,60 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) firstConsumeRows = resultList[0] - + if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]): tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted)) - tdLog.exit("%d tmq consume rows error!"%consumerId) + tdLog.exit("%d tmq consume rows error!"%consumerId) # reinit consume info, and start tmq_sim, then check consume result tmqCom.initConsumerTable() consumerId = 2 expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 1/3) tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 - resultList = tmqCom.selectConsumeResult(expectRows) + resultList = tmqCom.selectConsumeResult(expectRows) secondConsumeRows = resultList[0] - + if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]): tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - + # reinit consume info, and start tmq_sim, then check consume result tmqCom.initConsumerTable() consumerId = 3 expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 1/3) tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 - resultList = tmqCom.selectConsumeResult(expectRows) + resultList = tmqCom.selectConsumeResult(expectRows) thirdConsumeRows = resultList[0] - + if not (totalRowsInserted >= resultList[0]): tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - - # total consume + + # total consume actConsumeTotalRows = firstConsumeRows + secondConsumeRows + thirdConsumeRows - + if not (totalRowsInserted == actConsumeTotalRows): tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb-funcNFilter.py b/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb-funcNFilter.py index 6ee089af4e1fd53d4de7490db94f9847cb1d7aee..666ca6ca646937fb29ea9ec6346ebd97e5351dae 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb-funcNFilter.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb-funcNFilter.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 3000 self.rowsPerTbl = 150 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -62,7 +62,7 @@ class TDTestCase: tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("restart taosd to ensure that the data falls into the disk") # tdDnodes.stop(1) # tdDnodes.starttaosd(1) @@ -94,11 +94,11 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+math.ceil(self.rowsPerTbl/5)) # queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) @@ -106,7 +106,7 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result @@ -121,18 +121,18 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if expectRowsList[0] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) - # tmqCom.checkFileContent(consumerId, queryString) + # tmqCom.checkFileContent(consumerId, queryString) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) @@ -163,11 +163,11 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+math.ceil(self.rowsPerTbl/5)) # queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) @@ -175,7 +175,7 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] tdLog.info("select result rows: %d"%totalRowsInserted) @@ -192,15 +192,15 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]): tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - + firstConsumeRows = resultList[0] # reinit consume info, and start tmq_sim, then check consume result @@ -208,22 +208,22 @@ class TDTestCase: consumerId = 2 expectrowcnt = math.ceil(totalRowsInserted*2/3) tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + actConsumeTotalRows = firstConsumeRows + resultList[0] - + if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows): - tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0])) + tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0])) tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py index 451dc43343409e7feeb6db5705cdf4a4151caa6d..05f7030169490c1e9bf2dbe321efee4cbe0bd7d0 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 4000 self.rowsPerTbl = 150 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -62,7 +62,7 @@ class TDTestCase: tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("restart taosd to ensure that the data falls into the disk") # tdDnodes.stop(1) # tdDnodes.start(1) @@ -93,11 +93,11 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+math.ceil(self.rowsPerTbl/5)) # queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) @@ -105,7 +105,7 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] tdLog.info("select result rows: %d"%totalRowsInserted) @@ -126,12 +126,12 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 2 resultList = tmqCom.selectConsumeResult(expectRows) actConsumeTotalRows = resultList[0] + resultList[1] - + if not (totalRowsInserted == actConsumeTotalRows): tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) @@ -167,11 +167,11 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 != 0) and (ts+1a >= %d) and (t4 like '%%shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+math.ceil(self.rowsPerTbl/10)) # queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) @@ -179,7 +179,7 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] tdLog.info("select result rows: %d"%totalRowsInserted) @@ -196,7 +196,7 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + tdLog.info("wait commit notify") tmqCom.getStartCommitNotifyFromTmqsim() # tdLog.info("wait start consume notify") @@ -204,29 +204,29 @@ class TDTestCase: tdLog.info("pkill consume processor") tdCom.killProcessor("tmq_sim") - + # time.sleep(10) - + # reinit consume info, and start tmq_sim, then check consume result tmqCom.initConsumerTable() consumerId = 6 tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tdLog.info("wait the consume result") - + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + actConsumeTotalRows = resultList[0] - + if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): - tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) + tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py index 3b3467f9f99807262864940deefe6e96c95a8af4..232d90848fc08c74c46308147942c90e85e7d509 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 3000 self.rowsPerTbl = 70 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -62,7 +62,7 @@ class TDTestCase: tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("restart taosd to ensure that the data falls into the disk") # tdDnodes.stop(1) # tdDnodes.start(1) @@ -93,18 +93,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -124,17 +124,17 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 2 resultList = tmqCom.selectConsumeResult(expectRows) actConsumeTotalRows = resultList[0] + resultList[1] - + if not (totalRowsInserted == actConsumeTotalRows): tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) @@ -165,18 +165,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -192,35 +192,35 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + tdLog.info("wait commit notify") tmqCom.getStartCommitNotifyFromTmqsim() tdLog.info("pkill consume processor") tdCom.killProcessor("tmq_sim") - + # time.sleep(10) - + # reinit consume info, and start tmq_sim, then check consume result tmqCom.initConsumerTable() consumerId = 6 tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tdLog.info("wait the consume result") - + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + actConsumeTotalRows = resultList[0] - + if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): - tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) + tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py index d1fe69f0b7acbcc374df5c08b3498c75083f0f70..5841c6d60534eef7a98c5fd6aa0abe629fc4ebb7 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 10 self.rowsPerTbl = 10000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -62,7 +62,7 @@ class TDTestCase: tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("restart taosd to ensure that the data falls into the disk") # tdDnodes.stop(1) # tdDnodes.start(1) @@ -93,18 +93,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -124,17 +124,17 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 2 resultList = tmqCom.selectConsumeResult(expectRows) actConsumeTotalRows = resultList[0] + resultList[1] - + if not (totalRowsInserted == actConsumeTotalRows): tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) @@ -165,18 +165,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -192,35 +192,35 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + tdLog.info("wait commit notify") tmqCom.getStartCommitNotifyFromTmqsim() tdLog.info("pkill consume processor") tdCom.killProcessor("tmq_sim") - + # time.sleep(10) - + # reinit consume info, and start tmq_sim, then check consume result tmqCom.initConsumerTable() consumerId = 6 tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tdLog.info("wait the consume result") - + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + actConsumeTotalRows = resultList[0] - + if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): - tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) + tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1.py b/tests/system-test/7-tmq/tmqConsFromTsdb1.py index 597f7968ae26d17590e0b611c07539ded2990b9b..499f837ccc806240889fc867a7e31ddff1551e96 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 1 self.ctbNum = 10 self.rowsPerTbl = 10000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -62,7 +62,7 @@ class TDTestCase: tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("restart taosd to ensure that the data falls into the disk") # tdDnodes.stop(1) # tdDnodes.start(1) @@ -93,18 +93,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -124,17 +124,17 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 2 resultList = tmqCom.selectConsumeResult(expectRows) actConsumeTotalRows = resultList[0] + resultList[1] - + if not (totalRowsInserted == actConsumeTotalRows): tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) @@ -165,18 +165,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -192,35 +192,35 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + tdLog.info("wait commit notify") tmqCom.getStartCommitNotifyFromTmqsim() tdLog.info("pkill consume processor") tdCom.killProcessor("tmq_sim") - + # time.sleep(10) - + # reinit consume info, and start tmq_sim, then check consume result tmqCom.initConsumerTable() consumerId = 6 tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tdLog.info("wait the consume result") - + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + actConsumeTotalRows = resultList[0] - + if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): - tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) + tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i])