提交 a3f5386e 编写于 作者: A Alex Duan

test: add consumer for topic

上级 064cf2f3
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
# #
import taos import taos
from taos.tmq import Consumer
from util.log import * from util.log import *
from util.cases import * from util.cases import *
...@@ -328,16 +329,17 @@ class TDTestCase: ...@@ -328,16 +329,17 @@ class TDTestCase:
if j % 100 == 0: if j % 100 == 0:
tdSql.execute(f"flush database {self.dbname}") tdSql.execute(f"flush database {self.dbname}")
tdLog.info(" insert row cost time = %ds rows = %d"%(cost, j)) tdLog.info(" insert row cost time = %ds rows = %d"%(cost, j))
self.consume_topic("topic1", 5)
if cost > insertTime and j > 1000: if cost > insertTime and j > 100:
tdLog.info(" insert finished. cost time = %ds rows = %d"%(cost, j)) tdLog.info(" insert finished. cost time = %ds rows = %d"%(cost, j))
return return
# create tmq # create tmq
def create_tmq(self): def create_tmq(self):
sql = f"create topic topic1_{self.dbname} as select ts, col1, concat(col12,t12) from {self.stbname};" sql = f"create topic topic1 as select ts, col1, concat(col12,t12) from {self.stbname};"
tdSql.execute(sql) tdSql.execute(sql)
sql = f"create topic topic2_{self.dbname} as select * from {self.stbname};" sql = f"create topic topic2 as select * from {self.stbname};"
tdSql.execute(sql) tdSql.execute(sql)
#tdLog.info(sql) #tdLog.info(sql)
...@@ -351,23 +353,53 @@ class TDTestCase: ...@@ -351,23 +353,53 @@ class TDTestCase:
for dnode in os.listdir(self.projDir): for dnode in os.listdir(self.projDir):
vnodeDir = self.projDir + f"{dnode}/data/vnode/" vnodeDir = self.projDir + f"{dnode}/data/vnode/"
print(f"vnodeDir={vnodeDir}") print(f"vnodeDir={vnodeDir}")
if dnode == "psim": if os.path.isdir(vnodeDir) == False or dnode[:5] != "dnode":
continue continue
# enum all vnode # enum all vnode
for entry in os.listdir(vnodeDir): for entry in os.listdir(vnodeDir):
entryPath = path.join(vnodeDir, entry) entryPath = path.join(vnodeDir, entry)
if os.path.isdir(entryPath): if os.path.isdir(entryPath):
if path.exists(path.join(entryPath, "vnode.json")): if path.exists(path.join(entryPath, "vnode.json")):
try: vnode = VNode(int(dnode[5:]), entryPath, self.wal_period, self.wal_size)
vnode = VNode(i, entryPath, self.wal_period, self.wal_size) vnodes.append(vnode)
vnodes.append(vnode)
except:
continue
# do check # do check
for vnode in vnodes: for vnode in vnodes:
vnode.check_retention() vnode.check_retention()
# consume topic
def consume_topic(self, topic_name, consume_cnt):
print("start consume...")
consumer = Consumer(
{
"group.id": "tg2",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
}
)
print("start subscrite...")
consumer.subscribe([topic_name])
cnt = 0
try:
while True and cnt < consume_cnt:
res = consumer.poll(1)
if not res:
break
err = res.error()
if err is not None:
raise err
val = res.value()
cnt += 1
print(f" consume {cnt} ")
for block in val:
print(block.fetchall())
finally:
consumer.unsubscribe()
consumer.close()
# test db1 # test db1
def test_db(self, dbname, checkTime ,wal_period, wal_size_kb): def test_db(self, dbname, checkTime ,wal_period, wal_size_kb):
...@@ -375,7 +407,7 @@ class TDTestCase: ...@@ -375,7 +407,7 @@ class TDTestCase:
stable = "meters" stable = "meters"
tbname = "d" tbname = "d"
vgroups = 6 vgroups = 6
count = 20 count = 10
# do # do
self.create_database(dbname, wal_period, wal_size_kb, vgroups) self.create_database(dbname, wal_period, wal_size_kb, vgroups)
...@@ -400,6 +432,7 @@ class TDTestCase: ...@@ -400,6 +432,7 @@ class TDTestCase:
tdLog.info(f" {dbname} stop insert ...") tdLog.info(f" {dbname} stop insert ...")
tdLog.info(f" {dbname} test_db end.") tdLog.info(f" {dbname} test_db end.")
# run # run
def run(self): def run(self):
# period # period
...@@ -408,7 +441,8 @@ class TDTestCase: ...@@ -408,7 +441,8 @@ class TDTestCase:
#self.test_db("db2", 5, 10*24*3600, 2*1024) # 2M size #self.test_db("db2", 5, 10*24*3600, 2*1024) # 2M size
# period + size # period + size
self.test_db("db", checkTime = 5*60, wal_period = 60, wal_size_kb=10) #self.test_db("db", checkTime = 5*60, wal_period = 60, wal_size_kb=10)
self.test_db("db", checkTime = 1*60, wal_period = 0, wal_size_kb=0)
def stop(self): def stop(self):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册