stb_subtable_value=f'concat(concat("{self.stb_name}_{self.tdCom.subtable_prefix}", cast(cast(cast({subtable} as int unsigned) as bigint) as varchar(100))), "{self.tdCom.subtable_suffix}")'ifself.tdCom.subtableelseNone
# self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ext_stb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.ext_tb_source_select_str} from {self.stb_name} partition by {partition} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", fill_value=fill_value, fill_history_value=fill_history_value, stb_field_name_value=stb_field_name_value, tag_value=tag_value, use_exist_stb=use_exist_stb)
ifpartition:
stream_sql=self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}',des_table=self.tdCom.ext_stb_stream_des_table,subtable_value=stb_subtable_value,source_sql=f'select _wstart AS wstart, {partitial_tb_source_str} from {self.stb_name} partition by {partition} interval({self.tdCom.dataDict["interval"]}s)',trigger_mode="at_once",fill_value=fill_value,fill_history_value=fill_history_value,stb_field_name_value=stb_field_name_value,tag_value=tag_value,use_exist_stb=use_exist_stb,use_except=use_except)
else:
stream_sql=self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}',des_table=self.tdCom.ext_stb_stream_des_table,subtable_value=stb_subtable_value,source_sql=f'select _wstart AS wstart, {partitial_tb_source_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s)',trigger_mode="at_once",fill_value=fill_value,fill_history_value=fill_history_value,stb_field_name_value=stb_field_name_value,tag_value=tag_value,use_exist_stb=use_exist_stb,use_except=use_except)
self.tdCom.check_query_data(f'select {self.tdCom.partitial_stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts',f'select _wstart AS wstart, {partitial_tb_source_str} from {self.stb_name} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) order by wstart',sorted=True)
self.tdCom.check_query_data(f'select {self.tdCom.partitial_stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts',f'select _wstart AS wstart, cast(max(c2) as tinyint), cast(min(c1) as smallint) from {self.stb_name} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) order by wstart',sorted=True)
self.tdCom.check_query_data(f'select {self.tdCom.partitial_tag_stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts',f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) order by wstart',defined_tag_count=defined_tag_count,tag_value_list=tag_value_list)
tdSql.query(f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) order by wstart')
limit_row=tdSql.queryRows
self.tdCom.check_query_data(f'select {self.tdCom.cast_tag_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts',f'select cast(t1 as TINYINT UNSIGNED),cast(t2 as varchar(256)),cast(t3 as bool) from {self.stb_name} order by ts limit {limit_row}')
tdSql.query(f'select t1,t2,t3,t4,t6,t7,t8,t9,t10,t12 from ext_{self.stb_name}{self.tdCom.des_table_suffix};')
self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts',f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) order by wstart',defined_tag_count=defined_tag_count,tag_value_list=tag_value_list)
else:
ifuse_exist_stbandnottag_value:
self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts',f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s) order by wstart',defined_tag_count=defined_tag_count,tag_value_list=tag_value_list,partition=partition,use_exist_stb=use_exist_stb)
else:
self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts',f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s) order by wstart',defined_tag_count=defined_tag_count,tag_value_list=tag_value_list,partition=partition,subtable=subtable)
ifsubtable:
fortnamein[self.stb_name]:
tdSql.query(f'select * from {self.ctb_name}')
ptn_counter=0
forc1_valueintdSql.queryResult:
ifpartition=="c1":
tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}`;')
elifpartition=="abs(c1)":
abs_c1_value=abs(c1_value[1])
tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;')
elifpartition=="tbname"andptn_counter==0:
tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}`;')
ptn_counter+=1
else:
tdSql.query(f'select cast(cast(cast({c1_value[1]} as int unsigned) as bigint) as varchar(100))')
subtable_value=tdSql.queryResult[0][0]
ifsubtable=="constant":
return
else:
tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{subtable_value}{self.tdCom.subtable_suffix}`;')
self.at_once_interval_ext(interval=random.randint(10,15),delete=delete,fill_history_value=fill_history_value,partition="t1 as t5,t2 as t11,t3 as t13",subtable=None,stb_field_name_value=None,tag_value="t5,t11,t13",use_exist_stb=True)
self.at_once_interval_ext(interval=random.randint(10,15),delete=False,fill_history_value=1,partition="t1 as t5,t2 as t11",subtable=None,stb_field_name_value=self.tdCom.tb_filter_des_select_elm,tag_value="t5,t11,t13",use_exist_stb=True,use_except=True)
self.at_once_interval_ext(interval=random.randint(10,15),delete=False,fill_history_value=1,partition="t1 as t5,t2 as t11,t3 as t14",subtable=None,stb_field_name_value=self.tdCom.tb_filter_des_select_elm,tag_value="t5,t11,t13",use_exist_stb=True,use_except=True)
self.at_once_interval_ext(interval=random.randint(10,15),delete=False,fill_history_value=1,partition="t1 as t5,t2 as t11,t3 as c13",subtable=None,stb_field_name_value=self.tdCom.tb_filter_des_select_elm,tag_value="t5,t11,c13",use_exist_stb=True,use_except=True)
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}',des_table=self.stb_stream_des_table,source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s)',trigger_mode="max_delay",watermark=watermark_value,max_delay=max_delay_value,fill_value=fill_value)
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}',des_table=self.stb_stream_des_table,source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s)',trigger_mode="max_delay",watermark=watermark_value,max_delay=max_delay_value,fill_value=fill_value)
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}',des_table=self.tdCom.ctb_stream_des_table,source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} interval({self.tdCom.dataDict["interval"]}s)',trigger_mode="max_delay",watermark=watermark_value,max_delay=max_delay_value,fill_value=fill_value)
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}',des_table=self.ctb_stream_des_table,source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} interval({self.tdCom.dataDict["interval"]}s)',trigger_mode="max_delay",watermark=watermark_value,max_delay=max_delay_value,fill_value=fill_value)
iffill_value:
iffill_value:
if"value"infill_value.lower():
if"value"infill_value.lower():
fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11'
fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11'
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}',des_table=self.tdCom.tb_stream_des_table,source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} interval({self.tdCom.dataDict["interval"]}s)',trigger_mode="max_delay",watermark=watermark_value,max_delay=max_delay_value,fill_value=fill_value)
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}',des_table=self.tb_stream_des_table,source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} interval({self.tdCom.dataDict["interval"]}s)',trigger_mode="max_delay",watermark=watermark_value,max_delay=max_delay_value,fill_value=fill_value)
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}',des_table=self.tdCom.ctb_stream_des_table,source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {self.ctb_name} session(ts, {self.tdCom.dataDict["session"]}s)',trigger_mode="max_delay",watermark=watermark_value,max_delay=max_delay_value,fill_history_value=fill_history_value)
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}',des_table=self.ctb_stream_des_table,source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {self.ctb_name} session(ts, {self.tdCom.dataDict["session"]}s)',trigger_mode="max_delay",watermark=watermark_value,max_delay=max_delay_value,fill_history_value=fill_history_value)
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}',des_table=self.tdCom.tb_stream_des_table,source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.tb_name} session(ts, {self.tdCom.dataDict["session"]}s)',trigger_mode="max_delay",watermark=watermark_value,max_delay=max_delay_value,fill_history_value=fill_history_value)
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}',des_table=self.tb_stream_des_table,source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.tb_name} session(ts, {self.tdCom.dataDict["session"]}s)',trigger_mode="max_delay",watermark=watermark_value,max_delay=max_delay_value,fill_history_value=fill_history_value)
stb_subtable_value=f'concat(concat("{self.stb_name}_{self.subtable_prefix}", cast(cast(abs(cast({subtable} as int)) as bigint) as varchar(100))), "{self.subtable_suffix}")'ifself.subtableelseNone
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}',des_table=self.tdCom.ext_stb_stream_des_table,source_sql=f'select _wstart AS wstart, {partitial_tb_source_str} from {self.stb_name} session(ts, {self.tdCom.dataDict["session"]}s)',trigger_mode="window_close",watermark=watermark_value,subtable_value=stb_subtable_value,fill_history_value=fill_history_value,stb_field_name_value=stb_field_name_value,tag_value=tag_value,use_exist_stb=use_exist_stb)
tdSql.query(f'select {tag_value} from {self.stb_name}')
tag_value_list=tdSql.queryResult
self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts',f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} session(ts, {self.tdCom.dataDict["session"]}s) order by wstart limit {expected_value};',sorted=True,defined_tag_count=defined_tag_count,tag_value_list=tag_value_list,partition=partition)
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}',des_table=self.tdCom.ctb_stream_des_table,source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} state_window({state_window_col_name})',trigger_mode="window_close")
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}',des_table=self.ctb_stream_des_table,source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} state_window({state_window_col_name})',trigger_mode="window_close")
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}',des_table=self.tdCom.tb_stream_des_table,source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} state_window({state_window_col_name})',trigger_mode="window_close")
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}',des_table=self.tb_stream_des_table,source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} state_window({state_window_col_name})',trigger_mode="window_close")