Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a8c406ff
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a8c406ff
编写于
7月 22, 2022
作者:
H
Hui Li
提交者:
GitHub
7月 22, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15259 from taosdata/test3.0/lihui
test: add test cases for tmq
上级
eb5dbffc
61d4f76d
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
273 addition
and
1 deletion
+273
-1
tests/pytest/util/common.py
tests/pytest/util/common.py
+1
-0
tests/script/test.sh
tests/script/test.sh
+1
-0
tests/system-test/7-tmq/tmqCommon.py
tests/system-test/7-tmq/tmqCommon.py
+34
-1
tests/system-test/7-tmq/tmqDropNtb.py
tests/system-test/7-tmq/tmqDropNtb.py
+237
-0
未找到文件。
tests/pytest/util/common.py
浏览文件 @
a8c406ff
...
...
@@ -28,6 +28,7 @@ from util.common import *
from
util.constant
import
*
from
dataclasses
import
dataclass
,
field
from
typing
import
List
from
datetime
import
datetime
@
dataclass
class
DataSet
:
...
...
tests/script/test.sh
浏览文件 @
a8c406ff
...
...
@@ -84,6 +84,7 @@ echo "SIM_DIR : $SIM_DIR"
echo
"CODE_DIR :
$CODE_DIR
"
echo
"CFG_DIR :
$CFG_DIR
"
rm
-rf
$SIM_DIR
/
*
rm
-rf
$LOG_DIR
rm
-rf
$CFG_DIR
...
...
tests/system-test/7-tmq/tmqCommon.py
浏览文件 @
a8c406ff
...
...
@@ -20,6 +20,8 @@ import threading
import
requests
import
time
# import socketfrom
import
json
import
toml
import
taos
from
util.log
import
*
...
...
@@ -207,7 +209,7 @@ class TMQCom:
def
drop_ctable
(
self
,
tsql
,
dbname
=
None
,
count
=
1
,
default_ctbname_prefix
=
"ctb"
,
ctbStartIdx
=
0
):
for
_
in
range
(
count
):
create_ctable_sql
=
f
'drop table
{
dbname
}
.
{
default_ctbname_prefix
}{
ctbStartIdx
}
;'
create_ctable_sql
=
f
'drop table
if exists
{
dbname
}
.
{
default_ctbname_prefix
}{
ctbStartIdx
}
;'
ctbStartIdx
+=
1
tdLog
.
info
(
"drop ctb sql: %s"
%
create_ctable_sql
)
tsql
.
execute
(
create_ctable_sql
)
...
...
@@ -503,6 +505,37 @@ class TMQCom:
break
return
def
create_ntable
(
self
,
tsql
,
dbname
=
None
,
tbname_prefix
=
"ntb"
,
tbname_index_start_num
=
1
,
column_elm_list
=
None
,
colPrefix
=
'c'
,
tblNum
=
1
,
**
kwargs
):
tb_params
=
""
if
len
(
kwargs
)
>
0
:
for
param
,
value
in
kwargs
.
items
():
tb_params
+=
f
'
{
param
}
"
{
value
}
" '
column_type_str
=
tdCom
.
gen_column_type_str
(
colPrefix
,
column_elm_list
)
for
_
in
range
(
tblNum
):
create_table_sql
=
f
'create table
{
dbname
}
.
{
tbname_prefix
}{
tbname_index_start_num
}
(
{
column_type_str
}
)
{
tb_params
}
;'
tbname_index_start_num
+=
1
tsql
.
execute
(
create_table_sql
)
def
insert_rows_into_ntbl
(
self
,
tsql
,
dbname
=
None
,
tbname_prefix
=
"ntb"
,
tbname_index_start_num
=
1
,
column_ele_list
=
None
,
startTs
=
None
,
tblNum
=
1
,
rows
=
1
):
if
startTs
is
None
:
startTs
=
tdCom
.
genTs
()[
0
]
for
tblIdx
in
range
(
tblNum
):
for
rowIdx
in
range
(
rows
):
column_value_list
=
tdCom
.
gen_column_value_list
(
column_ele_list
,
f
'
{
startTs
}
+
{
rowIdx
}
s'
)
column_value_str
=
''
idx
=
0
for
column_value
in
column_value_list
:
if
isinstance
(
column_value
,
str
)
and
idx
!=
0
:
column_value_str
+=
f
'"
{
column_value
}
", '
else
:
column_value_str
+=
f
'
{
column_value
}
, '
idx
+=
1
column_value_str
=
column_value_str
.
rstrip
()[:
-
1
]
insert_sql
=
f
'insert into
{
dbname
}
.
{
tbname_prefix
}{
tblIdx
+
tbname_index_start_num
}
values (
{
column_value_str
}
);'
tsql
.
execute
(
insert_sql
)
def
close
(
self
):
self
.
cursor
.
close
()
...
...
tests/system-test/7-tmq/tmqDropNtb.py
0 → 100644
浏览文件 @
a8c406ff
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
enum
import
Enum
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
__init__
(
self
):
self
.
snapshot
=
0
self
.
vgroups
=
4
self
.
ctbNum
=
100
self
.
rowsPerTbl
=
10
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
waitSubscriptionExit
(
self
,
max_wait_count
=
20
):
wait_cnt
=
0
while
(
wait_cnt
<
max_wait_count
):
tdSql
.
query
(
"show subscriptions"
)
if
tdSql
.
getRows
()
==
0
:
break
else
:
time
.
sleep
(
1
)
wait_cnt
+=
1
tdLog
.
info
(
"wait subscriptions exit for %d s"
%
wait_cnt
)
# drop some ntbs
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ntb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
1000
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'endTs'
:
0
,
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
paraDict
[
'snapshot'
]
=
self
.
snapshot
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdLog
.
info
(
"start create database...."
)
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"start create normal tables...."
)
tmqCom
.
create_ntable
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_elm_list
=
paraDict
[
"colSchema"
],
colPrefix
=
'c'
,
tblNum
=
paraDict
[
"ctbNum"
])
tdLog
.
info
(
"start insert data into normal tables...."
)
tmqCom
.
insert_rows_into_ntbl
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_ele_list
=
paraDict
[
"colSchema"
],
startTs
=
paraDict
[
"startTs"
],
tblNum
=
paraDict
[
"ctbNum"
],
rows
=
paraDict
[
"rowsPerTbl"
])
tdLog
.
info
(
"create topics from database"
)
topicFromDb
=
'topic_dbt'
tdSql
.
execute
(
"create topic %s as database %s"
%
(
topicFromDb
,
paraDict
[
'dbName'
]))
if
self
.
snapshot
==
0
:
consumerId
=
0
elif
self
.
snapshot
==
1
:
consumerId
=
1
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
])
topicList
=
topicFromDb
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tmqCom
.
getStartConsumeNotifyFromTmqsim
()
tdLog
.
info
(
"drop some ntables"
)
# drop 1/4 ctbls from half offset
paraDict
[
"ctbStartIdx"
]
=
paraDict
[
"ctbStartIdx"
]
+
int
(
paraDict
[
"ctbNum"
]
*
1
/
2
)
paraDict
[
"ctbNum"
]
=
int
(
paraDict
[
"ctbNum"
]
/
4
)
tmqCom
.
drop_ctable
(
tdSql
,
dbname
=
paraDict
[
'dbName'
],
count
=
paraDict
[
"ctbNum"
],
default_ctbname_prefix
=
paraDict
[
"ctbPrefix"
],
ctbStartIdx
=
paraDict
[
"ctbStartIdx"
])
tdLog
.
info
(
"start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d"
%
(
totalConsumeRows
,
expectrowcnt
))
if
not
((
totalConsumeRows
>=
expectrowcnt
*
3
/
4
)
and
(
totalConsumeRows
<
expectrowcnt
)):
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
tdLog
.
info
(
"wait subscriptions exit ...."
)
self
.
waitSubscriptionExit
()
tdSql
.
query
(
"drop topic %s"
%
topicFromDb
)
tdLog
.
info
(
"success dorp topic: %s"
%
topicFromDb
)
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
# drop some ntbs and create some new ntbs
def
tmqCase2
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 2: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ntb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
1000
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'endTs'
:
0
,
'pollDelay'
:
10
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
paraDict
[
'snapshot'
]
=
self
.
snapshot
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdLog
.
info
(
"start create database...."
)
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"start create normal tables...."
)
tmqCom
.
create_ntable
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_elm_list
=
paraDict
[
"colSchema"
],
colPrefix
=
'c'
,
tblNum
=
paraDict
[
"ctbNum"
])
tdLog
.
info
(
"start insert data into normal tables...."
)
tmqCom
.
insert_rows_into_ntbl
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_ele_list
=
paraDict
[
"colSchema"
],
startTs
=
paraDict
[
"startTs"
],
tblNum
=
paraDict
[
"ctbNum"
],
rows
=
paraDict
[
"rowsPerTbl"
])
tdLog
.
info
(
"create topics from database"
)
topicFromDb
=
'topic_dbt'
tdSql
.
execute
(
"create topic %s as database %s"
%
(
topicFromDb
,
paraDict
[
'dbName'
]))
if
self
.
snapshot
==
0
:
consumerId
=
2
elif
self
.
snapshot
==
1
:
consumerId
=
3
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
)
topicList
=
topicFromDb
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tmqCom
.
getStartConsumeNotifyFromTmqsim
()
tdLog
.
info
(
"drop some ntables"
)
# drop 1/4 ctbls from half offset
paraDict
[
"ctbStartIdx"
]
=
paraDict
[
"ctbStartIdx"
]
+
int
(
paraDict
[
"ctbNum"
]
*
1
/
2
)
paraDict
[
"ctbNum"
]
=
int
(
paraDict
[
"ctbNum"
]
/
4
)
tmqCom
.
drop_ctable
(
tdSql
,
dbname
=
paraDict
[
'dbName'
],
count
=
paraDict
[
"ctbNum"
],
default_ctbname_prefix
=
paraDict
[
"ctbPrefix"
],
ctbStartIdx
=
paraDict
[
"ctbStartIdx"
])
tdLog
.
info
(
"start create some new normal tables...."
)
paraDict
[
"ctbPrefix"
]
=
'newCtb'
paraDict
[
"ctbNum"
]
=
self
.
ctbNum
tmqCom
.
create_ntable
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_elm_list
=
paraDict
[
"colSchema"
],
colPrefix
=
'c'
,
tblNum
=
paraDict
[
"ctbNum"
])
tdLog
.
info
(
"start insert data into these new normal tables...."
)
tmqCom
.
insert_rows_into_ntbl
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_ele_list
=
paraDict
[
"colSchema"
],
startTs
=
paraDict
[
"startTs"
],
tblNum
=
paraDict
[
"ctbNum"
],
rows
=
paraDict
[
"rowsPerTbl"
])
tdLog
.
info
(
"start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d"
%
(
totalConsumeRows
,
expectrowcnt
))
if
not
((
totalConsumeRows
>=
expectrowcnt
/
2
*
(
1
+
3
/
4
))
and
(
totalConsumeRows
<
expectrowcnt
)):
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
tdLog
.
info
(
"wait subscriptions exit ...."
)
self
.
waitSubscriptionExit
()
tdSql
.
query
(
"drop topic %s"
%
topicFromDb
)
tdLog
.
info
(
"success dorp topic: %s"
%
topicFromDb
)
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
def
run
(
self
):
tdLog
.
printNoPrefix
(
"============================================="
)
tdLog
.
printNoPrefix
(
"======== snapshot is 0: only consume from wal"
)
self
.
snapshot
=
0
# self.tmqCase1()
self
.
tmqCase2
()
tdLog
.
printNoPrefix
(
"===================================================================="
)
tdLog
.
printNoPrefix
(
"======== snapshot is 1: firstly consume from tsbs, and then from wal"
)
self
.
snapshot
=
1
# self.tmqCase1()
self
.
tmqCase2
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录