Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
faa421ed
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
faa421ed
编写于
6月 25, 2023
作者:
J
jiajingbin
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
test: save
上级
7eff0410
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
151 addition
and
72 deletion
+151
-72
tests/system-test/7-tmq/tmqParamsTest.py
tests/system-test/7-tmq/tmqParamsTest.py
+151
-72
未找到文件。
tests/system-test/7-tmq/tmqParamsTest.py
浏览文件 @
faa421ed
...
...
@@ -16,18 +16,23 @@ sys.path.append("./7-tmq")
from
tmqCommon
import
*
class
TDTestCase
:
updatecfgDict
=
{
'debugFlag'
:
135
}
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
())
self
.
wal_retention_period1
=
3600
self
.
wal_retention_period2
=
1
self
.
commit_value_list
=
[
"true"
,
"false"
]
self
.
commit_value_list
=
[
"true"
]
self
.
offset_value_list
=
[
""
,
"earliest"
,
"latest"
,
"none"
]
self
.
offset_value_list
=
[
""
,
"earliest"
,
"none"
]
self
.
tbname_value_list
=
[
"true"
,
"false"
]
self
.
snapshot_value_list
=
[
"true"
,
"false"
]
self
.
commit_value_list
=
[
"true"
]
self
.
offset_value_list
=
[
"latest"
]
self
.
tbname_value_list
=
[
"true"
]
# self.commit_value_list = ["true"]
# self.offset_value_list = ["latest"]
# self.offset_value_list = ["earliest"]
# self.tbname_value_list = ["true"]
def
tmqParamsTest
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
...
...
@@ -42,76 +47,150 @@ class TDTestCase:
'rowsPerTbl'
:
10000
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'auto_commit_interval'
:
"
50
00"
}
'auto_commit_interval'
:
"
1
00"
}
topic_name
=
'topic1'
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
4
,
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tdCom
.
create_stable
(
tdSql
,
dbname
=
paraDict
[
"dbName"
],
stbname
=
paraDict
[
"stbName"
],
column_elm_list
=
paraDict
[
'colSchema'
],
tag_elm_list
=
paraDict
[
'tagSchema'
])
tdLog
.
info
(
"create ctb"
)
tdCom
.
create_ctable
(
tdSql
,
dbname
=
paraDict
[
"dbName"
],
stbname
=
paraDict
[
"stbName"
],
tag_elm_list
=
paraDict
[
'tagSchema'
],
count
=
paraDict
[
"ctbNum"
],
default_ctbname_prefix
=
paraDict
[
'ctbPrefix'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"ctbPrefix"
],
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
],
paraDict
[
"startTs"
])
tdSql
.
execute
(
"alter database %s wal_retention_period 3600"
%
(
paraDict
[
'dbName'
]))
tdLog
.
info
(
"create topics from stb with filter"
)
queryString
=
"select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topic_name
,
queryString
)
start_group_id
=
1
for
commit_value
in
self
.
commit_value_list
:
for
offset_value
in
self
.
offset_value_list
:
for
tbname_value
in
self
.
tbname_value_list
:
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
tdSql
.
query
(
queryString
)
expected_res
=
tdSql
.
queryRows
group_id
=
"csm_"
+
str
(
start_group_id
)
consumer_dict
=
{
"group.id"
:
group_id
,
"td.connect.user"
:
"root"
,
"td.connect.pass"
:
"taosdata"
,
"auto.commit.interval.ms"
:
paraDict
[
"auto_commit_interval"
],
"enable.auto.commit"
:
commit_value
,
"auto.offset.reset"
:
offset_value
,
"msg.with.table.name"
:
tbname_value
}
print
(
consumer_dict
)
consumer_commit
=
1
if
consumer_dict
[
"enable.auto.commit"
]
==
"true"
else
0
consumer_tbname
=
1
if
consumer_dict
[
"msg.with.table.name"
]
==
"true"
else
0
consumer_ret
=
"earliest"
if
offset_value
==
""
else
offset_value
expected_parameters
=
f
'tbname:
{
consumer_tbname
}
,commit:
{
consumer_commit
}
,interval:
{
paraDict
[
"auto_commit_interval"
]
}
,reset:
{
consumer_ret
}
'
if
len
(
offset_value
)
==
0
:
del
consumer_dict
[
"auto.offset.reset"
]
consumer
=
Consumer
(
consumer_dict
)
consumer
.
subscribe
([
topic_name
])
tmqCom
.
insert_data
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"ctbPrefix"
],
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
],
int
(
round
(
time
.
time
()
*
1000
)))
try
:
while
True
:
res
=
consumer
.
poll
(
1
)
tdSql
.
query
(
'show consumers;'
)
consumer_info
=
tdSql
.
queryResult
[
0
][
-
1
]
if
not
res
:
break
err
=
res
.
error
()
if
err
is
not
None
:
raise
err
# val = res.value()
# for block in val:
# print(block.fetchall())
finally
:
consumer
.
unsubscribe
()
consumer
.
close
()
tdSql
.
checkEqual
(
consumer_info
,
expected_parameters
)
start_group_id
+=
1
tdSql
.
query
(
'show subscriptions;'
)
subscription_info
=
tdSql
.
queryResult
offset_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
2
].
replace
(
"log:"
,
""
)),
subscription_info
))
tdSql
.
checkEqual
(
sum
(
offset_value_list
)
>
0
,
True
)
print
(
offset_value_list
)
rows_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
1
]),
subscription_info
))
print
(
rows_value_list
)
tdSql
.
checkEqual
(
sum
(
rows_value_list
),
expected_res
)
tdSql
.
execute
(
f
"drop topic if exists
{
topic_name
}
"
)
for
snapshot_value
in
self
.
snapshot_value_list
:
for
commit_value
in
self
.
commit_value_list
:
for
offset_value
in
self
.
offset_value_list
:
for
tbname_value
in
self
.
tbname_value_list
:
topic_name
=
'topic1'
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
4
,
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tdCom
.
create_stable
(
tdSql
,
dbname
=
paraDict
[
"dbName"
],
stbname
=
paraDict
[
"stbName"
],
column_elm_list
=
paraDict
[
'colSchema'
],
tag_elm_list
=
paraDict
[
'tagSchema'
])
tdLog
.
info
(
"create ctb"
)
tdCom
.
create_ctable
(
tdSql
,
dbname
=
paraDict
[
"dbName"
],
stbname
=
paraDict
[
"stbName"
],
tag_elm_list
=
paraDict
[
'tagSchema'
],
count
=
paraDict
[
"ctbNum"
],
default_ctbname_prefix
=
paraDict
[
'ctbPrefix'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"ctbPrefix"
],
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
],
paraDict
[
"startTs"
])
tdLog
.
info
(
"create topics from stb with filter"
)
queryString
=
"select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topic_name
,
queryString
)
print
(
"----"
,
snapshot_value
)
tdSql
.
query
(
f
'select * from information_schema.ins_databases'
)
db_wal_retention_period_list
=
list
(
map
(
lambda
x
:
x
[
-
8
]
if
x
[
0
]
==
paraDict
[
'dbName'
]
else
None
,
tdSql
.
queryResult
))
print
(
"---db_wal_retention_period_list"
,
db_wal_retention_period_list
)
for
i
in
range
(
len
(
db_wal_retention_period_list
)):
if
db_wal_retention_period_list
[
0
]
is
None
or
db_wal_retention_period_list
[
-
1
]
is
None
:
db_wal_retention_period_list
.
remove
(
None
)
if
snapshot_value
==
"true"
:
if
db_wal_retention_period_list
[
0
]
!=
self
.
wal_retention_period2
:
tdSql
.
execute
(
f
"alter database
{
paraDict
[
'dbName'
]
}
wal_retention_period
{
self
.
wal_retention_period2
}
"
)
time
.
sleep
(
self
.
wal_retention_period2
+
1
)
tdSql
.
execute
(
f
'flush database
{
paraDict
[
"dbName"
]
}
'
)
else
:
print
(
"iinininini"
)
if
db_wal_retention_period_list
[
0
]
!=
self
.
wal_retention_period1
:
tdSql
.
execute
(
f
"alter database
{
paraDict
[
'dbName'
]
}
wal_retention_period
{
self
.
wal_retention_period1
}
"
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
tdSql
.
query
(
queryString
)
expected_res
=
tdSql
.
queryRows
group_id
=
"csm_"
+
str
(
start_group_id
)
consumer_dict
=
{
"group.id"
:
group_id
,
"td.connect.user"
:
"root"
,
"td.connect.pass"
:
"taosdata"
,
"auto.commit.interval.ms"
:
paraDict
[
"auto_commit_interval"
],
"enable.auto.commit"
:
commit_value
,
"auto.offset.reset"
:
offset_value
,
"experimental.snapshot.enable"
:
snapshot_value
,
"msg.with.table.name"
:
tbname_value
}
print
(
consumer_dict
)
consumer_commit
=
1
if
consumer_dict
[
"enable.auto.commit"
]
==
"true"
else
0
consumer_tbname
=
1
if
consumer_dict
[
"msg.with.table.name"
]
==
"true"
else
0
# consumer_snapshot = 1 if consumer_dict["experimental.snapshot.enable"] == "true" else 0
consumer_ret
=
"earliest"
if
offset_value
==
""
else
offset_value
expected_parameters
=
f
'tbname:
{
consumer_tbname
}
,commit:
{
consumer_commit
}
,interval:
{
paraDict
[
"auto_commit_interval"
]
}
,reset:
{
consumer_ret
}
'
if
len
(
offset_value
)
==
0
:
del
consumer_dict
[
"auto.offset.reset"
]
consumer
=
Consumer
(
consumer_dict
)
consumer
.
subscribe
([
topic_name
])
stop_flag
=
0
# try:
# while True:
# res = consumer.poll(1)
# tdSql.query('show consumers;')
# consumer_info = tdSql.queryResult[0][-1]
# if not res:
# break
# # err = res.error()
# # if err is not None:
# # raise err
# # val = res.value()
# # for block in val:
# # print(block.fetchall())
try
:
while
True
:
res
=
consumer
.
poll
(
1
)
tdSql
.
query
(
'show consumers;'
)
consumer_info
=
tdSql
.
queryResult
[
0
][
-
1
]
if
offset_value
==
"latest"
:
if
not
res
and
stop_flag
==
1
:
break
else
:
if
not
res
:
break
# err = res.error()
# if err is not None:
# raise err
# val = res.value()
# for block in val:
# print(block.fetchall())
if
offset_value
==
"latest"
and
stop_flag
==
0
:
tmqCom
.
insert_data
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"ctbPrefix"
],
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
],
int
(
round
(
time
.
time
()
*
1000
)))
stop_flag
=
1
finally
:
consumer
.
unsubscribe
()
consumer
.
close
()
tdSql
.
checkEqual
(
consumer_info
,
expected_parameters
)
start_group_id
+=
1
tdSql
.
query
(
'show subscriptions;'
)
subscription_info
=
tdSql
.
queryResult
print
(
subscription_info
)
if
snapshot_value
==
"true"
:
if
offset_value
!=
"earliest"
:
pass
else
:
if
offset_value
!=
"none"
:
offset_value_str
=
","
.
join
(
list
(
map
(
lambda
x
:
x
[
-
2
],
subscription_info
)))
print
(
offset_value_str
)
tdSql
.
checkEqual
(
"snapshot"
in
offset_value_str
,
True
)
rows_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
1
]),
subscription_info
))
tdSql
.
checkEqual
(
sum
(
rows_value_list
),
expected_res
)
else
:
offset_value_list
=
list
(
map
(
lambda
x
:
x
[
-
2
],
subscription_info
))
tdSql
.
checkEqual
(
offset_value_list
,
[
None
]
*
len
(
subscription_info
))
print
(
offset_value_list
)
rows_value_list
=
list
(
map
(
lambda
x
:
x
[
-
1
],
subscription_info
))
tdSql
.
checkEqual
(
rows_value_list
,
[
None
]
*
len
(
subscription_info
))
else
:
print
(
"====offset_value----"
,
offset_value
)
if
offset_value
!=
"none"
:
offset_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
2
].
replace
(
"log:"
,
""
)),
subscription_info
))
tdSql
.
checkEqual
(
sum
(
offset_value_list
)
>
0
,
True
)
print
(
offset_value_list
)
rows_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
1
]),
subscription_info
))
tdSql
.
checkEqual
(
sum
(
rows_value_list
),
expected_res
)
else
:
offset_value_list
=
list
(
map
(
lambda
x
:
x
[
-
2
],
subscription_info
))
tdSql
.
checkEqual
(
offset_value_list
,
[
None
]
*
len
(
subscription_info
))
print
(
offset_value_list
)
rows_value_list
=
list
(
map
(
lambda
x
:
x
[
-
1
],
subscription_info
))
tdSql
.
checkEqual
(
rows_value_list
,
[
None
]
*
len
(
subscription_info
))
# tdSql.checkEqual(sum(rows_value_list), expected_res)
# if offset_value == "latest":
# tdSql.checkEqual(sum(rows_value_list), expected_res)
# else:
# tdSql.checkEqual(sum(rows_value_list), expected_res)
tdSql
.
execute
(
f
"drop topic if exists
{
topic_name
}
"
)
tdSql
.
execute
(
f
'drop database if exists
{
paraDict
[
"dbName"
]
}
'
)
def
run
(
self
):
self
.
tmqParamsTest
()
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录