未验证 提交 8f27724f 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #22701 from taosdata/fix/test_1x

test: add stream cases to main
...@@ -383,6 +383,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -383,6 +383,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus); tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
if (pItem != NULL) {
streamFreeQitem(pItem);
}
continue; continue;
} }
......
...@@ -982,8 +982,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ...@@ -982,8 +982,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
} }
streamFreeQitem(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -521,6 +521,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t ...@@ -521,6 +521,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
ASSERT(pTask->status.timerActive == 0); ASSERT(pTask->status.timerActive == 0);
doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) {
qDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt);
taosTmrStop(pTask->schedInfo.pTimer);
pTask->info.triggerParam = 0;
streamMetaReleaseTask(pMeta, pTask);
}
streamMetaRemoveTask(pMeta, keys); streamMetaRemoveTask(pMeta, keys);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} else { } else {
......
...@@ -6,6 +6,21 @@ ...@@ -6,6 +6,21 @@
,,y,unit-test,bash test.sh ,,y,unit-test,bash test.sh
#system test #system test
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/scalar_function.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_interval.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_session.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_state_window.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/window_close_interval.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/window_close_session.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/window_close_state_window.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/max_delay_interval.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/max_delay_session.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_interval_ext.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/max_delay_interval_ext.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/window_close_session_ext.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/partition_interval.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/pause_resume_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 3
...@@ -24,7 +39,7 @@ ...@@ -24,7 +39,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4
......
此差异已折叠。
...@@ -111,7 +111,7 @@ class TDSql: ...@@ -111,7 +111,7 @@ class TDSql:
return self.error_info return self.error_info
def query(self, sql, row_tag=None,queryTimes=10): def query(self, sql, row_tag=None, queryTimes=10, count_expected_res=None):
self.sql = sql self.sql = sql
i=1 i=1
while i <= queryTimes: while i <= queryTimes:
...@@ -120,6 +120,17 @@ class TDSql: ...@@ -120,6 +120,17 @@ class TDSql:
self.queryResult = self.cursor.fetchall() self.queryResult = self.cursor.fetchall()
self.queryRows = len(self.queryResult) self.queryRows = len(self.queryResult)
self.queryCols = len(self.cursor.description) self.queryCols = len(self.cursor.description)
if count_expected_res is not None:
counter = 0
while count_expected_res != self.queryResult[0][0]:
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
if counter < queryTimes:
counter += 0.5
time.sleep(0.5)
else:
return False
if row_tag: if row_tag:
return self.queryResult return self.queryResult
return self.queryRows return self.queryRows
...@@ -501,7 +512,8 @@ class TDSql: ...@@ -501,7 +512,8 @@ class TDSql:
caller = inspect.getframeinfo(inspect.stack()[1][0]) caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, elm, expect_elm) args = (caller.filename, caller.lineno, self.sql, elm, expect_elm)
tdLog.exit("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args) # tdLog.info("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)
raise Exception("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)
def checkNotEqual(self, elm, expect_elm): def checkNotEqual(self, elm, expect_elm):
if elm != expect_elm: if elm != expect_elm:
...@@ -509,7 +521,8 @@ class TDSql: ...@@ -509,7 +521,8 @@ class TDSql:
else: else:
caller = inspect.getframeinfo(inspect.stack()[1][0]) caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, elm, expect_elm) args = (caller.filename, caller.lineno, self.sql, elm, expect_elm)
tdLog.exit("%s(%d) failed: sql:%s, elm:%s == expect_elm:%s" % args) tdLog.info("%s(%d) failed: sql:%s, elm:%s == expect_elm:%s" % args)
raise Exception
def get_times(self, time_str, precision="ms"): def get_times(self, time_str, precision="ms"):
caller = inspect.getframeinfo(inspect.stack()[1][0]) caller = inspect.getframeinfo(inspect.stack()[1][0])
......
import sys
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
def at_once_interval(self, interval, partition="tbname", delete=False, fill_value=None, fill_history_value=None, case_when=None):
tdLog.info(f"*** testing stream at_once+interval: interval: {interval}, partition: {partition}, fill_history: {fill_history_value}, fill: {fill_value}, delete: {delete}, case_when: {case_when} ***")
self.delete = delete
self.tdCom.case_name = sys._getframe().f_code.co_name
self.tdCom.prepare_data(interval=interval, fill_history_value=fill_history_value)
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
if partition == "tbname":
if case_when:
stream_case_when_partition = case_when
else:
stream_case_when_partition = self.tdCom.partition_tbname_alias
partition_elm_alias = self.tdCom.partition_tbname_alias
elif partition == "c1":
if case_when:
stream_case_when_partition = case_when
else:
stream_case_when_partition = self.tdCom.partition_col_alias
partition_elm_alias = self.tdCom.partition_col_alias
elif partition == "abs(c1)":
partition_elm_alias = self.tdCom.partition_expression_alias
elif partition is None:
partition_elm_alias = '"no_partition"'
else:
partition_elm_alias = self.tdCom.partition_tag_alias
if partition == "tbname" or partition is None:
if case_when:
stb_subtable_value = f'concat(concat("{self.stb_name}_{self.tdCom.subtable_prefix}", {stream_case_when_partition}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", {stream_case_when_partition}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", {stream_case_when_partition}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
else:
stb_subtable_value = f'concat(concat("{self.stb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
else:
stb_subtable_value = f'concat(concat("{self.stb_name}_{self.tdCom.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
if partition:
partition_elm = f'partition by {partition} {partition_elm_alias}'
else:
partition_elm = ""
if fill_value:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=stb_subtable_value, fill_value=fill_value, fill_history_value=fill_history_value)
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=ctb_subtable_value, fill_value=fill_value, fill_history_value=fill_history_value)
if fill_value:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11'
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=tb_subtable_value, fill_value=fill_value, fill_history_value=fill_history_value)
start_time = self.tdCom.date_time
for i in range(self.tdCom.range_count):
ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s'
ts_cast_delete_value = self.tdCom.time_cast(ts_value)
self.tdCom.sinsert_rows(tbname=self.tdCom.ctb_name, ts_value=ts_value)
if i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.tdCom.ctb_name, ts_value=ts_value)
if self.delete and i%2 != 0:
self.tdCom.sdelete_rows(tbname=self.tdCom.ctb_name, start_ts=ts_cast_delete_value)
self.tdCom.date_time += 1
self.tdCom.sinsert_rows(tbname=self.tdCom.tb_name, ts_value=ts_value)
if i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.tdCom.tb_name, ts_value=ts_value)
if self.delete and i%2 != 0:
self.tdCom.sdelete_rows(tbname=self.tdCom.tb_name, start_ts=ts_cast_delete_value)
self.tdCom.date_time += 1
if partition:
partition_elm = f'partition by {partition}'
else:
partition_elm = ""
if not fill_value:
for tbname in [self.stb_name, self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {tbname} {partition_elm} interval({self.tdCom.dataDict["interval"]}s) order by wstart', sorted=True)
else:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {tbname} {partition_elm} interval({self.tdCom.dataDict["interval"]}s) order by wstart', sorted=True)
if self.tdCom.subtable:
for tname in [self.stb_name, self.ctb_name]:
tdSql.query(f'select * from {self.ctb_name}')
ptn_counter = 0
for c1_value in tdSql.queryResult:
if partition == "c1":
tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}`;')
elif partition is None:
tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}no_partition{self.tdCom.subtable_suffix}`;')
elif partition == "abs(c1)":
abs_c1_value = abs(c1_value[1])
tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;')
elif partition == "tbname" and ptn_counter == 0:
tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}`;')
ptn_counter += 1
tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True)
tdSql.query(f'select * from {self.tb_name}')
ptn_counter = 0
for c1_value in tdSql.queryResult:
if partition == "c1":
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}`;')
elif partition is None:
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}no_partition{self.tdCom.subtable_suffix}`;')
elif partition == "abs(c1)":
abs_c1_value = abs(c1_value[1])
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;')
elif partition == "tbname" and ptn_counter == 0:
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}`;')
ptn_counter += 1
tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True)
if fill_value:
end_date_time = self.tdCom.date_time
final_range_count = self.tdCom.range_count
history_ts = str(start_time)+f'-{self.tdCom.dataDict["interval"]*(final_range_count+2)}s'
start_ts = self.tdCom.time_cast(history_ts, "-")
future_ts = str(end_date_time)+f'+{self.tdCom.dataDict["interval"]*(final_range_count+2)}s'
end_ts = self.tdCom.time_cast(future_ts)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=history_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=history_ts)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=future_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=future_ts)
self.tdCom.date_time = start_time
# update
history_ts = str(start_time)+f'-{self.tdCom.dataDict["interval"]*(final_range_count+2)}s'
start_ts = self.tdCom.time_cast(history_ts, "-")
future_ts = str(end_date_time)+f'+{self.tdCom.dataDict["interval"]*(final_range_count+2)}s'
end_ts = self.tdCom.time_cast(future_ts)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=history_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=history_ts)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=future_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=future_ts)
self.tdCom.date_time = start_time
for i in range(self.tdCom.range_count):
ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s'
ts_cast_delete_value = self.tdCom.time_cast(ts_value)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
self.tdCom.date_time += 1
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
self.tdCom.date_time += 1
if self.delete:
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=ts_cast_delete_value)
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=ts_cast_delete_value)
for tbname in [self.stb_name, self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
if partition == "tbname":
self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tdCom.fill_stb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart', fill_value=fill_value)
else:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} where `min(c1)` is not Null order by wstart,`min(c1)`', f'select * from (select _wstart AS wstart, {self.tdCom.fill_stb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart) where `min(c1)` is not Null order by wstart,`min(c1)`', fill_value=fill_value)
else:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,6,7,8,9,10,11'
if partition == "tbname":
self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tdCom.fill_tb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart', fill_value=fill_value)
else:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} where `min(c1)` is not Null order by wstart,`min(c1)`', f'select * from (select _wstart AS wstart, {self.tdCom.fill_tb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart) where `min(c1)` is not Null order by wstart,`min(c1)`', fill_value=fill_value)
if self.delete:
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=start_ts, end_ts=ts_cast_delete_value)
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=start_ts, end_ts=ts_cast_delete_value)
for tbname in [self.stb_name, self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
if partition == "tbname":
self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tdCom.fill_stb_source_select_str} from {tbname} where ts >= {start_ts.replace("-", "+")} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart', fill_value=fill_value)
else:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart,`min(c1)`', f'select * from (select _wstart AS wstart, {self.tdCom.fill_stb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart) where `min(c1)` is not Null order by wstart,`min(c1)`', fill_value=fill_value)
else:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,6,7,8,9,10,11'
if partition == "tbname":
self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tdCom.fill_tb_source_select_str} from {tbname} where ts >= {start_ts.replace("-", "+")} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart', fill_value=fill_value)
else:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart,`min(c1)`', f'select * from (select _wstart AS wstart, {self.tdCom.fill_tb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart) where `min(c1)` is not Null order by wstart,`min(c1)`', fill_value=fill_value)
def run(self):
self.at_once_interval(interval=random.randint(10, 15), partition="tbname", delete=True)
self.at_once_interval(interval=random.randint(10, 15), partition="c1", delete=True)
self.at_once_interval(interval=random.randint(10, 15), partition="abs(c1)", delete=True)
self.at_once_interval(interval=random.randint(10, 15), partition=None, delete=True)
self.at_once_interval(interval=random.randint(10, 15), partition=self.tdCom.stream_case_when_tbname, case_when=f'case when {self.tdCom.stream_case_when_tbname} = tbname then {self.tdCom.partition_tbname_alias} else tbname end')
self.at_once_interval(interval=random.randint(10, 15), partition="tbname", fill_history_value=1, fill_value="NULL")
# for fill_value in ["NULL", "PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]:
for fill_value in ["PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]:
self.at_once_interval(interval=random.randint(10, 15), partition="tbname", fill_value=fill_value)
self.at_once_interval(interval=random.randint(10, 15), partition="tbname", fill_value=fill_value, delete=True)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
此差异已折叠。
import sys
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
def at_once_session(self, session, ignore_expired=None, ignore_update=None, partition="tbname", delete=False, fill_history_value=None, case_when=None, subtable=True):
tdLog.info(f"*** testing stream at_once+interval: session: {session}, ignore_expired: {ignore_expired}, ignore_update: {ignore_update}, partition: {partition}, delete: {delete}, fill_history: {fill_history_value}, case_when: {case_when}, subtable: {subtable} ***")
self.delete = delete
self.tdCom.case_name = sys._getframe().f_code.co_name
self.tdCom.prepare_data(session=session, fill_history_value=fill_history_value)
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
if partition == "tbname":
if case_when:
stream_case_when_partition = case_when
else:
stream_case_when_partition = self.tdCom.partition_tbname_alias
partition_elm_alias = self.tdCom.partition_tbname_alias
elif partition == "c1":
partition_elm_alias = self.tdCom.partition_col_alias
elif partition == "abs(c1)":
if subtable:
partition_elm_alias = self.tdCom.partition_expression_alias
else:
partition_elm_alias = "constant"
else:
partition_elm_alias = self.tdCom.partition_tag_alias
if partition == "tbname" or subtable is None:
if case_when:
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", {stream_case_when_partition}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", {stream_case_when_partition}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
else:
if subtable:
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
else:
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", "{partition_elm_alias}"), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", "{partition_elm_alias}"), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
else:
if 'abs' in partition:
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(20))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(20))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
else:
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", cast(cast({partition_elm_alias} as bigint) as varchar(20))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", cast(cast({partition_elm_alias} as bigint) as varchar(20))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
# create stb/ctb/tb stream
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {self.ctb_name} partition by {partition} {partition_elm_alias} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="at_once", ignore_expired=ignore_expired, ignore_update=ignore_update, subtable_value=ctb_subtable_value, fill_history_value=fill_history_value)
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.tb_name} partition by {partition} {partition_elm_alias} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="at_once", ignore_expired=ignore_expired, ignore_update=ignore_update, subtable_value=tb_subtable_value, fill_history_value=fill_history_value)
for i in range(self.tdCom.range_count):
ctb_name = self.tdCom.get_long_name()
self.tdCom.screate_ctable(stbname=self.stb_name, ctbname=ctb_name)
if i == 0:
window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, session=session)
else:
self.tdCom.date_time = window_close_ts + 1
window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, session=session)
if i == 0:
record_window_close_ts = window_close_ts
for ts_value in [self.tdCom.date_time, window_close_ts]:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, need_null=True)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, need_null=True)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, need_null=True)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, need_null=True)
if self.delete and i%2 != 0:
dt = f'cast({self.tdCom.date_time-1} as timestamp)'
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=dt)
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=dt)
ts_value += 1
# check result
if partition != "tbname":
for colname in self.tdCom.partition_by_downsampling_function_list:
if "first" not in colname and "last" not in colname:
self.tdCom.check_query_data(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.tb_output_select_str} from {self.ctb_stream_des_table} order by wstart, `min(c1)`,`max(c2)`,`sum(c3)`;', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.ctb_name} partition by {partition} session(ts, {self.tdCom.dataDict["session"]}s) order by wstart, `min(c1)`,`max(c2)`,`sum(c3)`;', sorted=True)
self.tdCom.check_query_data(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.tb_output_select_str} from {self.tb_stream_des_table} order by wstart, `min(c1)`,`max(c2)`,`sum(c3)`;', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.tb_name} partition by {partition} session(ts, {self.tdCom.dataDict["session"]}s) order by wstart, `min(c1)`,`max(c2)`,`sum(c3)`;')
else:
for tbname in [self.tb_name]:
if tbname != self.tb_name:
self.tdCom.check_query_data(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {tbname} partition by {partition} session(ts, {self.tdCom.dataDict["session"]}s)', sorted=True)
else:
self.tdCom.check_query_data(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {tbname} partition by {partition} session(ts, {self.tdCom.dataDict["session"]}s)', sorted=True)
if self.tdCom.disorder:
if ignore_expired:
for tbname in [self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}')
res2 = tdSql.queryResult
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=str(self.tdCom.date_time)+f'-{self.tdCom.default_interval*(self.tdCom.range_count+session)}s')
tdSql.query(f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s)')
res1 = tdSql.queryResult
tdSql.checkNotEqual(res1, res2)
tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}')
res1 = tdSql.queryResult
tdSql.checkEqual(res1, res2)
else:
tdSql.query(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}')
res2 = tdSql.queryResult
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=str(self.tdCom.date_time)+f'-{self.tdCom.default_interval*(self.tdCom.range_count+session)}s')
tdSql.query(f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s)')
res1 = tdSql.queryResult
tdSql.checkNotEqual(res1, res2)
tdSql.query(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}')
res1 = tdSql.queryResult
tdSql.checkEqual(res1, res2)
else:
if ignore_update:
for tbname in [self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}')
res2 = tdSql.queryResult
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=record_window_close_ts)
tdSql.query(f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s)')
res1 = tdSql.queryResult
tdSql.checkNotEqual(res1, res2)
else:
tdSql.query(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}')
res2 = tdSql.queryResult
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=record_window_close_ts)
tdSql.query(f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s)')
res1 = tdSql.queryResult
tdSql.checkNotEqual(res1, res2)
else:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=record_window_close_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=record_window_close_ts)
if partition != "tbname":
for colname in self.tdCom.partition_by_downsampling_function_list:
if "first" not in colname and "last" not in colname:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.tb_output_select_str} from {self.ctb_stream_des_table} order by wstart, `min(c1)`,`max(c2)`,`sum(c3)`;', f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.ctb_name} partition by {partition} session(ts, {self.tdCom.dataDict["session"]}s) order by wstart, `min(c1)`,`max(c2)`,`sum(c3)`;', sorted=True)
self.tdCom.check_query_data(f'select wstart, {self.tdCom.tb_output_select_str} from {self.tb_stream_des_table} order by wstart, `min(c1)`,`max(c2)`,`sum(c3)`;', f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} partition by {partition} session(ts, {self.tdCom.dataDict["session"]}s) order by wstart, `min(c1)`,`max(c2)`,`sum(c3)`;')
else:
for tbname in [self.tb_name]:
if tbname != self.tb_name:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {tbname} partition by {partition} session(ts, {self.tdCom.dataDict["session"]}s)', sorted=True)
else:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {tbname} partition by {partition} session(ts, {self.tdCom.dataDict["session"]}s)', sorted=True)
if fill_history_value:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.record_history_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=self.tdCom.record_history_ts)
if self.delete:
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=self.tdCom.time_cast(self.tdCom.record_history_ts, "-"))
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=self.tdCom.time_cast(self.tdCom.record_history_ts, "-"))
if self.tdCom.subtable:
tdSql.query(f'select * from {self.ctb_name}')
ptn_counter = 0
for c1_value in tdSql.queryResult:
if c1_value[1] is not None:
if partition == "c1":
tdSql.query(f'select count(*) from `{self.ctb_name}_{self.tdCom.subtable_prefix}{c1_value[1]}{self.tdCom.subtable_suffix}`;')
elif partition == "abs(c1)":
if subtable:
abs_c1_value = abs(c1_value[1])
tdSql.query(f'select count(*) from `{self.ctb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;')
else:
tdSql.query(f'select count(*) from `{self.ctb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}`;')
elif partition == "tbname" and ptn_counter == 0:
tdSql.query(f'select count(*) from `{self.ctb_name}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}`;')
ptn_counter += 1
tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True)
tdSql.query(f'select * from {self.tb_name}')
ptn_counter = 0
for c1_value in tdSql.queryResult:
if c1_value[1] is not None:
if partition == "c1":
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{c1_value[1]}{self.tdCom.subtable_suffix}`;')
elif partition == "abs(c1)":
if subtable:
abs_c1_value = abs(c1_value[1])
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;')
else:
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}`;')
elif partition == "tbname" and ptn_counter == 0:
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}`;')
ptn_counter += 1
tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True)
def run(self):
self.at_once_session(session=random.randint(10, 15), partition=self.tdCom.stream_case_when_tbname, delete=True, case_when=f'case when {self.tdCom.stream_case_when_tbname} = tbname then {self.tdCom.partition_tbname_alias} else tbname end')
for subtable in [None, True]:
self.at_once_session(session=random.randint(10, 15), subtable=subtable, partition="abs(c1)")
for ignore_expired in [None, 0, 1]:
for fill_history_value in [None, 1]:
self.at_once_session(session=random.randint(10, 15), ignore_expired=ignore_expired, fill_history_value=fill_history_value)
for fill_history_value in [None, 1]:
self.at_once_session(session=random.randint(10, 15), partition="tbname", delete=True, fill_history_value=fill_history_value)
self.at_once_session(session=random.randint(10, 15), partition="c1", delete=True, fill_history_value=fill_history_value)
self.at_once_session(session=random.randint(10, 15), partition="abs(c1)", delete=True, fill_history_value=fill_history_value)
self.at_once_session(session=random.randint(10, 15), partition="abs(c1)", delete=True, subtable=None, fill_history_value=fill_history_value)
self.at_once_session(session=random.randint(10, 15), ignore_update=1, fill_history_value=fill_history_value)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
import sys
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
def at_once_state_window(self, state_window, partition="tbname", delete=False, fill_history_value=None, case_when=None, subtable=True):
tdLog.info(f"*** testing stream at_once+interval: state_window: {state_window}, partition: {partition}, fill_history: {fill_history_value}, case_when: {case_when}***, delete: {delete}")
self.delete = delete
self.tdCom.case_name = sys._getframe().f_code.co_name
self.tdCom.prepare_data(state_window=state_window, fill_history_value=fill_history_value)
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
if partition == "tbname":
partition_elm_alias = self.tdCom.partition_tbname_alias
elif partition == "c1" and subtable is not None:
partition_elm_alias = self.tdCom.partition_col_alias
elif partition == "c1" and subtable is None:
partition_elm_alias = 'constant'
elif partition == "abs(c1)":
partition_elm_alias = self.tdCom.partition_expression_alias
else:
partition_elm_alias = self.tdCom.partition_tag_alias
if partition == "tbname" or subtable is None:
if partition == "tbname":
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
else:
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", "{partition_elm_alias}"), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", "{partition_elm_alias}"), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
else:
if 'abs' in partition:
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(20))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(20))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
else:
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", cast(cast({partition_elm_alias} as bigint) as varchar(20))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", cast(cast({partition_elm_alias} as bigint) as varchar(20))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
state_window_col_name = self.tdCom.dataDict["state_window"]
if case_when:
stream_state_window = case_when
else:
stream_state_window = state_window_col_name
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} partition by {partition} {partition_elm_alias} state_window({stream_state_window})', trigger_mode="at_once", subtable_value=ctb_subtable_value, fill_history_value=fill_history_value)
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} partition by {partition} {partition_elm_alias} state_window({stream_state_window})', trigger_mode="at_once", subtable_value=tb_subtable_value, fill_history_value=fill_history_value)
range_times = self.tdCom.range_count
state_window_max = self.tdCom.dataDict['state_window_max']
for i in range(range_times):
state_window_value = random.randint(int((i)*state_window_max/range_times), int((i+1)*state_window_max/range_times))
for i in range(2, range_times+3):
tdSql.execute(f'insert into {self.ctb_name} (ts, {state_window_col_name}) values ({self.tdCom.date_time}, {state_window_value})')
if self.tdCom.update and i%2 == 0:
tdSql.execute(f'insert into {self.ctb_name} (ts, {state_window_col_name}) values ({self.tdCom.date_time}, {state_window_value})')
if self.delete and i%2 != 0:
dt = f'cast({self.tdCom.date_time-1} as timestamp)'
tdSql.execute(f'delete from {self.ctb_name} where ts = {dt}')
tdSql.execute(f'insert into {self.tb_name} (ts, {state_window_col_name}) values ({self.tdCom.date_time}, {state_window_value})')
if self.tdCom.update and i%2 == 0:
tdSql.execute(f'insert into {self.tb_name} (ts, {state_window_col_name}) values ({self.tdCom.date_time}, {state_window_value})')
if self.delete and i%2 != 0:
tdSql.execute(f'delete from {self.tb_name} where ts = {dt}')
self.tdCom.date_time += 1
# for tbname in [self.stb_name, self.ctb_name, self.tb_name]:
for tbname in [self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {tbname} partition by {partition} state_window({state_window_col_name}) order by wstart,{state_window}', sorted=True)
else:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {tbname} partition by {partition} state_window({state_window_col_name}) order by wstart,{state_window}', sorted=True)
if fill_history_value:
self.tdCom.update_delete_history_data(self.delete)
if self.tdCom.subtable:
tdSql.query(f'select * from {self.ctb_name}')
ptn_counter = 0
for c1_value in tdSql.queryResult:
if partition == "c1":
if subtable:
tdSql.query(f'select count(*) from `{self.ctb_name}_{self.tdCom.subtable_prefix}{c1_value[1]}{self.tdCom.subtable_suffix}`;')
else:
tdSql.query(f'select count(*) from `{self.ctb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}`;')
return
elif partition == "abs(c1)":
abs_c1_value = abs(c1_value[1])
tdSql.query(f'select count(*) from `{self.ctb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;')
elif partition == "tbname" and ptn_counter == 0:
tdSql.query(f'select count(*) from `{self.ctb_name}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}`;')
ptn_counter += 1
tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True)
tdSql.query(f'select * from {self.tb_name}')
ptn_counter = 0
for c1_value in tdSql.queryResult:
if partition == "c1":
if subtable:
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{c1_value[1]}{self.tdCom.subtable_suffix}`;')
else:
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}`;')
return
elif partition == "abs(c1)":
abs_c1_value = abs(c1_value[1])
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;')
elif partition == "tbname" and ptn_counter == 0:
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}`;')
ptn_counter += 1
tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True)
def run(self):
self.at_once_state_window(state_window="c2", partition="tbname", case_when="case when c1 < 0 then c1 else c2 end")
self.at_once_state_window(state_window="c1", partition="tbname", case_when="case when c1 >= 0 then c1 else c2 end")
for fill_history_value in [None, 1]:
self.at_once_state_window(state_window="c1", partition="tbname", fill_history_value=fill_history_value)
self.at_once_state_window(state_window="c1", partition="c1", fill_history_value=fill_history_value)
self.at_once_state_window(state_window="c1", partition="abs(c1)", fill_history_value=fill_history_value)
self.at_once_state_window(state_window="c1", partition="tbname", delete=True, fill_history_value=fill_history_value)
self.at_once_state_window(state_window="c1", partition="c1", delete=True, fill_history_value=fill_history_value)
self.at_once_state_window(state_window="c1", partition="abs(c1)", delete=True, fill_history_value=fill_history_value)
self.at_once_state_window(state_window="c1", partition="c1", subtable=None, fill_history_value=fill_history_value)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
import sys
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
def watermark_max_delay_interval(self, interval, max_delay, watermark=None, fill_value=None, delete=False):
tdLog.info(f"*** testing stream max_delay+interval: interval: {interval}, watermark: {watermark}, fill_value: {fill_value}, delete: {delete} ***")
self.delete = delete
self.tdCom.case_name = sys._getframe().f_code.co_name
if watermark is not None:
self.case_name = "watermark" + sys._getframe().f_code.co_name
self.tdCom.prepare_data(interval=interval, watermark=watermark)
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
self.tdCom.date_time = 1658921623245
if watermark is not None:
watermark_value = f'{self.tdCom.dataDict["watermark"]}s'
fill_watermark_value = watermark_value
else:
watermark_value = None
fill_watermark_value = "0s"
max_delay_value = f'{self.tdCom.trans_time_to_s(max_delay)}s'
if fill_value:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
# create stb/ctb/tb stream
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value)
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value)
if fill_value:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11'
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value)
init_num = 0
start_time = self.tdCom.date_time
for i in range(self.tdCom.range_count):
if i == 0:
if watermark is not None:
window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval'], self.tdCom.dataDict['watermark'])
else:
window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval'])
else:
self.tdCom.date_time = window_close_ts + self.tdCom.offset
window_close_ts += self.tdCom.dataDict['interval']*self.tdCom.offset
for num in range(int(window_close_ts/self.tdCom.offset-self.tdCom.date_time/self.tdCom.offset)):
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset)
# if not fill_value:
# for tbname in [self.stb_stream_des_table, self.ctb_stream_des_table, self.tb_stream_des_table]:
# if tbname != self.tb_stream_des_table:
# tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}')
# else:
# tdSql.query(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}')
# tdSql.checkEqual(tdSql.queryRows, init_num)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts-1)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts-1)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts-1)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts-1)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts)
if i == 0:
init_num = 2 + i
if watermark is not None:
init_num += 1
else:
init_num += 1
time.sleep(int(max_delay.replace("s", "")))
if not fill_value:
for tbname in [self.stb_name, self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {tbname} interval({self.tdCom.dataDict["interval"]}s)')
else:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {tbname} interval({self.tdCom.dataDict["interval"]}s)')
if fill_value:
history_ts = str(start_time)+f'-{self.tdCom.dataDict["interval"]*(self.tdCom.range_count+2)}s'
start_ts = self.tdCom.time_cast(history_ts, "-")
future_ts = str(self.tdCom.date_time)+f'+{self.tdCom.dataDict["interval"]*(self.tdCom.range_count+2)}s'
end_ts = self.tdCom.time_cast(future_ts)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=history_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=history_ts)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=future_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=future_ts)
future_ts_bigint = self.tdCom.str_ts_trans_bigint(future_ts)
if watermark is not None:
window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(future_ts_bigint, self.tdCom.dataDict['interval'], self.tdCom.dataDict['watermark'])
else:
window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(future_ts_bigint, self.tdCom.dataDict['interval'])
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts)
if self.tdCom.update:
for i in range(self.tdCom.range_count):
if i == 0:
if watermark is not None:
window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval'], self.tdCom.dataDict['watermark'])
else:
window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval'])
else:
self.tdCom.date_time = window_close_ts + self.tdCom.offset
window_close_ts += self.tdCom.dataDict['interval']*self.tdCom.offset
for num in range(int(window_close_ts/self.tdCom.offset-self.tdCom.date_time/self.tdCom.offset)):
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts-1)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts-1)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts)
if self.delete:
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=self.tdCom.time_cast(window_close_ts))
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=self.tdCom.time_cast(window_close_ts))
time.sleep(int(max_delay.replace("s", "")))
for tbname in [self.stb_name, self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, {self.tdCom.fill_stb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts}+{self.tdCom.dataDict["interval"]}s+{fill_watermark_value} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value})', fill_value=fill_value)
else:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,6,7,8,9,10,11'
self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, {self.tdCom.fill_tb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts}+{self.tdCom.dataDict["interval"]}s+{fill_watermark_value} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value})', fill_value=fill_value)
def run(self):
for watermark in [None, random.randint(20, 25)]:
self.watermark_max_delay_interval(interval=random.choice([15]), watermark=watermark, max_delay=f"{random.randint(5, 6)}s")
for fill_value in ["NULL", "PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]:
self.watermark_max_delay_interval(interval=random.randint(10, 15), watermark=None, max_delay=f"{random.randint(5, 6)}s", fill_value=fill_value)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
import sys
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
def watermark_max_delay_interval_ext(self, interval, max_delay, watermark=None, fill_value=None, partition="tbname", delete=False, fill_history_value=None, subtable=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False):
tdLog.info(f"*** testing stream max_delay+interval+exist_stb+custom_tag: interval: {interval}, partition: {partition}, max_delay: {max_delay}, fill_history: {fill_history_value}, subtable: {subtable}, stb_field_name_value: {stb_field_name_value}, tag_value: {tag_value} ***")
if stb_field_name_value == self.tdCom.partitial_stb_filter_des_select_elm or stb_field_name_value == self.tdCom.exchange_stb_filter_des_select_elm:
partitial_tb_source_str = self.tdCom.partitial_ext_tb_source_select_str
else:
partitial_tb_source_str = self.tdCom.ext_tb_source_select_str
if not stb_field_name_value:
stb_field_name_value = self.tdCom.tb_filter_des_select_elm
self.delete = delete
self.tdCom.case_name = sys._getframe().f_code.co_name
defined_tag_count = len(tag_value.split())
if watermark is not None:
self.tdCom.case_name = "watermark" + sys._getframe().f_code.co_name
self.tdCom.prepare_data(interval=interval, watermark=watermark, ext_stb=use_exist_stb)
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
if subtable:
stb_subtable_value = f'concat(concat("{self.stb_name}_{self.subtable_prefix}", cast(cast(abs(cast({subtable} as int)) as bigint) as varchar(100))), "{self.subtable_suffix}")' if self.subtable else None
else:
stb_subtable_value = None
self.tdCom.date_time = 1658921623245
if watermark is not None:
watermark_value = f'{self.tdCom.dataDict["watermark"]}s'
else:
watermark_value = None
max_delay_value = f'{self.tdCom.trans_time_to_s(max_delay)}s'
if fill_value:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
# create stb/ctb/tb stream
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ext_stb_stream_des_table, subtable_value=stb_subtable_value, source_sql=f'select _wstart AS wstart, {partitial_tb_source_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value, fill_history_value=fill_history_value, stb_field_name_value=stb_field_name_value, tag_value=tag_value, use_exist_stb=use_exist_stb)
init_num = 0
for i in range(self.tdCom.range_count):
if i == 0:
if watermark is not None:
window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval'], self.tdCom.dataDict['watermark'])
else:
window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval'])
else:
self.tdCom.date_time = window_close_ts + self.tdCom.offset
window_close_ts += self.tdCom.dataDict['interval']*self.tdCom.offset
for num in range(int(window_close_ts/self.tdCom.offset-self.tdCom.date_time/self.tdCom.offset)):
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts-1)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts-1)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
if i == 0:
init_num = 2 + i
if watermark is not None:
init_num += 1
else:
init_num += 1
time.sleep(int(max_delay.replace("s", "")))
if tag_value:
tdSql.query(f'select {tag_value} from {self.stb_name}')
tag_value_list = tdSql.queryResult
if not fill_value:
self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts;', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s)', defined_tag_count=defined_tag_count, tag_value_list=tag_value_list, partition=partition)
def run(self):
for delete in [True, False]:
for fill_history_value in [0, 1]:
self.watermark_max_delay_interval_ext(interval=random.choice([15]), watermark=random.randint(20, 25), max_delay=f"{random.randint(5, 6)}s", delete=delete, fill_history_value=fill_history_value, partition=None, subtable=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
import sys
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
def watermark_max_delay_session(self, session, watermark, max_delay, fill_history_value=None):
tdLog.info(f"*** testing stream max_delay+session: session: {session}, watermark: {watermark}, max_delay: {max_delay}, fill_history_value: {fill_history_value} ***")
self.tdCom.case_name = sys._getframe().f_code.co_name
if watermark is not None:
self.tdCom.case_name = "watermark" + sys._getframe().f_code.co_name
self.tdCom.prepare_data(session=session, watermark=watermark, fill_history_value=fill_history_value)
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
self.tdCom.date_time = self.tdCom.dataDict["start_ts"]
if watermark is not None:
watermark_value = f'{self.tdCom.dataDict["watermark"]}s'
else:
watermark_value = None
max_delay_value = f'{self.tdCom.trans_time_to_s(max_delay)}s'
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {self.ctb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value)
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.tb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value)
init_num = 0
for i in range(self.tdCom.range_count):
if i == 0:
window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, self.tdCom.dataDict['watermark'], self.tdCom.dataDict['session'])
else:
self.tdCom.date_time = window_close_ts + 1
window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, self.tdCom.dataDict['watermark'], self.tdCom.dataDict['session'])
if watermark_value is not None:
for ts_value in [self.tdCom.date_time, window_close_ts-1]:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
for tbname in [self.ctb_stream_des_table, self.tb_stream_des_table]:
if tbname != self.tb_stream_des_table:
tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}')
else:
tdSql.query(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}')
if not fill_history_value:
tdSql.checkEqual(tdSql.queryRows, init_num)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts)
if i == 0:
init_num = 2 + i
else:
init_num += 1
if watermark_value is not None:
expected_value = init_num
else:
expected_value = i + 1
if not fill_history_value:
for tbname in [self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
self.tdCom.check_stream(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s)', expected_value, max_delay)
else:
self.tdCom.check_stream(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s)', expected_value, max_delay)
else:
self.tdCom.update_delete_history_data(delete=True)
for tbname in [self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
self.tdCom.check_query_data(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s)')
else:
self.tdCom.check_query_data(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s)')
def run(self):
for fill_history_value in [None, 1]:
for watermark in [None, random.randint(20, 30)]:
self.watermark_max_delay_session(session=random.randint(10, 15), watermark=watermark, max_delay=f"{random.randint(1, 3)}s", fill_history_value=fill_history_value)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
import sys
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
def partitionby_interval(self, interval=None, partition_by_elm="tbname", ignore_expired=None):
tdLog.info(f"*** testing stream partition+interval: interval: {interval}, partition_by: {partition_by_elm}, ignore_expired: {ignore_expired} ***")
self.tdCom.case_name = sys._getframe().f_code.co_name
self.tdCom.prepare_data(interval=interval)
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
ctb_name_list = list()
for i in range(1, self.tdCom.range_count):
ctb_name = self.tdCom.get_long_name()
ctb_name_list.append(ctb_name)
self.tdCom.screate_ctable(stbname=self.stb_name, ctbname=ctb_name)
if interval is not None:
source_sql = f'select _wstart AS wstart, {self.tdCom.partition_by_stb_source_select_str} from {self.stb_name} partition by {partition_by_elm} interval({self.tdCom.dataDict["interval"]}s)'
else:
source_sql = f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name} partition by {partition_by_elm}'
# create stb/ctb/tb stream
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=source_sql, ignore_expired=ignore_expired)
# insert data
count = 1
step_count = 1
for i in range(1, self.tdCom.range_count):
if i == 1:
record_window_close_ts = self.tdCom.date_time - 15 * self.tdCom.offset
ctb_name = self.tdCom.get_long_name()
self.tdCom.screate_ctable(stbname=self.stb_name, ctbname=ctb_name)
if i % 2 == 0:
step_count += i
for j in range(count, step_count):
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=f'{self.tdCom.date_time}+{j}s')
for ctb_name in ctb_name_list:
self.tdCom.sinsert_rows(tbname=ctb_name, ts_value=f'{self.tdCom.date_time}+{j}s')
count += i
else:
step_count += 1
for i in range(2):
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=f'{self.tdCom.date_time}+{count}s')
for ctb_name in ctb_name_list:
self.tdCom.sinsert_rows(tbname=ctb_name, ts_value=f'{self.tdCom.date_time}+{count}s')
count += 1
# check result
for colname in self.tdCom.partition_by_downsampling_function_list:
if "first" not in colname and "last" not in colname:
if interval is not None:
self.tdCom.check_query_data(f'select `{colname}` from {self.stb_name}{self.tdCom.des_table_suffix} order by `{colname}`;', f'select {colname} from {self.stb_name} partition by {partition_by_elm} interval({self.tdCom.dataDict["interval"]}s) order by `{colname}`;')
else:
self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name}{self.tdCom.des_table_suffix} order by c1,c2,c3;', f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name} partition by {partition_by_elm} order by c1,c2,c3;')
if self.tdCom.disorder:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=record_window_close_ts)
for ctb_name in ctb_name_list:
self.tdCom.sinsert_rows(tbname=ctb_name, ts_value=record_window_close_ts)
if ignore_expired:
if "first" not in colname and "last" not in colname:
for colname in self.tdCom.partition_by_downsampling_function_list:
if interval is not None:
tdSql.query(f'select `{colname}` from {self.stb_name}{self.tdCom.des_table_suffix} order by `{colname}`;')
res1 = tdSql.queryResult
tdSql.query(f'select {colname} from {self.stb_name} partition by {partition_by_elm} interval({self.tdCom.dataDict["interval"]}s) order by `{colname}`;')
res2 = tdSql.queryResult
tdSql.checkNotEqual(res1, res2)
else:
self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name}{self.tdCom.des_table_suffix} order by c1,c2,c3;', f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name} partition by {partition_by_elm} order by c1,c2,c3;')
else:
for colname in self.tdCom.partition_by_downsampling_function_list:
if "first" not in colname and "last" not in colname:
if interval is not None:
self.tdCom.check_query_data(f'select `{colname}` from {self.stb_name}{self.tdCom.des_table_suffix} order by `{colname}`;', f'select {colname} from {self.stb_name} partition by {partition_by_elm} interval({self.tdCom.dataDict["interval"]}s) order by `{colname}`;')
else:
self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name}{self.tdCom.des_table_suffix} order by c1,c2,c3;', f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name} partition by {partition_by_elm} order by c1,c2,c3;')
def run(self):
for interval in [None, 10]:
for ignore_expired in [0, 1]:
self.partitionby_interval(interval=interval, partition_by_elm="tbname", ignore_expired=ignore_expired)
self.partitionby_interval(interval=10, partition_by_elm="t1")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
def pause_resume_test(self, interval, partition="tbname", delete=False, fill_history_value=None, pause=True, resume=True, ignore_untreated=False):
tdLog.info(f"*** testing stream pause+resume: interval: {interval}, partition: {partition}, delete: {delete}, fill_history: {fill_history_value}, ignore_untreated: {ignore_untreated} ***")
if_exist_value_list = [None, True]
if_exist = random.choice(if_exist_value_list)
reverse_check = True if ignore_untreated else False
range_count = (self.tdCom.range_count + 3) * 3
self.delete = delete
self.tdCom.case_name = sys._getframe().f_code.co_name
self.tdCom.prepare_data(interval=interval, fill_history_value=fill_history_value)
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
if partition == "tbname":
partition_elm_alias = self.tdCom.partition_tbname_alias
elif partition == "c1":
partition_elm_alias = self.tdCom.partition_col_alias
elif partition == "abs(c1)":
partition_elm_alias = self.tdCom.partition_expression_alias
elif partition is None:
partition_elm_alias = '"no_partition"'
else:
partition_elm_alias = self.tdCom.partition_tag_alias
if partition == "tbname" or partition is None:
stb_subtable_value = f'concat(concat("{self.stb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
else:
stb_subtable_value = f'concat(concat("{self.stb_name}_{self.tdCom.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
if partition:
partition_elm = f'partition by {partition} {partition_elm_alias}'
else:
partition_elm = ""
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=stb_subtable_value, fill_history_value=fill_history_value)
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=ctb_subtable_value, fill_history_value=fill_history_value)
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=tb_subtable_value, fill_history_value=fill_history_value)
for i in range(range_count):
ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s'
ts_cast_delete_value = self.tdCom.time_cast(ts_value)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
if self.delete and i%2 != 0:
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=ts_cast_delete_value)
self.tdCom.date_time += 1
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
if self.delete and i%2 != 0:
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=ts_cast_delete_value)
self.tdCom.date_time += 1
if partition:
partition_elm = f'partition by {partition}'
else:
partition_elm = ""
# if i == int(range_count/2):
if i > 2 and i % 3 == 0:
for stream_name in [f'{self.stb_name}{self.tdCom.stream_suffix}', f'{self.ctb_name}{self.tdCom.stream_suffix}', f'{self.tb_name}{self.tdCom.stream_suffix}']:
if if_exist is not None:
tdSql.execute(f'pause stream if exists {stream_name}_no_exist')
tdSql.error(f'pause stream if not exists {stream_name}')
tdSql.error(f'pause stream {stream_name}_no_exist')
self.tdCom.pause_stream(stream_name, if_exist)
if pause and not resume and range_count-i <= 3:
time.sleep(self.tdCom.default_interval)
tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {self.stb_name}{self.tdCom.des_table_suffix} order by wstart')
res_after_pause = tdSql.queryResult
if resume:
if i > 2 and i % 3 != 0:
for stream_name in [f'{self.stb_name}{self.tdCom.stream_suffix}', f'{self.ctb_name}{self.tdCom.stream_suffix}', f'{self.tb_name}{self.tdCom.stream_suffix}']:
if if_exist is not None:
tdSql.execute(f'resume stream if exists {stream_name}_no_exist')
tdSql.error(f'resume stream if not exists {stream_name}')
self.tdCom.resume_stream(stream_name, if_exist, None, ignore_untreated)
if pause and not resume:
tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {self.stb_name}{self.tdCom.des_table_suffix} order by wstart')
res_without_resume = tdSql.queryResult
tdSql.checkEqual(res_after_pause, res_without_resume)
else:
for tbname in [self.stb_name, self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {tbname} {partition_elm} interval({self.tdCom.dataDict["interval"]}s) order by wstart', sorted=True, reverse_check=reverse_check)
else:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {tbname} {partition_elm} interval({self.tdCom.dataDict["interval"]}s) order by wstart', sorted=True, reverse_check=reverse_check)
if self.tdCom.subtable:
for tname in [self.stb_name, self.ctb_name]:
tdSql.query(f'select * from {self.ctb_name}')
ptn_counter = 0
for c1_value in tdSql.queryResult:
if partition == "c1":
tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}`;')
elif partition is None:
tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}no_partition{self.tdCom.subtable_suffix}`;')
elif partition == "abs(c1)":
abs_c1_value = abs(c1_value[1])
tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;')
elif partition == "tbname" and ptn_counter == 0:
tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}`;')
ptn_counter += 1
tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True)
tdSql.query(f'select * from {self.tb_name}')
ptn_counter = 0
for c1_value in tdSql.queryResult:
if partition == "c1":
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}`;')
elif partition is None:
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}no_partition{self.tdCom.subtable_suffix}`;')
elif partition == "abs(c1)":
abs_c1_value = abs(c1_value[1])
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;')
elif partition == "tbname" and ptn_counter == 0:
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}`;')
ptn_counter += 1
tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True)
def run(self):
for delete in [True, False]:
for fill_history_value in [0, 1]:
# pause/resume
self.pause_resume_test(interval=random.randint(10, 15), partition="tbname", ignore_untreated=False, fill_history_value=fill_history_value, delete=delete)
self.pause_resume_test(interval=random.randint(10, 15), partition="tbname", ignore_untreated=True, fill_history_value=fill_history_value, delete=delete)
# self.pause_resume_test(interval=random.randint(10, 15), partition="tbname", resume=False, fill_history_value=fill_history_value, delete=delete)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
此差异已折叠。
此差异已折叠。
import sys
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
def watermark_window_close_session(self, session, watermark, fill_history_value=None, delete=True):
tdLog.info(f"*** testing stream window_close+session: session: {session}, watermark: {watermark}, fill_history: {fill_history_value}, delete: {delete} ***")
self.case_name = sys._getframe().f_code.co_name
if watermark is not None:
self.case_name = "watermark" + sys._getframe().f_code.co_name
self.tdCom.prepare_data(session=session, watermark=watermark, fill_history_value=fill_history_value)
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
self.tdCom.date_time = self.tdCom.dataDict["start_ts"]
if watermark is not None:
watermark_value = f'{self.tdCom.dataDict["watermark"]}s'
else:
watermark_value = None
# create stb/ctb/tb stream
# self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="window_close", watermark=watermark_value)
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {self.ctb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="window_close", watermark=watermark_value, fill_history_value=fill_history_value)
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.tb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="window_close", watermark=watermark_value, fill_history_value=fill_history_value)
for i in range(self.tdCom.range_count):
if i == 0:
window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, self.tdCom.dataDict['watermark'], self.tdCom.dataDict['session'])
else:
self.tdCom.date_time = window_close_ts + 1
window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, self.tdCom.dataDict['watermark'], self.tdCom.dataDict['session'])
if watermark_value is not None:
expected_value = i + 1
for ts_value in [self.tdCom.date_time, window_close_ts-1]:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
# for tbname in [self.stb_stream_des_table, self.ctb_stream_des_table, self.tb_stream_des_table]:
for tbname in [self.ctb_stream_des_table, self.tb_stream_des_table]:
if tbname != self.tb_stream_des_table:
tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}')
else:
tdSql.query(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}')
if not fill_history_value:
tdSql.checkEqual(tdSql.queryRows, i)
else:
expected_value = i
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts)
if fill_history_value:
self.tdCom.update_delete_history_data(delete=delete)
# for tbname in [self.stb_name, self.ctb_name, self.tb_name]:
if not fill_history_value:
for tbname in [self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
self.tdCom.check_stream(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s) limit {expected_value}', expected_value)
else:
self.tdCom.check_stream(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s) limit {expected_value}', expected_value)
else:
for tbname in [self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
self.tdCom.check_query_data(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s) limit {expected_value+1}')
else:
self.tdCom.check_query_data(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s) limit {expected_value+1}')
def run(self):
for fill_history_value in [None, 1]:
for watermark in [None, random.randint(20, 25)]:
self.watermark_window_close_session(session=random.randint(10, 15), watermark=watermark, fill_history_value=fill_history_value)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
import sys
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
def watermark_window_close_session_ext(self, session, watermark, fill_history_value=None, partition=None, subtable=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False, delete=False):
tdLog.info(f"*** testing stream window_close+session+exist_stb+custom_tag: session: {session}, partition: {partition}, fill_history: {fill_history_value}, subtable: {subtable}, stb_field_name_value: {stb_field_name_value}, tag_value: {tag_value} ***")
if stb_field_name_value == self.tdCom.partitial_stb_filter_des_select_elm or stb_field_name_value == self.tdCom.exchange_stb_filter_des_select_elm:
partitial_tb_source_str = self.tdCom.partitial_ext_tb_source_select_str
else:
partitial_tb_source_str = self.tdCom.ext_tb_source_select_str
if not stb_field_name_value:
stb_field_name_value = self.tdCom.tb_filter_des_select_elm
self.tdCom.case_name = sys._getframe().f_code.co_name
defined_tag_count = len(tag_value.split())
if watermark is not None:
self.case_name = "watermark" + sys._getframe().f_code.co_name
self.tdCom.prepare_data(session=session, watermark=watermark, fill_history_value=fill_history_value, ext_stb=use_exist_stb)
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
self.tdCom.date_time = self.tdCom.dataDict["start_ts"]
if subtable:
stb_subtable_value = f'concat(concat("{self.stb_name}_{self.subtable_prefix}", cast(cast(abs(cast({subtable} as int)) as bigint) as varchar(100))), "{self.subtable_suffix}")' if self.subtable else None
else:
stb_subtable_value = None
if watermark is not None:
watermark_value = f'{self.tdCom.dataDict["watermark"]}s'
else:
watermark_value = None
# create stb/ctb/tb stream
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ext_stb_stream_des_table, source_sql=f'select _wstart AS wstart, {partitial_tb_source_str} from {self.stb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="window_close", watermark=watermark_value, subtable_value=stb_subtable_value, fill_history_value=fill_history_value, stb_field_name_value=stb_field_name_value, tag_value=tag_value, use_exist_stb=use_exist_stb)
for i in range(self.tdCom.range_count):
if i == 0:
window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, self.tdCom.dataDict['watermark'], self.tdCom.dataDict['session'])
else:
self.tdCom.date_time = window_close_ts + 1
window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, self.tdCom.dataDict['watermark'], self.tdCom.dataDict['session'])
if watermark_value is not None:
expected_value = i + 1
for ts_value in [self.tdCom.date_time, window_close_ts-1]:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
else:
expected_value = i
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
if fill_history_value:
self.tdCom.update_delete_history_data(delete=delete)
if tag_value:
tdSql.query(f'select {tag_value} from {self.stb_name}')
tag_value_list = tdSql.queryResult
self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} session(ts, {self.tdCom.dataDict["session"]}s) order by wstart limit {expected_value};', sorted=True, defined_tag_count=defined_tag_count, tag_value_list=tag_value_list, partition=partition)
def run(self):
#! TD-25893
# self.watermark_window_close_session_ext(session=random.randint(10, 12), watermark=random.randint(20, 25), subtable=None, partition=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True, delete=False, fill_history_value=1)
self.watermark_window_close_session_ext(session=random.randint(10, 12), watermark=random.randint(20, 25), subtable=None, partition=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True, delete=True)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
import sys
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
def window_close_state_window(self, state_window, delete=True):
tdLog.info(f"*** testing stream window_close+session: state_window: {state_window}, delete: {delete} ***")
self.case_name = sys._getframe().f_code.co_name
self.delete = delete
self.tdCom.prepare_data(state_window=state_window)
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
state_window_col_name = self.tdCom.dataDict["state_window"]
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} state_window({state_window_col_name})', trigger_mode="window_close")
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} state_window({state_window_col_name})', trigger_mode="window_close")
state_window_max = self.tdCom.dataDict['state_window_max']
state_window_value_inmem = 0
sleep_step = 0
for i in range(self.tdCom.range_count):
state_window_value = random.randint(int((i)*state_window_max/self.tdCom.range_count), int((i+1)*state_window_max/self.tdCom.range_count))
while state_window_value == state_window_value_inmem:
state_window_value = random.randint(int((i)*state_window_max/self.tdCom.range_count), int((i+1)*state_window_max/self.tdCom.range_count))
if sleep_step < self.tdCom.default_interval:
sleep_step += 1
time.sleep(1)
else:
return
for j in range(2, self.tdCom.range_count+3):
tdSql.execute(f'insert into {self.ctb_name} (ts, {state_window_col_name}) values ({self.tdCom.date_time}, {state_window_value})')
tdSql.execute(f'insert into {self.tb_name} (ts, {state_window_col_name}) values ({self.tdCom.date_time}, {state_window_value})')
if self.tdCom.update and i%2 == 0:
tdSql.execute(f'insert into {self.ctb_name} (ts, {state_window_col_name}) values ({self.tdCom.date_time}, {state_window_value})')
tdSql.execute(f'insert into {self.tb_name} (ts, {state_window_col_name}) values ({self.tdCom.date_time}, {state_window_value})')
if self.delete and i%2 != 0:
dt = f'cast({self.tdCom.date_time-1} as timestamp)'
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=dt)
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=dt)
self.tdCom.date_time += 1
for tbname in [self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
self.tdCom.check_stream(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {tbname} state_window({state_window_col_name}) limit {i}', i)
else:
self.tdCom.check_stream(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {tbname} state_window({state_window_col_name}) limit {i}', i)
state_window_value_inmem = state_window_value
def run(self):
for delete in [True, False]:
self.window_close_state_window(state_window="c1", delete=delete)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册