stb_subtable_value=f'concat(concat("{self.stb_name}_{self.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.subtable_suffix}")'ifself.subtableelseNone
ctb_subtable_value=f'concat(concat("{self.ctb_name}_{self.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.subtable_suffix}")'ifself.subtableelseNone
tb_subtable_value=f'concat(concat("{self.tb_name}_{self.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.subtable_suffix}")'ifself.subtableelseNone
ifpartition:
partition_elm=f'partition by {partition}{partition_elm_alias}'
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.stream_suffix}',des_table=self.stb_stream_des_table,source_sql=f'select _wstart AS wstart, {self.stb_source_select_str} from {self.stb_name}{partition_elm} interval({self.tdCom.dataDict["interval"]}s)',trigger_mode="at_once",subtable_value=stb_subtable_value,fill_value=fill_value,fill_history_value=fill_history_value)
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.stream_suffix}',des_table=self.ctb_stream_des_table,source_sql=f'select _wstart AS wstart, {self.stb_source_select_str} from {self.ctb_name}{partition_elm} interval({self.tdCom.dataDict["interval"]}s)',trigger_mode="at_once",subtable_value=ctb_subtable_value,fill_value=fill_value,fill_history_value=fill_history_value)
iffill_value:
if"value"infill_value.lower():
fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11'
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.stream_suffix}',des_table=self.tb_stream_des_table,source_sql=f'select _wstart AS wstart, {self.tb_source_select_str} from {self.tb_name}{partition_elm} interval({self.tdCom.dataDict["interval"]}s)',trigger_mode="at_once",subtable_value=tb_subtable_value,fill_value=fill_value,fill_history_value=fill_history_value)
self.tdCom.check_query_data(f'select wstart, {self.stb_output_select_str} from {tbname}{self.des_table_suffix} order by wstart',f'select _wstart AS wstart, {self.stb_source_select_str} from {tbname}{partition_elm} interval({self.tdCom.dataDict["interval"]}s) order by wstart',sorted=True)
else:
self.tdCom.check_query_data(f'select wstart, {self.tb_output_select_str} from {tbname}{self.des_table_suffix} order by wstart',f'select _wstart AS wstart, {self.tb_source_select_str} from {tbname}{partition_elm} interval({self.tdCom.dataDict["interval"]}s) order by wstart',sorted=True)
ifself.subtable:
fortnamein[self.stb_name,self.ctb_name]:
tdSql.query(f'select * from {self.ctb_name}')
ptn_counter=0
forc1_valueintdSql.queryResult:
ifpartition=="c1":
tdSql.query(f'select count(*) from `{tname}_{self.subtable_prefix}{abs(c1_value[1])}{self.subtable_suffix}`;')
elifpartitionisNone:
tdSql.query(f'select count(*) from `{tname}_{self.subtable_prefix}no_partition{self.subtable_suffix}`;')
elifpartition=="abs(c1)":
abs_c1_value=abs(c1_value[1])
tdSql.query(f'select count(*) from `{tname}_{self.subtable_prefix}{abs_c1_value}{self.subtable_suffix}`;')
elifpartition=="tbname"andptn_counter==0:
tdSql.query(f'select count(*) from `{tname}_{self.subtable_prefix}{self.ctb_name}{self.subtable_suffix}`;')
ptn_counter+=1
tdSql.checkEqual(tdSql.queryResult[0][0]>0,True)
tdSql.query(f'select * from {self.tb_name}')
ptn_counter=0
forc1_valueintdSql.queryResult:
ifpartition=="c1":
tdSql.query(f'select count(*) from `{self.tb_name}_{self.subtable_prefix}{abs(c1_value[1])}{self.subtable_suffix}`;')
elifpartitionisNone:
tdSql.query(f'select count(*) from `{self.tb_name}_{self.subtable_prefix}no_partition{self.subtable_suffix}`;')
elifpartition=="abs(c1)":
abs_c1_value=abs(c1_value[1])
tdSql.query(f'select count(*) from `{self.tb_name}_{self.subtable_prefix}{abs_c1_value}{self.subtable_suffix}`;')
elifpartition=="tbname"andptn_counter==0:
tdSql.query(f'select count(*) from `{self.tb_name}_{self.subtable_prefix}{self.tb_name}{self.subtable_suffix}`;')
self.tdCom.check_query_data(f'select wstart, {self.fill_stb_output_select_str} from {tbname}{self.des_table_suffix} order by wstart',f'select _wstart AS wstart, {self.fill_stb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart',fill_value=fill_value)
else:
self.tdCom.check_query_data(f'select wstart, {self.fill_stb_output_select_str} from {tbname}{self.des_table_suffix} where `min(c1)` is not Null order by wstart,`min(c1)`',f'select * from (select _wstart AS wstart, {self.fill_stb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart) where `min(c1)` is not Null order by wstart,`min(c1)`',fill_value=fill_value)
else:
if"value"infill_value.lower():
fill_value='VALUE,1,2,3,6,7,8,9,10,11'
ifpartition=="tbname":
self.tdCom.check_query_data(f'select wstart, {self.fill_tb_output_select_str} from {tbname}{self.des_table_suffix} order by wstart',f'select _wstart AS wstart, {self.fill_tb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart',fill_value=fill_value)
else:
self.tdCom.check_query_data(f'select wstart, {self.fill_tb_output_select_str} from {tbname}{self.des_table_suffix} where `min(c1)` is not Null order by wstart,`min(c1)`',f'select * from (select _wstart AS wstart, {self.fill_tb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart) where `min(c1)` is not Null order by wstart,`min(c1)`',fill_value=fill_value)
self.tdCom.check_query_data(f'select wstart, {self.fill_stb_output_select_str} from {tbname}{self.des_table_suffix} order by wstart',f'select _wstart AS wstart, {self.fill_stb_source_select_str} from {tbname} where ts >= {start_ts.replace("-","+")} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart',fill_value=fill_value)
else:
self.tdCom.check_query_data(f'select wstart, {self.fill_stb_output_select_str} from {tbname}{self.des_table_suffix} order by wstart,`min(c1)`',f'select * from (select _wstart AS wstart, {self.fill_stb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart) where `min(c1)` is not Null order by wstart,`min(c1)`',fill_value=fill_value)
else:
if"value"infill_value.lower():
fill_value='VALUE,1,2,3,6,7,8,9,10,11'
ifpartition=="tbname":
self.tdCom.check_query_data(f'select wstart, {self.fill_tb_output_select_str} from {tbname}{self.des_table_suffix} order by wstart',f'select _wstart AS wstart, {self.fill_tb_source_select_str} from {tbname} where ts >= {start_ts.replace("-","+")} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart',fill_value=fill_value)
else:
self.tdCom.check_query_data(f'select wstart, {self.fill_tb_output_select_str} from {tbname}{self.des_table_suffix} order by wstart,`min(c1)`',f'select * from (select _wstart AS wstart, {self.fill_tb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart) where `min(c1)` is not Null order by wstart,`min(c1)`',fill_value=fill_value)