提交 7445a8ab 编写于 作者: J jiajingbin

test: finish testcases for TS-3495

上级 0375654d
import taos
import sys import sys
import time import time
import socket
import os
import threading import threading
import math
from taos.tmq import Consumer from taos.tmq import Consumer
from util.log import * from util.log import *
from util.sql import * from util.sql import *
...@@ -24,18 +20,16 @@ class TDTestCase: ...@@ -24,18 +20,16 @@ class TDTestCase:
self.wal_retention_period1 = 3600 self.wal_retention_period1 = 3600
self.wal_retention_period2 = 1 self.wal_retention_period2 = 1
self.commit_value_list = ["true", "false"] self.commit_value_list = ["true", "false"]
self.commit_value_list = ["true"]
self.offset_value_list = ["", "earliest", "latest", "none"] self.offset_value_list = ["", "earliest", "latest", "none"]
self.tbname_value_list = ["true", "false"] self.tbname_value_list = ["true", "false"]
self.snapshot_value_list = ["true", "false"] self.snapshot_value_list = ["true", "false"]
# self.commit_value_list = ["true"] # self.commit_value_list = ["true"]
# self.offset_value_list = ["latest"] # self.offset_value_list = ["none"]
# self.offset_value_list = ["earliest"]
# self.tbname_value_list = ["true"] # self.tbname_value_list = ["true"]
# self.snapshot_value_list = ["true"]
def tmqParamsTest(self): def tmqParamsTest(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1', paraDict = {'dbName': 'db1',
'dropFlag': 1, 'dropFlag': 1,
'vgroups': 4, 'vgroups': 4,
...@@ -49,7 +43,7 @@ class TDTestCase: ...@@ -49,7 +43,7 @@ class TDTestCase:
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'auto_commit_interval': "100"} 'auto_commit_interval': "100"}
start_group_id = 1 start_group_id = 1
for snapshot_value in self.snapshot_value_list: for snapshot_value in self.snapshot_value_list:
for commit_value in self.commit_value_list: for commit_value in self.commit_value_list:
...@@ -64,15 +58,13 @@ class TDTestCase: ...@@ -64,15 +58,13 @@ class TDTestCase:
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("insert data") tdLog.info("insert data")
tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter") tdLog.info("create topics from stb with filter")
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topic_name, queryString) sqlString = "create topic %s as %s" %(topic_name, queryString)
print("----", snapshot_value)
tdSql.query(f'select * from information_schema.ins_databases') tdSql.query(f'select * from information_schema.ins_databases')
db_wal_retention_period_list = list(map(lambda x:x[-8] if x[0] == paraDict['dbName'] else None, tdSql.queryResult)) db_wal_retention_period_list = list(map(lambda x:x[-8] if x[0] == paraDict['dbName'] else None, tdSql.queryResult))
print("---db_wal_retention_period_list", db_wal_retention_period_list)
for i in range(len(db_wal_retention_period_list)): for i in range(len(db_wal_retention_period_list)):
if db_wal_retention_period_list[0] is None or db_wal_retention_period_list[-1] is None: if db_wal_retention_period_list[0] is None or db_wal_retention_period_list[-1] is None:
db_wal_retention_period_list.remove(None) db_wal_retention_period_list.remove(None)
...@@ -82,7 +74,6 @@ class TDTestCase: ...@@ -82,7 +74,6 @@ class TDTestCase:
time.sleep(self.wal_retention_period2+1) time.sleep(self.wal_retention_period2+1)
tdSql.execute(f'flush database {paraDict["dbName"]}') tdSql.execute(f'flush database {paraDict["dbName"]}')
else: else:
print("iinininini")
if db_wal_retention_period_list[0] != self.wal_retention_period1: if db_wal_retention_period_list[0] != self.wal_retention_period1:
tdSql.execute(f"alter database {paraDict['dbName']} wal_retention_period {self.wal_retention_period1}") tdSql.execute(f"alter database {paraDict['dbName']} wal_retention_period {self.wal_retention_period1}")
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
...@@ -100,35 +91,19 @@ class TDTestCase: ...@@ -100,35 +91,19 @@ class TDTestCase:
"experimental.snapshot.enable": snapshot_value, "experimental.snapshot.enable": snapshot_value,
"msg.with.table.name": tbname_value "msg.with.table.name": tbname_value
} }
print(consumer_dict)
consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0 consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0
consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0 consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0
# consumer_snapshot = 1 if consumer_dict["experimental.snapshot.enable"] == "true" else 0
consumer_ret = "earliest" if offset_value == "" else offset_value consumer_ret = "earliest" if offset_value == "" else offset_value
expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]},reset:{consumer_ret}' expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]},reset:{consumer_ret}'
if len(offset_value) == 0: if len(offset_value) == 0:
del consumer_dict["auto.offset.reset"] del consumer_dict["auto.offset.reset"]
consumer = Consumer(consumer_dict) consumer = Consumer(consumer_dict)
consumer.subscribe([topic_name]) consumer.subscribe([topic_name])
tdLog.info(f"enable.auto.commit: {commit_value}, auto.offset.reset: {offset_value}, experimental.snapshot.enable: {snapshot_value}, msg.with.table.name: {tbname_value}")
stop_flag = 0 stop_flag = 0
# try:
# while True:
# res = consumer.poll(1)
# tdSql.query('show consumers;')
# consumer_info = tdSql.queryResult[0][-1]
# if not res:
# break
# # err = res.error()
# # if err is not None:
# # raise err
# # val = res.value()
# # for block in val:
# # print(block.fetchall())
try: try:
while True: while True:
res = consumer.poll(1) res = consumer.poll(1)
tdSql.query('show consumers;') tdSql.query('show consumers;')
consumer_info = tdSql.queryResult[0][-1] consumer_info = tdSql.queryResult[0][-1]
if offset_value == "latest": if offset_value == "latest":
...@@ -153,42 +128,40 @@ class TDTestCase: ...@@ -153,42 +128,40 @@ class TDTestCase:
start_group_id += 1 start_group_id += 1
tdSql.query('show subscriptions;') tdSql.query('show subscriptions;')
subscription_info = tdSql.queryResult subscription_info = tdSql.queryResult
print(subscription_info)
if snapshot_value == "true": if snapshot_value == "true":
if offset_value != "earliest": if offset_value != "earliest" and offset_value != "":
pass if offset_value == "latest":
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0")), subscription_info))
tdSql.checkEqual(sum(offset_value_list) > 0, True)
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
tdSql.checkEqual(sum(rows_value_list), expected_res)
elif offset_value == "none":
offset_value_list = list(map(lambda x: x[-2], subscription_info))
tdSql.checkEqual(offset_value_list, ['none']*len(subscription_info))
rows_value_list = list(map(lambda x: x[-1], subscription_info))
tdSql.checkEqual(rows_value_list, [0]*len(subscription_info))
else: else:
if offset_value != "none": if offset_value != "none":
offset_value_str = ",".join(list(map(lambda x: x[-2], subscription_info))) offset_value_str = ",".join(list(map(lambda x: x[-2], subscription_info)))
print(offset_value_str) tdSql.checkEqual("tsdb" in offset_value_str, True)
tdSql.checkEqual("snapshot" in offset_value_str, True)
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
tdSql.checkEqual(sum(rows_value_list), expected_res) tdSql.checkEqual(sum(rows_value_list), expected_res)
else: else:
offset_value_list = list(map(lambda x: x[-2], subscription_info)) offset_value_list = list(map(lambda x: x[-2], subscription_info))
tdSql.checkEqual(offset_value_list, [None]*len(subscription_info)) tdSql.checkEqual(offset_value_list, [None]*len(subscription_info))
print(offset_value_list)
rows_value_list = list(map(lambda x: x[-1], subscription_info)) rows_value_list = list(map(lambda x: x[-1], subscription_info))
tdSql.checkEqual(rows_value_list, [None]*len(subscription_info)) tdSql.checkEqual(rows_value_list, [None]*len(subscription_info))
else: else:
print("====offset_value----", offset_value)
if offset_value != "none": if offset_value != "none":
offset_value_list = list(map(lambda x: int(x[-2].replace("log:", "")), subscription_info)) offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0")), subscription_info))
tdSql.checkEqual(sum(offset_value_list) > 0, True) tdSql.checkEqual(sum(offset_value_list) > 0, True)
print(offset_value_list)
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
tdSql.checkEqual(sum(rows_value_list), expected_res) tdSql.checkEqual(sum(rows_value_list), expected_res)
else: else:
offset_value_list = list(map(lambda x: x[-2], subscription_info)) offset_value_list = list(map(lambda x: x[-2], subscription_info))
tdSql.checkEqual(offset_value_list, [None]*len(subscription_info)) tdSql.checkEqual(offset_value_list, ['none']*len(subscription_info))
print(offset_value_list)
rows_value_list = list(map(lambda x: x[-1], subscription_info)) rows_value_list = list(map(lambda x: x[-1], subscription_info))
tdSql.checkEqual(rows_value_list, [None]*len(subscription_info)) tdSql.checkEqual(rows_value_list, [0]*len(subscription_info))
# tdSql.checkEqual(sum(rows_value_list), expected_res)
# if offset_value == "latest":
# tdSql.checkEqual(sum(rows_value_list), expected_res)
# else:
# tdSql.checkEqual(sum(rows_value_list), expected_res)
tdSql.execute(f"drop topic if exists {topic_name}") tdSql.execute(f"drop topic if exists {topic_name}")
tdSql.execute(f'drop database if exists {paraDict["dbName"]}') tdSql.execute(f'drop database if exists {paraDict["dbName"]}')
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册