diff --git a/tests/system-test/0-others/walRetention.py b/tests/system-test/0-others/walRetention.py index 2bcd3f61c21128a8af83a38687ce3b5972ccedc7..84b1914e8cad04b23b9dbe616726122a7aefe51f 100644 --- a/tests/system-test/0-others/walRetention.py +++ b/tests/system-test/0-others/walRetention.py @@ -16,6 +16,7 @@ # import taos +from taos.tmq import Consumer from util.log import * from util.cases import * @@ -328,16 +329,17 @@ class TDTestCase: if j % 100 == 0: tdSql.execute(f"flush database {self.dbname}") tdLog.info(" insert row cost time = %ds rows = %d"%(cost, j)) + self.consume_topic("topic1", 5) - if cost > insertTime and j > 1000: + if cost > insertTime and j > 100: tdLog.info(" insert finished. cost time = %ds rows = %d"%(cost, j)) return # create tmq def create_tmq(self): - sql = f"create topic topic1_{self.dbname} as select ts, col1, concat(col12,t12) from {self.stbname};" + sql = f"create topic topic1 as select ts, col1, concat(col12,t12) from {self.stbname};" tdSql.execute(sql) - sql = f"create topic topic2_{self.dbname} as select * from {self.stbname};" + sql = f"create topic topic2 as select * from {self.stbname};" tdSql.execute(sql) #tdLog.info(sql) @@ -351,23 +353,53 @@ class TDTestCase: for dnode in os.listdir(self.projDir): vnodeDir = self.projDir + f"{dnode}/data/vnode/" print(f"vnodeDir={vnodeDir}") - if dnode == "psim": + if os.path.isdir(vnodeDir) == False or dnode[:5] != "dnode": continue # enum all vnode for entry in os.listdir(vnodeDir): entryPath = path.join(vnodeDir, entry) + if os.path.isdir(entryPath): if path.exists(path.join(entryPath, "vnode.json")): - try: - vnode = VNode(i, entryPath, self.wal_period, self.wal_size) - vnodes.append(vnode) - except: - continue + vnode = VNode(int(dnode[5:]), entryPath, self.wal_period, self.wal_size) + vnodes.append(vnode) # do check for vnode in vnodes: vnode.check_retention() + # consume topic + def consume_topic(self, topic_name, consume_cnt): + print("start consume...") + consumer = Consumer( + { + "group.id": "tg2", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "enable.auto.commit": "true", + } + ) + print("start subscrite...") + consumer.subscribe([topic_name]) + + cnt = 0 + try: + while True and cnt < consume_cnt: + res = consumer.poll(1) + if not res: + break + err = res.error() + if err is not None: + raise err + val = res.value() + cnt += 1 + print(f" consume {cnt} ") + for block in val: + print(block.fetchall()) + finally: + consumer.unsubscribe() + consumer.close() + # test db1 def test_db(self, dbname, checkTime ,wal_period, wal_size_kb): @@ -375,7 +407,7 @@ class TDTestCase: stable = "meters" tbname = "d" vgroups = 6 - count = 20 + count = 10 # do self.create_database(dbname, wal_period, wal_size_kb, vgroups) @@ -400,6 +432,7 @@ class TDTestCase: tdLog.info(f" {dbname} stop insert ...") tdLog.info(f" {dbname} test_db end.") + # run def run(self): # period @@ -408,7 +441,8 @@ class TDTestCase: #self.test_db("db2", 5, 10*24*3600, 2*1024) # 2M size # period + size - self.test_db("db", checkTime = 5*60, wal_period = 60, wal_size_kb=10) + #self.test_db("db", checkTime = 5*60, wal_period = 60, wal_size_kb=10) + self.test_db("db", checkTime = 1*60, wal_period = 0, wal_size_kb=0) def stop(self):