提交 7450a858 编写于 作者: J jiajingbin 提交者: Haojun Liao

update

上级 ae100c06
......@@ -29,7 +29,7 @@ from util.constant import *
from dataclasses import dataclass,field
from typing import List
from datetime import datetime
import re
@dataclass
class DataSet:
ts_data : List[int] = field(default_factory=list)
......@@ -141,6 +141,8 @@ class TDCom:
self.default_interval = 5
self.stream_timeout = 12
self.record_history_ts = str()
self.precision = "ms"
self.date_time = self.genTs(precision=self.precision)[0]
self.subtable = True
self.partition_tbname_alias = "ptn_alias" if self.subtable else ""
self.partition_col_alias = "pcol_alias" if self.subtable else ""
......@@ -172,6 +174,31 @@ class TDCom:
self.update = False
self.partition_by_downsampling_function_list = ["min(c1)", "max(c2)", "sum(c3)", "first(c4)", "last(c5)", "count(c8)", "spread(c1)",
"stddev(c2)", "hyperloglog(c11)", "min(t1)", "max(t2)", "sum(t3)", "first(t4)", "last(t5)", "count(t8)", "spread(t1)", "stddev(t2)"]
self.stb_data_filter_sql = f'ts >= {self.date_time}+1s and c1 = 1 or c2 > 1 and c3 != 4 or c4 <= 3 and c9 <> 0 or c10 is not Null or c11 is Null or \
c12 between "na" and "nchar4" and c11 not between "bi" and "binary" and c12 match "nchar[19]" and c12 nmatch "nchar[25]" or c13 = True or \
c5 in (1, 2, 3) or c6 not in (6, 7) and c12 like "nch%" and c11 not like "bina_" and c6 < 10 or c12 is Null or c8 >= 4 and t1 = 1 or t2 > 1 \
and t3 != 4 or c4 <= 3 and t9 <> 0 or t10 is not Null or t11 is Null or t12 between "na" and "nchar4" and t11 not between "bi" and "binary" \
or t12 match "nchar[19]" or t12 nmatch "nchar[25]" or t13 = True or t5 in (1, 2, 3) or t6 not in (6, 7) and t12 like "nch%" \
and t11 not like "bina_" and t6 <= 10 or t12 is Null or t8 >= 4'
self.tb_data_filter_sql = self.stb_data_filter_sql.partition(" and t1")[0]
self.filter_source_select_elm = "*"
self.stb_filter_des_select_elm = "ts, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11, t12, t13"
self.partitial_stb_filter_des_select_elm = ",".join(self.stb_filter_des_select_elm.split(",")[:3])
self.exchange_stb_filter_des_select_elm = ",".join([self.stb_filter_des_select_elm.split(",")[0], self.stb_filter_des_select_elm.split(",")[2], self.stb_filter_des_select_elm.split(",")[1]])
self.partitial_ext_tb_source_select_str = ','.join(self.downsampling_function_list[0:2])
self.tb_filter_des_select_elm = self.stb_filter_des_select_elm.partition(", t1")[0]
self.tag_filter_des_select_elm = self.stb_filter_des_select_elm.partition("c13, ")[2]
self.partition_by_stb_output_select_str = ','.join(list(map(lambda x:f'`{x}`', self.partition_by_downsampling_function_list)))
self.partition_by_stb_source_select_str = ','.join(self.partition_by_downsampling_function_list)
self.exchange_tag_filter_des_select_elm = ",".join([self.stb_filter_des_select_elm.partition("c13, ")[2].split(",")[0], self.stb_filter_des_select_elm.partition("c13, ")[2].split(",")[2], self.stb_filter_des_select_elm.partition("c13, ")[2].split(",")[1]])
self.partitial_tag_filter_des_select_elm = ",".join(self.stb_filter_des_select_elm.partition("c13, ")[2].split(",")[:3])
self.partitial_tag_stb_filter_des_select_elm = "ts, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, t1, t3, t2, t4, t5, t6, t7, t8, t9, t10, t11, t12, t13"
self.cast_tag_filter_des_select_elm = "t5,t11,t13"
self.cast_tag_stb_filter_des_select_elm = "ts, t1, t2, t3, t4, cast(t1 as TINYINT UNSIGNED), t6, t7, t8, t9, t10, cast(t2 as varchar(256)), t12, cast(t3 as bool)"
self.tag_count = len(self.tag_filter_des_select_elm.split(","))
self.state_window_range = list()
# def init(self, conn, logSql):
# # tdSql.init(conn.cursor(), logSql)
......@@ -1322,6 +1349,22 @@ class TDCom:
nl.append(tuple(query_data_l))
return nl
def trans_time_to_s(self, runtime):
if "d" in str(runtime).lower():
d_num = re.findall("\d+\.?\d*", runtime.replace(" ", ""))[0]
s_num = float(d_num) * 24 * 60 * 60
elif "h" in str(runtime).lower():
h_num = re.findall("\d+\.?\d*", runtime.replace(" ", ""))[0]
s_num = float(h_num) * 60 * 60
elif "m" in str(runtime).lower():
m_num = re.findall("\d+\.?\d*", runtime.replace(" ", ""))[0]
s_num = float(m_num) * 60
elif "s" in str(runtime).lower():
s_num = re.findall("\d+\.?\d*", runtime.replace(" ", ""))[0]
else:
s_num = 60
return int(s_num)
def check_query_data(self, sql1, sql2, sorted=False, fill_value=None, tag_value_list=None, defined_tag_count=None, partition=True, use_exist_stb=False, subtable=None, reverse_check=False):
tdLog.info("checking query data ...")
if tag_value_list:
......
import sys
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
def watermark_max_delay_interval(self, interval, max_delay, watermark=None, fill_value=None, delete=False):
tdLog.info(f"*** testing stream max_delay+interval: interval: {interval}, watermark: {watermark}, fill_value: {fill_value}, delete: {delete} ***")
self.delete = delete
self.case_name = sys._getframe().f_code.co_name
if watermark is not None:
self.case_name = "watermark" + sys._getframe().f_code.co_name
self.tdCom.prepare_data(interval=interval, watermark=watermark)
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
self.tdCom.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
self.tdCom.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
self.tdCom.date_time = 1658921623245
if watermark is not None:
watermark_value = f'{self.tdCom.dataDict["watermark"]}s'
fill_watermark_value = watermark_value
else:
watermark_value = None
fill_watermark_value = "0s"
max_delay_value = f'{self.tdCom.trans_time_to_s(max_delay)}s'
if fill_value:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
# create stb/ctb/tb stream
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value)
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value)
if fill_value:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11'
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value)
init_num = 0
start_time = self.tdCom.date_time
for i in range(self.tdCom.range_count):
if i == 0:
if watermark is not None:
window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval'], self.tdCom.dataDict['watermark'])
else:
window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval'])
else:
self.tdCom.date_time = window_close_ts + self.tdCom.offset
window_close_ts += self.tdCom.dataDict['interval']*self.tdCom.offset
for num in range(int(window_close_ts/self.tdCom.offset-self.tdCom.date_time/self.tdCom.offset)):
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset)
if not fill_value:
for tbname in [self.stb_stream_des_table, self.tdCom.ctb_stream_des_table, self.tdCom.tb_stream_des_table]:
if tbname != self.tdCom.tb_stream_des_table:
tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}')
else:
tdSql.query(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}')
tdSql.checkEqual(tdSql.queryRows, init_num)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts-1)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts-1)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts-1)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts-1)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts)
if i == 0:
init_num = 2 + i
if watermark is not None:
init_num += 1
else:
init_num += 1
time.sleep(int(max_delay.replace("s", "")))
if not fill_value:
for tbname in [self.stb_name, self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {tbname} interval({self.tdCom.dataDict["interval"]}s)')
else:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {tbname} interval({self.tdCom.dataDict["interval"]}s)')
if fill_value:
history_ts = str(start_time)+f'-{self.tdCom.dataDict["interval"]*(self.tdCom.range_count+2)}s'
start_ts = self.tdCom.time_cast(history_ts, "-")
future_ts = str(self.tdCom.date_time)+f'+{self.tdCom.dataDict["interval"]*(self.tdCom.range_count+2)}s'
end_ts = self.tdCom.time_cast(future_ts)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=history_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=history_ts)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=future_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=future_ts)
future_ts_bigint = self.tdCom.str_ts_trans_bigint(future_ts)
if watermark is not None:
window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(future_ts_bigint, self.tdCom.dataDict['interval'], self.tdCom.dataDict['watermark'])
else:
window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(future_ts_bigint, self.tdCom.dataDict['interval'])
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts)
if self.tdCom.update:
for i in range(self.tdCom.range_count):
if i == 0:
if watermark is not None:
window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval'], self.tdCom.dataDict['watermark'])
else:
window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval'])
else:
self.tdCom.date_time = window_close_ts + self.tdCom.offset
window_close_ts += self.tdCom.dataDict['interval']*self.tdCom.offset
for num in range(int(window_close_ts/self.tdCom.offset-self.tdCom.date_time/self.tdCom.offset)):
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts-1)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts-1)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts)
if self.delete:
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=self.tdCom.time_cast(window_close_ts))
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=self.tdCom.time_cast(window_close_ts))
time.sleep(int(max_delay.replace("s", "")))
for tbname in [self.stb_name, self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, {self.tdCom.fill_stb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts}+{self.tdCom.dataDict["interval"]}s+{fill_watermark_value} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value})', fill_value=fill_value)
else:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,6,7,8,9,10,11'
self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, {self.tdCom.fill_tb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts}+{self.tdCom.dataDict["interval"]}s+{fill_watermark_value} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value})', fill_value=fill_value)
def run(self):
for watermark in [None, random.randint(20, 25)]:
self.watermark_max_delay_interval(interval=random.choice([15]), watermark=watermark, max_delay=f"{random.randint(5, 6)}s")
for fill_value in ["NULL", "PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]:
self.watermark_max_delay_interval(interval=random.randint(10, 15), watermark=None, max_delay=f"{random.randint(5, 6)}s", fill_value=fill_value)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
import sys
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
def watermark_max_delay_session(self, session, watermark, max_delay, fill_history_value=None):
tdLog.info(f"*** testing stream max_delay+session: session: {session}, watermark: {watermark}, max_delay: {max_delay}, fill_history_value: {fill_history_value} ***")
self.tdCom.case_name = sys._getframe().f_code.co_name
if watermark is not None:
self.tdCom.case_name = "watermark" + sys._getframe().f_code.co_name
self.tdCom.prepare_data(session=session, watermark=watermark, fill_history_value=fill_history_value)
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
self.tdCom.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
self.tdCom.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
self.tdCom.date_time = self.tdCom.dataDict["start_ts"]
if watermark is not None:
watermark_value = f'{self.tdCom.dataDict["watermark"]}s'
else:
watermark_value = None
max_delay_value = f'{self.tdCom.trans_time_to_s(max_delay)}s'
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {self.ctb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value)
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.tb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.tb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value)
init_num = 0
for i in range(self.tdCom.range_count):
if i == 0:
window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, self.tdCom.dataDict['watermark'], self.tdCom.dataDict['session'])
else:
self.tdCom.date_time = window_close_ts + 1
window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, self.tdCom.dataDict['watermark'], self.tdCom.dataDict['session'])
if watermark_value is not None:
for ts_value in [self.tdCom.date_time, window_close_ts-1]:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
for tbname in [self.tdCom.ctb_stream_des_table, self.tdCom.tb_stream_des_table]:
if tbname != self.tdCom.tb_stream_des_table:
tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}')
else:
tdSql.query(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}')
if not fill_history_value:
tdSql.checkEqual(tdSql.queryRows, init_num)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts)
if i == 0:
init_num = 2 + i
else:
init_num += 1
if watermark_value is not None:
expected_value = init_num
else:
expected_value = i + 1
if not fill_history_value:
for tbname in [self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
self.tdCom.check_stream(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s)', expected_value, max_delay)
else:
self.tdCom.check_stream(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s)', expected_value, max_delay)
else:
self.tdCom.update_delete_history_data(delete=True)
for tbname in [self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
self.tdCom.check_query_data(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s)')
else:
self.tdCom.check_query_data(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s)')
def run(self):
for fill_history_value in [None, 1]:
for watermark in [None, random.randint(20, 30)]:
self.watermark_max_delay_session(session=random.randint(10, 15), watermark=watermark, max_delay=f"{random.randint(1, 3)}s", fill_history_value=fill_history_value)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
import sys
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
def partitionby_interval(self, interval=None, partition_by_elm="tbname", ignore_expired=None):
tdLog.info(f"*** testing stream partition+interval: interval: {interval}, partition_by: {partition_by_elm}, ignore_expired: {ignore_expired} ***")
self.tdCom.case_name = sys._getframe().f_code.co_name
self.tdCom.prepare_data(interval=interval)
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
self.tdCom.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
self.tdCom.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
ctb_name_list = list()
for i in range(1, self.tdCom.range_count):
ctb_name = self.tdCom.get_long_name()
ctb_name_list.append(ctb_name)
self.tdCom.screate_ctable(stbname=self.stb_name, ctbname=ctb_name)
if interval is not None:
source_sql = f'select _wstart AS wstart, {self.tdCom.partition_by_stb_source_select_str} from {self.stb_name} partition by {partition_by_elm} interval({self.tdCom.dataDict["interval"]}s)'
else:
source_sql = f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name} partition by {partition_by_elm}'
# create stb/ctb/tb stream
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=source_sql, ignore_expired=ignore_expired)
# insert data
count = 1
step_count = 1
for i in range(1, self.tdCom.range_count):
if i == 1:
record_window_close_ts = self.tdCom.date_time - 15 * self.tdCom.offset
ctb_name = self.tdCom.get_long_name()
self.tdCom.screate_ctable(stbname=self.stb_name, ctbname=ctb_name)
if i % 2 == 0:
step_count += i
for j in range(count, step_count):
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=f'{self.tdCom.date_time}+{j}s')
for ctb_name in ctb_name_list:
self.tdCom.sinsert_rows(tbname=ctb_name, ts_value=f'{self.tdCom.date_time}+{j}s')
count += i
else:
step_count += 1
for i in range(2):
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=f'{self.tdCom.date_time}+{count}s')
for ctb_name in ctb_name_list:
self.tdCom.sinsert_rows(tbname=ctb_name, ts_value=f'{self.tdCom.date_time}+{count}s')
count += 1
# check result
for colname in self.tdCom.partition_by_downsampling_function_list:
if "first" not in colname and "last" not in colname:
if interval is not None:
self.tdCom.check_query_data(f'select `{colname}` from {self.stb_name}{self.tdCom.des_table_suffix} order by `{colname}`;', f'select {colname} from {self.stb_name} partition by {partition_by_elm} interval({self.tdCom.dataDict["interval"]}s) order by `{colname}`;')
else:
self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name}{self.tdCom.des_table_suffix} order by c1,c2,c3;', f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name} partition by {partition_by_elm} order by c1,c2,c3;')
if self.tdCom.disorder:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=record_window_close_ts)
for ctb_name in ctb_name_list:
self.tdCom.sinsert_rows(tbname=ctb_name, ts_value=record_window_close_ts)
if ignore_expired:
if "first" not in colname and "last" not in colname:
for colname in self.tdCom.partition_by_downsampling_function_list:
if interval is not None:
tdSql.query(f'select `{colname}` from {self.stb_name}{self.tdCom.des_table_suffix} order by `{colname}`;')
res1 = tdSql.queryResult
tdSql.query(f'select {colname} from {self.stb_name} partition by {partition_by_elm} interval({self.tdCom.dataDict["interval"]}s) order by `{colname}`;')
res2 = tdSql.queryResult
tdSql.checkNotEqual(res1, res2)
else:
self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name}{self.tdCom.des_table_suffix} order by c1,c2,c3;', f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name} partition by {partition_by_elm} order by c1,c2,c3;')
else:
for colname in self.tdCom.partition_by_downsampling_function_list:
if "first" not in colname and "last" not in colname:
if interval is not None:
self.tdCom.check_query_data(f'select `{colname}` from {self.stb_name}{self.tdCom.des_table_suffix} order by `{colname}`;', f'select {colname} from {self.stb_name} partition by {partition_by_elm} interval({self.tdCom.dataDict["interval"]}s) order by `{colname}`;')
else:
self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name}{self.tdCom.des_table_suffix} order by c1,c2,c3;', f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name} partition by {partition_by_elm} order by c1,c2,c3;')
def run(self):
for interval in [None, 10]:
for ignore_expired in [0, 1]:
self.partitionby_interval(interval=interval, partition_by_elm="tbname", ignore_expired=ignore_expired)
self.partitionby_interval(interval=10, partition_by_elm="t1")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
......@@ -12,34 +12,6 @@ class TDTestCase:
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
self.tdCom.subtable = True
self.tdCom.update = True
self.tdCom.disorder = True
if self.tdCom.disorder:
self.tdCom.update = False
self.tdCom.partition_tbname_alias = "ptn_alias" if self.tdCom.subtable else ""
self.tdCom.partition_col_alias = "pcol_alias" if self.tdCom.subtable else ""
self.tdCom.partition_tag_alias = "ptag_alias" if self.tdCom.subtable else ""
self.tdCom.partition_expression_alias = "pexp_alias" if self.tdCom.subtable else ""
self.stb_name = str()
self.ctb_name = str()
self.tb_name = str()
self.tdCom.des_table_suffix = "_output"
self.tdCom.stream_suffix = "_stream"
self.tdCom.stream_case_when_tbname = "tbname"
self.tdCom.subtable_prefix = "prefix_" if self.tdCom.subtable else ""
self.tdCom.subtable_suffix = "_suffix" if self.tdCom.subtable else ""
self.stb_stream_des_table = str()
self.ctb_stream_des_table = str()
self.tb_stream_des_table = str()
self.downsampling_function_list = ["min(c1)", "max(c2)", "sum(c3)", "first(c4)", "last(c5)", "apercentile(c6, 50)", "avg(c7)", "count(c8)", "spread(c1)",
"stddev(c2)", "hyperloglog(c11)", "timediff(1, 0, 1h)", "timezone()", "to_iso8601(1)", 'to_unixtimestamp("1970-01-01T08:00:00+08:00")', "min(t1)", "max(t2)", "sum(t3)",
"first(t4)", "last(t5)", "apercentile(t6, 50)", "avg(t7)", "count(t8)", "spread(t1)", "stddev(t2)", "hyperloglog(t11)"]
self.tdCom.stb_output_select_str = ','.join(list(map(lambda x:f'`{x}`', self.downsampling_function_list)))
self.tdCom.stb_source_select_str = ','.join(self.downsampling_function_list)
self.tdCom.tb_source_select_str = ','.join(self.downsampling_function_list[0:15])
self.tdCom.partition_by_downsampling_function_list = ["min(c1)", "max(c2)", "sum(c3)", "first(c4)", "last(c5)", "count(c8)", "spread(c1)",
"stddev(c2)", "hyperloglog(c11)", "min(t1)", "max(t2)", "sum(t3)", "first(t4)", "last(t5)", "count(t8)", "spread(t1)", "stddev(t2)"]
def watermark_window_close_session(self, session, watermark, fill_history_value=None, delete=True):
tdLog.info(f"*** testing stream window_close+session: session: {session}, watermark: {watermark}, fill_history: {fill_history_value}, delete: {delete} ***")
......@@ -53,7 +25,6 @@ class TDTestCase:
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
self.tdCom.tb_output_select_str = ','.join(list(map(lambda x:f'`{x}`', self.downsampling_function_list[0:15])))
self.tdCom.date_time = self.tdCom.dataDict["start_ts"]
if watermark is not None:
watermark_value = f'{self.tdCom.dataDict["watermark"]}s'
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册