提交 7eff0410 编写于 作者: J jiajingbin

test: save

上级 da631dcb
...@@ -21,11 +21,12 @@ class TDTestCase: ...@@ -21,11 +21,12 @@ class TDTestCase:
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
self.commit_value_list = ["true", "false"] self.commit_value_list = ["true", "false"]
self.reset_value_list = ["", "earliest", "latest", "none"] self.offset_value_list = ["", "earliest", "latest", "none"]
self.offset_value_list = ["", "earliest", "none"]
self.tbname_value_list = ["true", "false"] self.tbname_value_list = ["true", "false"]
self.commit_value_list = ["true"] self.commit_value_list = ["true"]
self.reset_value_list = ["earliest"] self.offset_value_list = ["latest"]
self.tbname_value_list = ["true"] self.tbname_value_list = ["true"]
def tmqParamsTest(self): def tmqParamsTest(self):
...@@ -58,11 +59,12 @@ class TDTestCase: ...@@ -58,11 +59,12 @@ class TDTestCase:
sqlString = "create topic %s as %s" %(topic_name, queryString) sqlString = "create topic %s as %s" %(topic_name, queryString)
start_group_id = 1 start_group_id = 1
for commit_value in self.commit_value_list: for commit_value in self.commit_value_list:
for reset_value in self.reset_value_list: for offset_value in self.offset_value_list:
for tbname_value in self.tbname_value_list: for tbname_value in self.tbname_value_list:
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
tdSql.query(queryString) tdSql.query(queryString)
expected_res = tdSql.queryRows
group_id = "csm_" + str(start_group_id) group_id = "csm_" + str(start_group_id)
consumer_dict = { consumer_dict = {
"group.id": group_id, "group.id": group_id,
...@@ -70,18 +72,19 @@ class TDTestCase: ...@@ -70,18 +72,19 @@ class TDTestCase:
"td.connect.pass": "taosdata", "td.connect.pass": "taosdata",
"auto.commit.interval.ms": paraDict["auto_commit_interval"], "auto.commit.interval.ms": paraDict["auto_commit_interval"],
"enable.auto.commit": commit_value, "enable.auto.commit": commit_value,
"auto.offset.reset": reset_value, "auto.offset.reset": offset_value,
"msg.with.table.name": tbname_value "msg.with.table.name": tbname_value
} }
print(consumer_dict)
consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0 consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0
consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0 consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0
consumer_ret = "earliest" if reset_value == "" else reset_value consumer_ret = "earliest" if offset_value == "" else offset_value
expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]},reset:{consumer_ret}' expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]},reset:{consumer_ret}'
if len(reset_value) == 0: if len(offset_value) == 0:
del consumer_dict["auto.offset.reset"] del consumer_dict["auto.offset.reset"]
print(consumer_dict)
consumer = Consumer(consumer_dict) consumer = Consumer(consumer_dict)
consumer.subscribe([topic_name]) consumer.subscribe([topic_name])
tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],int(round(time.time()*1000)))
try: try:
while True: while True:
res = consumer.poll(1) res = consumer.poll(1)
...@@ -92,8 +95,7 @@ class TDTestCase: ...@@ -92,8 +95,7 @@ class TDTestCase:
err = res.error() err = res.error()
if err is not None: if err is not None:
raise err raise err
val = res.value() # val = res.value()
# for block in val: # for block in val:
# print(block.fetchall()) # print(block.fetchall())
finally: finally:
...@@ -104,11 +106,12 @@ class TDTestCase: ...@@ -104,11 +106,12 @@ class TDTestCase:
tdSql.query('show subscriptions;') tdSql.query('show subscriptions;')
subscription_info = tdSql.queryResult subscription_info = tdSql.queryResult
offset_value_list = list(map(lambda x: int(x[-2].replace("log:", "")), subscription_info)) offset_value_list = list(map(lambda x: int(x[-2].replace("log:", "")), subscription_info))
tdSql.checkEqual(sum(offset_value_list) > 0, True)
print(offset_value_list) print(offset_value_list)
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
print(rows_value_list) print(rows_value_list)
tdSql.checkEqual(sum(rows_value_list), expected_res)
tdSql.execute(f"drop topic if exists {topic_name}") tdSql.execute(f"drop topic if exists {topic_name}")
return
def run(self): def run(self):
self.tmqParamsTest() self.tmqParamsTest()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册