提交 74b1ad97 编写于 作者: P plum-lihui

test: add test case for tmq

上级 dbb8486d
class DataBoundary:
def __init__(self):
self.TINYINT_BOUNDARY = [-128, 127]
self.SMALLINT_BOUNDARY = [-32768, 32767]
self.INT_BOUNDARY = [-2147483648, 2147483647]
self.BIGINT_BOUNDARY = [-9223372036854775808, 9223372036854775807]
self.UTINYINT_BOUNDARY = [0, 255]
self.USMALLINT_BOUNDARY = [0, 65535]
self.UINT_BOUNDARY = [0, 4294967295]
self.UBIGINT_BOUNDARY = [0, 18446744073709551615]
self.FLOAT_BOUNDARY = [-3.40E+38, 3.40E+38]
self.DOUBLE_BOUNDARY = [-1.7e+308, 1.7e+308]
self.BOOL_BOUNDARY = [True, False]
self.BINARY_MAX_LENGTH = 16374
self.NCHAR_MAX_LENGTH = 4093
self.DBNAME_MAX_LENGTH = 64
self.STBNAME_MAX_LENGTH = 192
self.TBNAME_MAX_LENGTH = 192
self.CHILD_TBNAME_MAX_LENGTH = 192
self.TAG_KEY_MAX_LENGTH = 64
self.COL_KEY_MAX_LENGTH = 64
self.MAX_TAG_COUNT = 128
self.MAX_TAG_COL_COUNT = 4096
self.mnodeShmSize = [6292480, 2147483647]
self.mnodeShmSize_default = 6292480
self.vnodeShmSize = [6292480, 2147483647]
self.vnodeShmSize_default = 31458304
self.DB_PARAM_BUFFER_CONFIG = {"create_name": "buffer", "query_name": "buffer", "vnode_json_key": "szBuf", "boundary": [3, 16384], "default": 96}
self.DB_PARAM_CACHELAST_CONFIG = {"create_name": "cachelast", "query_name": "cache_model", "vnode_json_key": "", "boundary": [0, 1, 2, 3], "default": 0}
self.DB_PARAM_COMP_CONFIG = {"create_name": "comp", "query_name": "compression", "vnode_json_key": "", "boundary": [0, 1, 2], "default": 2}
self.DB_PARAM_DURATION_CONFIG = {"create_name": "duration", "query_name": "duration", "vnode_json_key": "daysPerFile", "boundary": [1, 3650, '60m', '5256000m', '1h', '87600h', '1d', '3650d'], "default": "14400m"}
self.DB_PARAM_FSYNC_CONFIG = {"create_name": "fsync", "query_name": "fsync", "vnode_json_key": "", "boundary": [0, 180000], "default": 3000}
self.DB_PARAM_KEEP_CONFIG = {"create_name": "keep", "query_name": "fsync", "vnode_json_key": "", "boundary": [1, 365000,'1440m','525600000m','24h','8760000h','1d','365000d'], "default": "5256000m,5256000m,5256000m"}
self.DB_PARAM_MAXROWS_CONFIG = {"create_name": "maxrows", "query_name": "maxrows", "vnode_json_key": "maxRows", "boundary": [200, 10000], "default": 4096}
self.DB_PARAM_MINROWS_CONFIG = {"create_name": "minrows", "query_name": "minrows", "vnode_json_key": "minRows", "boundary": [10, 1000], "default": 100}
self.DB_PARAM_NTABLES_CONFIG = {"create_name": "ntables", "query_name": "ntables", "vnode_json_key": "", "boundary": 0, "default": 0}
self.DB_PARAM_PAGES_CONFIG = {"create_name": "pages", "query_name": "pages", "vnode_json_key": "szCache", "boundary": [64], "default": 256}
self.DB_PARAM_PAGESIZE_CONFIG = {"create_name": "pagesize", "query_name": "pagesize", "vnode_json_key": "szPage", "boundary": [1, 16384], "default": 4}
self.DB_PARAM_PRECISION_CONFIG = {"create_name": "precision", "query_name": "precision", "vnode_json_key": "", "boundary": ['ms', 'us', 'ns'], "default": "ms"}
self.DB_PARAM_REPLICA_CONFIG = {"create_name": "replica", "query_name": "replica", "vnode_json_key": "", "boundary": [1], "default": 1}
self.DB_PARAM_SINGLE_STABLE_CONFIG = {"create_name": "single_stable", "query_name": "single_stable_model", "vnode_json_key": "", "boundary": [0, 1], "default": 0}
self.DB_PARAM_STRICT_CONFIG = {"create_name": "strict", "query_name": "strict", "vnode_json_key": "", "boundary": {"no_strict": 0, "strict": 1}, "default": "no_strict"}
self.DB_PARAM_VGROUPS_CONFIG = {"create_name": "vgroups", "query_name": "vgroups", "vnode_json_key": "", "boundary": [1, 32], "default": 2}
self.DB_PARAM_WAL_CONFIG = {"create_name": "wal", "query_name": "wal", "vnode_json_key": "", "boundary": [1, 2], "default": 1}
\ No newline at end of file
......@@ -17,7 +17,7 @@ import string
import requests
import time
import socket
from .boundary import DataBoundary
import taos
from util.log import *
from util.sql import *
......@@ -25,9 +25,45 @@ from util.cases import *
from util.dnodes import *
from util.common import *
class TDCom:
def init(self, conn, logSql):
tdSql.init(conn.cursor(), logSql)
class TDCom:
def __init__(self):
self.sml_type = None
self.env_setting = None
self.smlChildTableName_value = None
self.defaultJSONStrType_value = None
self.smlTagNullName_value = None
self.default_varchar_length = 256
self.default_nchar_length = 256
self.default_varchar_datatype = "letters"
self.default_nchar_datatype = "letters"
self.default_tagname_prefix = "t"
self.default_colname_prefix = "c"
self.default_stbname_prefix = "stb"
self.default_ctbname_prefix = "ctb"
self.default_tbname_prefix = "tb"
self.default_tag_index_start_num = 1
self.default_column_index_start_num = 1
self.default_stbname_index_start_num = 1
self.default_ctbname_index_start_num = 1
self.default_tbname_index_start_num = 1
self.default_tagts_name = "ts"
self.default_colts_name = "ts"
self.dbname = "test"
self.stb_name = "stb"
self.ctb_name = "ctb"
self.tb_name = "tb"
self.need_tagts = False
self.tag_type_str = ""
self.column_type_str = ""
self.columns_str = None
self.ts_value = None
self.tag_value_list = list()
self.column_value_list = list()
self.full_type_list = ["tinyint", "smallint", "int", "bigint", "tinyint unsigned", "smallint unsigned", "int unsigned", "bigint unsigned", "float", "double", "binary", "nchar", "bool"]
self.white_list = ["statsd", "node_exporter", "collectd", "icinga2", "tcollector", "information_schema", "performance_schema"]
self.Boundary = DataBoundary()
# def init(self, conn, logSql):
# # tdSql.init(conn.cursor(), logSql)
def preDefine(self):
header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='}
......@@ -113,6 +149,59 @@ class TDCom:
def dateToTs(self, datetime_input):
return int(time.mktime(time.strptime(datetime_input, "%Y-%m-%d %H:%M:%S.%f")))
def genTs(self, precision="ms", ts="", protype="taosc", ns_tag=None):
"""
protype = "taosc" or "restful"
gen ts and datetime
"""
if precision == "ns":
if ts == "" or ts is None:
ts = time.time_ns()
else:
ts = ts
if ns_tag is None:
dt = ts
else:
dt = datetime.fromtimestamp(ts // 1000000000)
dt = dt.strftime('%Y-%m-%d %H:%M:%S') + '.' + str(int(ts % 1000000000)).zfill(9)
if protype == "restful":
dt = datetime.fromtimestamp(ts // 1000000000)
dt = dt.strftime('%Y-%m-%d %H:%M:%S') + '.' + str(int(ts % 1000000000)).zfill(9)
else:
if ts == "" or ts is None:
ts = time.time()
else:
ts = ts
if precision == "ms" or precision is None:
ts = int(round(ts * 1000))
dt = datetime.fromtimestamp(ts // 1000)
if protype == "taosc":
dt = dt.strftime('%Y-%m-%d %H:%M:%S') + '.' + str(int(ts % 1000)).zfill(3) + '000'
elif protype == "restful":
dt = dt.strftime('%Y-%m-%d %H:%M:%S') + '.' + str(int(ts % 1000)).zfill(3)
else:
pass
elif precision == "us":
ts = int(round(ts * 1000000))
dt = datetime.fromtimestamp(ts // 1000000)
dt = dt.strftime('%Y-%m-%d %H:%M:%S') + '.' + str(int(ts % 1000000)).zfill(6)
return ts, dt
def get_long_name(self, length=10, mode="letters"):
"""
generate long name
mode could be numbers/letters/letters_mixed/mixed
"""
if mode == "numbers":
population = string.digits
elif mode == "letters":
population = string.ascii_letters.lower()
elif mode == "letters_mixed":
population = string.ascii_letters.upper() + string.ascii_letters.lower()
else:
population = string.ascii_letters.lower() + string.digits
return "".join(random.choices(population, k=length))
def getLongName(self, len, mode = "mixed"):
"""
generate long name
......@@ -177,84 +266,92 @@ class TDCom:
def close(self):
self.cursor.close()
def create_database(self,tsql, dbName='test',dropFlag=1,precision="ms", **kwargs):
########################################################################################################################################
# new common API
########################################################################################################################################
def create_database(self,tsql, dbName='test',dropFlag=1,**kwargs):
if dropFlag == 1:
tsql.execute("drop database if exists %s"%(dbName))
'''
vgroups replica precision strict wal fsync comp cachelast single_stable buffer pagesize pages minrows maxrows duration keep retentions
'''
sqlString = f'create database if not exists {dbName} precision "{precision}" vgroups 4'
sqlString = f'create database if not exists {dbName} '
dbParams = ""
if len(kwargs) > 0:
dbParams = ""
for param, value in kwargs.items():
dbParams += f'{param} {value} '
if param == "precision":
dbParams += f'{param} "{value}" '
else:
dbParams += f'{param} {value} '
sqlString += f'{dbParams}'
tdLog.debug("create db sql: %s"%sqlString)
tsql.execute(sqlString)
tdLog.debug("complete to create database %s"%(dbName))
return
def create_stable(self,tsql, dbName,stbName,columnDict,tagDict):
colSchema = ''
for i in range(columnDict['int']):
colSchema += ', c%d int'%i
tagSchema = ''
for i in range(tagDict['int']):
if i > 0:
tagSchema += ','
tagSchema += 't%d int'%i
tsql.execute("create table if not exists %s.%s (ts timestamp %s) tags(%s)"%(dbName, stbName, colSchema, tagSchema))
tdLog.debug("complete to create %s.%s" %(dbName, stbName))
return
def create_ctables(self,tsql, dbName,stbName,ctbNum,tagDict):
tsql.execute("use %s" %dbName)
tagsValues = ''
for i in range(tagDict['int']):
if i > 0:
tagsValues += ','
tagsValues += '%d'%i
pre_create = "create table"
sql = pre_create
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
for i in range(ctbNum):
sql += " %s_%d using %s tags(%s)"%(stbName,i,stbName,tagsValues)
if (i > 0) and (i%100 == 0):
tsql.execute(sql)
sql = pre_create
if sql != pre_create:
tsql.execute(sql)
# def create_stable(self,tsql, dbName,stbName,column_elm_list=None, tag_elm_list=None):
# colSchema = ''
# for i in range(columnDict['int']):
# colSchema += ', c%d int'%i
# tagSchema = ''
# for i in range(tagDict['int']):
# if i > 0:
# tagSchema += ','
# tagSchema += 't%d int'%i
# tsql.execute("create table if not exists %s.%s (ts timestamp %s) tags(%s)"%(dbName, stbName, colSchema, tagSchema))
# tdLog.debug("complete to create %s.%s" %(dbName, stbName))
# return
# def create_ctables(self,tsql, dbName,stbName,ctbNum,tagDict):
# tsql.execute("use %s" %dbName)
# tagsValues = ''
# for i in range(tagDict['int']):
# if i > 0:
# tagsValues += ','
# tagsValues += '%d'%i
# pre_create = "create table"
# sql = pre_create
# #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
# for i in range(ctbNum):
# sql += " %s_%d using %s tags(%s)"%(stbName,i,stbName,tagsValues)
# if (i > 0) and (i%100 == 0):
# tsql.execute(sql)
# sql = pre_create
# if sql != pre_create:
# tsql.execute(sql)
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
return
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=0):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
if startTs == 0:
t = time.time()
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
sql += " %s_%d values "%(stbName,i)
for j in range(rowsPerTbl):
sql += "(%d, %d, %d)"%(startTs + j, j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s_%d values " %(stbName,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
# tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
# return
# def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=0):
# tdLog.debug("start to insert data ............")
# tsql.execute("use %s" %dbName)
# pre_insert = "insert into "
# sql = pre_insert
# if startTs == 0:
# t = time.time()
# startTs = int(round(t * 1000))
# #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
# for i in range(ctbNum):
# sql += " %s_%d values "%(stbName,i)
# for j in range(rowsPerTbl):
# sql += "(%d, %d, %d)"%(startTs + j, j, j)
# if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
# tsql.execute(sql)
# if j < rowsPerTbl - 1:
# sql = "insert into %s_%d values " %(stbName,i)
# else:
# sql = "insert into "
# #end sql
# if sql != pre_insert:
# #print("insert sql:%s"%sql)
# tsql.execute(sql)
# tdLog.debug("insert data ............ [OK]")
# return
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
......@@ -295,4 +392,250 @@ class TDCom:
newTdSql.init(cur, False)
return newTdSql
################################################################################################################
# port from the common.py of new test frame
################################################################################################################
def gen_default_tag_str(self):
default_tag_str = ""
for tag_type in self.full_type_list:
if tag_type.lower() not in ["varchar", "binary", "nchar"]:
default_tag_str += f" {self.default_tagname_prefix}{self.default_tag_index_start_num} {tag_type},"
else:
if tag_type.lower() in ["varchar", "binary"]:
default_tag_str += f" {self.default_tagname_prefix}{self.default_tag_index_start_num} {tag_type}({self.default_varchar_length}),"
else:
default_tag_str += f" {self.default_tagname_prefix}{self.default_tag_index_start_num} {tag_type}({self.default_nchar_length}),"
self.default_tag_index_start_num += 1
if self.need_tagts:
default_tag_str = self.default_tagts_name + " timestamp," + default_tag_str
return default_tag_str[:-1].lstrip()
def gen_default_column_str(self):
self.default_column_index_start_num = 1
default_column_str = ""
for col_type in self.full_type_list:
if col_type.lower() not in ["varchar", "binary", "nchar"]:
default_column_str += f" {self.default_colname_prefix}{self.default_column_index_start_num} {col_type},"
else:
if col_type.lower() in ["varchar", "binary"]:
default_column_str += f" {self.default_colname_prefix}{self.default_column_index_start_num} {col_type}({self.default_varchar_length}),"
else:
default_column_str += f" {self.default_colname_prefix}{self.default_column_index_start_num} {col_type}({self.default_nchar_length}),"
self.default_column_index_start_num += 1
default_column_str = self.default_colts_name + " timestamp," + default_column_str
return default_column_str[:-1].lstrip()
def gen_tag_type_str(self, tagname_prefix, tag_elm_list):
tag_index_start_num = 1
tag_type_str = ""
if tag_elm_list is None:
tag_type_str = self.gen_default_tag_str()
else:
for tag_elm in tag_elm_list:
if "count" in tag_elm:
total_count = int(tag_elm["count"])
else:
total_count = 1
if total_count > 0:
for _ in range(total_count):
tag_type_str += f'{tagname_prefix}{tag_index_start_num} {tag_elm["type"]}, '
if tag_elm["type"] in ["varchar", "binary", "nchar"]:
tag_type_str = tag_type_str.rstrip()[:-1] + f'({tag_elm["len"]}), '
tag_index_start_num += 1
else:
continue
tag_type_str = tag_type_str.rstrip()[:-1]
return tag_type_str
def gen_column_type_str(self, colname_prefix, column_elm_list):
column_index_start_num = 1
column_type_str = ""
if column_elm_list is None:
column_type_str = self.gen_default_column_str()
else:
for column_elm in column_elm_list:
if "count" in column_elm:
total_count = int(column_elm["count"])
else:
total_count = 1
if total_count > 0:
for _ in range(total_count):
column_type_str += f'{colname_prefix}{column_index_start_num} {column_elm["type"]}, '
if column_elm["type"] in ["varchar", "binary", "nchar"]:
column_type_str = column_type_str.rstrip()[:-1] + f'({column_elm["len"]}), '
column_index_start_num += 1
else:
continue
column_type_str = self.default_colts_name + " timestamp, " + column_type_str.rstrip()[:-1]
return column_type_str
def gen_random_type_value(self, type_name, binary_length, binary_type, nchar_length, nchar_type):
if type_name.lower() == "tinyint":
return random.randint(self.Boundary.TINYINT_BOUNDARY[0], self.Boundary.TINYINT_BOUNDARY[1])
elif type_name.lower() == "smallint":
return random.randint(self.Boundary.SMALLINT_BOUNDARY[0], self.Boundary.SMALLINT_BOUNDARY[1])
elif type_name.lower() == "int":
return random.randint(self.Boundary.INT_BOUNDARY[0], self.Boundary.INT_BOUNDARY[1])
elif type_name.lower() == "bigint":
return random.randint(self.Boundary.BIGINT_BOUNDARY[0], self.Boundary.BIGINT_BOUNDARY[1])
elif type_name.lower() == "tinyint unsigned":
return random.randint(self.Boundary.UTINYINT_BOUNDARY[0], self.Boundary.UTINYINT_BOUNDARY[1])
elif type_name.lower() == "smallint unsigned":
return random.randint(self.Boundary.USMALLINT_BOUNDARY[0], self.Boundary.USMALLINT_BOUNDARY[1])
elif type_name.lower() == "int unsigned":
return random.randint(self.Boundary.UINT_BOUNDARY[0], self.Boundary.UINT_BOUNDARY[1])
elif type_name.lower() == "bigint unsigned":
return random.randint(self.Boundary.UBIGINT_BOUNDARY[0], self.Boundary.UBIGINT_BOUNDARY[1])
elif type_name.lower() == "float":
return random.uniform(self.Boundary.FLOAT_BOUNDARY[0], self.Boundary.FLOAT_BOUNDARY[1])
elif type_name.lower() == "double":
return random.uniform(self.Boundary.FLOAT_BOUNDARY[0], self.Boundary.FLOAT_BOUNDARY[1])
elif type_name.lower() == "binary":
return f'{self.get_long_name(binary_length, binary_type)}'
elif type_name.lower() == "varchar":
return self.get_long_name(binary_length, binary_type)
elif type_name.lower() == "nchar":
return self.get_long_name(nchar_length, nchar_type)
elif type_name.lower() == "bool":
return random.choice(self.Boundary.BOOL_BOUNDARY)
elif type_name.lower() == "timestamp":
return self.genTs()[0]
else:
pass
def gen_tag_value_list(self, tag_elm_list):
tag_value_list = list()
if tag_elm_list is None:
tag_value_list = list(map(lambda i: self.gen_random_type_value(i, self.default_varchar_length, self.default_varchar_datatype, self.default_nchar_length, self.default_nchar_datatype), self.full_type_list))
else:
for tag_elm in tag_elm_list:
if "count" in tag_elm:
total_count = int(tag_elm["count"])
else:
total_count = 1
if total_count > 0:
for _ in range(total_count):
if tag_elm["type"] in ["varchar", "binary", "nchar"]:
tag_value_list.append(self.gen_random_type_value(tag_elm["type"], tag_elm["len"], self.default_varchar_datatype, tag_elm["len"], self.default_nchar_datatype))
else:
tag_value_list.append(self.gen_random_type_value(tag_elm["type"], "", "", "", ""))
else:
continue
return tag_value_list
def gen_column_value_list(self, column_elm_list, ts_value=None):
if ts_value is None:
ts_value = self.genTs()[0]
column_value_list = list()
if column_elm_list is None:
column_value_list = list(map(lambda i: self.gen_random_type_value(i, self.default_varchar_length, self.default_varchar_datatype, self.default_nchar_length, self.default_nchar_datatype), self.full_type_list))
else:
for column_elm in column_elm_list:
if "count" in column_elm:
total_count = int(column_elm["count"])
else:
total_count = 1
if total_count > 0:
for _ in range(total_count):
if column_elm["type"] in ["varchar", "binary", "nchar"]:
column_value_list.append(self.gen_random_type_value(column_elm["type"], column_elm["len"], self.default_varchar_datatype, column_elm["len"], self.default_nchar_datatype))
else:
column_value_list.append(self.gen_random_type_value(column_elm["type"], "", "", "", ""))
else:
continue
column_value_list = [self.ts_value] + self.column_value_list
return column_value_list
def create_stable(self, tsql, dbname=None, stbname="stb", column_elm_list=None, tag_elm_list=None,
count=1, default_stbname_prefix="stb", **kwargs):
colname_prefix = 'c'
tagname_prefix = 't'
stbname_index_start_num = 1
stb_params = ""
if len(kwargs) > 0:
for param, value in kwargs.items():
stb_params += f'{param} "{value}" '
column_type_str = self.gen_column_type_str(colname_prefix, column_elm_list)
tag_type_str = self.gen_tag_type_str(tagname_prefix, tag_elm_list)
if int(count) <= 1:
create_stable_sql = f'create table {dbname}.{stbname} ({column_type_str}) tags ({tag_type_str}) {stb_params};'
tdLog.info("create stb sql: %s"%create_stable_sql)
tsql.execute(create_stable_sql)
else:
for _ in range(count):
create_stable_sql = f'create table {dbname}.{default_stbname_prefix}{stbname_index_start_num} ({column_type_str}) tags ({tag_type_str}) {stb_params};'
stbname_index_start_num += 1
tsql.execute(create_stable_sql)
def create_ctable(self, tsql, dbname=None, stbname=None, tag_elm_list=None, count=1, default_ctbname_prefix="ctb", **kwargs):
ctbname_index_start_num = 0
ctb_params = ""
if len(kwargs) > 0:
for param, value in kwargs.items():
ctb_params += f'{param} "{value}" '
tag_value_list = self.gen_tag_value_list(tag_elm_list)
tag_value_str = ""
# tag_value_str = ", ".join(str(v) for v in self.tag_value_list)
for tag_value in tag_value_list:
if isinstance(tag_value, str):
tag_value_str += f'"{tag_value}", '
else:
tag_value_str += f'{tag_value}, '
tag_value_str = tag_value_str.rstrip()[:-1]
if int(count) <= 1:
create_ctable_sql = f'create table {dbname}.{default_ctbname_prefix}{ctbname_index_start_num} using {dbname}.{stbname} tags ({tag_value_str}) {ctb_params};'
tsql.execute(create_ctable_sql)
else:
for _ in range(count):
create_ctable_sql = f'create table {dbname}.{default_ctbname_prefix}{ctbname_index_start_num} using {dbname}.{stbname} tags ({tag_value_str}) {ctb_params};'
ctbname_index_start_num += 1
tdLog.info("create ctb sql: %s"%create_ctable_sql)
tsql.execute(create_ctable_sql)
def create_table(self, tsql, dbname=None, tbname="ntb", column_elm_list=None, count=1, **kwargs):
tbname_index_start_num = 1
tbname_prefix="ntb"
tb_params = ""
if len(kwargs) > 0:
for param, value in kwargs.items():
tb_params += f'{param} "{value}" '
column_type_str = self.gen_column_type_str(tbname_prefix, column_elm_list)
if int(count) <= 1:
create_table_sql = f'create table {dbname}.{tbname} ({column_type_str}) {tb_params};'
tsql.execute(create_table_sql)
else:
for _ in range(count):
create_table_sql = f'create table {dbname}.{tbname_prefix}{tbname_index_start_num} ({column_type_str}) {tb_params};'
tbname_index_start_num += 1
tsql.execute(create_table_sql)
def insert_rows(self, tsql, dbname=None, tbname=None, column_ele_list=None, start_ts_value=None, count=1):
if start_ts_value is None:
start_ts_value = self.genTs()[0]
column_value_list = self.gen_column_value_list(column_ele_list, start_ts_value)
# column_value_str = ", ".join(str(v) for v in self.column_value_list)
column_value_str = ""
for column_value in column_value_list:
if isinstance(column_value, str):
column_value_str += f'"{column_value}", '
else:
column_value_str += f'{column_value}, '
column_value_str = column_value_str.rstrip()[:-1]
if int(count) <= 1:
insert_sql = f'insert into {self.tb_name} values ({column_value_str});'
tsql.execute(insert_sql)
else:
for num in range(count):
column_value_list = self.gen_column_value_list(column_ele_list, f'{start_ts_value}+{num}s')
column_value_str = ", ".join(str(v) for v in column_value_list)
insert_sql = f'insert into {dbname}.{tbname} values ({column_value_str});'
tsql.execute(insert_sql)
tdCom = TDCom()
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"])
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("insert data")
tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from db")
topicName = 'topic_%s_%s'%(paraDict['dbName'], paraDict['stbName'])
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s where c1 %% 4 == 0" %(topicName, paraDict['dbName'], paraDict['stbName']))
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicName
ifcheckdata = 0
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt / 4:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt / 4))
tdLog.exit("tmq consume rows error!")
time.sleep(10)
tdSql.query("drop topic %s"%topicName)
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdSql.prepare()
self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -92,13 +92,65 @@ class TMQCom:
tdLog.info(shellCmd)
os.system(shellCmd)
def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb'):
while 1:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if (tdSql.getRows() == 1) and (tdSql.getData(0, 1) == 0):
break
else:
time.sleep(0.1)
return
def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'):
while 1:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 2 :
print(tdSql.getData(0, 1), tdSql.getData(1, 1))
if tdSql.getData(1, 1) == 1:
break
time.sleep(0.1)
return
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
t = time.time()
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
sql += " %s%d values "%(stbName,i)
for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(stbName,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def syncCreateDbStbCtbInsertData(self, tsql, paraDict):
tdCom.create_database(tsql, paraDict["dbName"],paraDict["dropFlag"], paraDict['precision'])
tdCom.create_stable(tsql, paraDict["dbName"],paraDict["stbName"], paraDict["columnDict"], paraDict["tagDict"])
tdCom.create_ctables(tsql, paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["tagDict"])
tdCom.create_database(tsql, paraDict["dbName"],paraDict["dropFlag"])
tdCom.create_stable(tsql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdCom.create_ctable(tsql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
if "event" in paraDict and type(paraDict['event']) == type(threading.Event()):
paraDict["event"].set()
tdCom.insert_data(tsql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
ctbPrefix = paraDict['ctbPrefix']
ctbNum = paraDict["ctbNum"]
for i in range(ctbNum):
tbName = '%s%s'%(ctbPrefix,i)
tdCom.insert_rows(tsql,dbname=paraDict["dbName"],tbname=tbName,start_ts_value=paraDict['startTs'],count=paraDict['rowsPerTbl'])
return
def threadFunction(self, **paraDict):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册