Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8c4f8d30
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8c4f8d30
编写于
7月 04, 2023
作者:
P
plum-lihui
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
test: add tmq test case
上级
2d3ee889
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
307 addition
and
0 deletion
+307
-0
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+1
-0
tests/system-test/7-tmq/tmqDropConsumer.json
tests/system-test/7-tmq/tmqDropConsumer.json
+28
-0
tests/system-test/7-tmq/tmqDropConsumer.py
tests/system-test/7-tmq/tmqDropConsumer.py
+278
-0
未找到文件。
tests/parallel_test/cases.task
浏览文件 @
8c4f8d30
...
...
@@ -37,6 +37,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropConsumer.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 3
...
...
tests/system-test/7-tmq/tmqDropConsumer.json
0 → 100644
浏览文件 @
8c4f8d30
{
"filetype"
:
"subscribe"
,
"cfgdir"
:
"/etc/taos"
,
"host"
:
"127.0.0.1"
,
"port"
:
6030
,
"user"
:
"root"
,
"password"
:
"taosdata"
,
"result_file"
:
"tmq_res.txt"
,
"tmq_info"
:
{
"concurrent"
:
2
,
"poll_delay"
:
100000
,
"group.id"
:
""
,
"group_mode"
:
"independent"
,
"create_mode"
:
"parallel"
,
"client.id"
:
"cliid_0001"
,
"auto.offset.reset"
:
"earliest"
,
"enable.manual.commit"
:
"false"
,
"enable.auto.commit"
:
"false"
,
"auto.commit.interval.ms"
:
1000
,
"experimental.snapshot.enable"
:
"false"
,
"msg.with.table.name"
:
"false"
,
"rows_file"
:
""
,
"topic_list"
:
[
{
"name"
:
"dbtstb_0001"
,
"sql"
:
"select * from dbt.stb;"
},
{
"name"
:
"dbtstb_0002"
,
"sql"
:
"select * from dbt.stb;"
}
]
}
}
tests/system-test/7-tmq/tmqDropConsumer.py
0 → 100644
浏览文件 @
8c4f8d30
import
sys
import
time
import
threading
from
taos.tmq
import
Consumer
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
updatecfgDict
=
{
'debugFlag'
:
135
}
def
__init__
(
self
):
self
.
vgroups
=
2
self
.
ctbNum
=
10
self
.
rowsPerTbl
=
10
self
.
tmqMaxTopicNum
=
2
self
.
tmqMaxGroups
=
2
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
getPath
(
self
,
tool
=
"taosBenchmark"
):
if
(
platform
.
system
().
lower
()
==
'windows'
):
tool
=
tool
+
".exe"
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
paths
=
[]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
((
tool
)
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
paths
.
append
(
os
.
path
.
join
(
root
,
tool
))
break
if
(
len
(
paths
)
==
0
):
tdLog
.
exit
(
"taosBenchmark not found!"
)
return
else
:
tdLog
.
info
(
"taosBenchmark found in %s"
%
paths
[
0
])
return
paths
[
0
]
def
prepareTestEnv
(
self
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
2
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
10
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdSql
.
execute
(
"alter database %s wal_retention_period 360000"
%
(
paraDict
[
'dbName'
]))
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"restart taosd to ensure that the data falls into the disk"
)
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
return
def
tmqSubscribe
(
self
,
topicName
,
newGroupId
,
expectResult
):
# create new connector for new tdSql instance in my thread
# newTdSql = tdCom.newTdSql()
# topicName = inputDict['topic_name']
# group_id = inputDict['group_id']
consumer_dict
=
{
"group.id"
:
newGroupId
,
"client.id"
:
"client"
,
"td.connect.user"
:
"root"
,
"td.connect.pass"
:
"taosdata"
,
"auto.commit.interval.ms"
:
"1000"
,
"enable.auto.commit"
:
"true"
,
"auto.offset.reset"
:
"earliest"
,
"experimental.snapshot.enable"
:
"false"
,
"msg.with.table.name"
:
"false"
}
ret
=
'success'
consumer
=
Consumer
(
consumer_dict
)
# print("======%s"%(inputDict['topic_name']))
try
:
consumer
.
subscribe
([
topicName
])
except
Exception
as
e
:
tdLog
.
info
(
"consumer.subscribe() fail "
)
tdLog
.
info
(
"%s"
%
(
e
))
if
(
expectResult
==
"fail"
):
consumer
.
close
()
return
'success'
else
:
consumer
.
close
()
return
'fail'
tdLog
.
info
(
"consumer.subscribe() success "
)
if
(
expectResult
==
"success"
):
consumer
.
close
()
return
'success'
else
:
consumer
.
close
()
return
'fail'
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
100000000
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
topicNameList
=
[
'dbtstb_0001'
,
'dbtstb_0002'
]
tdLog
.
info
(
"create topics from stb"
)
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
for
i
in
range
(
len
(
topicNameList
)):
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
i
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
# tdSql.query('show topics;')
# topicNum = tdSql.queryRows
# tdLog.info(" topic count: %d"%(topicNum))
# if topicNum != len(topicNameList):
# tdLog.exit("show topics %d not equal expect num: %d"%(topicNum, len(topicNameList)))
pThread
=
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
# use taosBenchmark to subscribe
binPath
=
self
.
getPath
()
cmd
=
"nohup %s -f ./7-tmq/tmqDropConsumer.json > /dev/null 2>&1 & "
%
binPath
tdLog
.
info
(
"%s"
%
(
cmd
))
os
.
system
(
cmd
)
expectTopicNum
=
len
(
topicNameList
)
consumerThreadNum
=
2
expectConsumerNUm
=
expectTopicNum
*
consumerThreadNum
expectSubscribeNum
=
self
.
vgroups
*
expectTopicNum
*
consumerThreadNum
tdSql
.
query
(
'show topics;'
)
topicNum
=
tdSql
.
queryRows
tdLog
.
info
(
" get topic count: %d"
%
(
topicNum
))
if
topicNum
!=
expectTopicNum
:
tdLog
.
exit
(
"show topics %d not equal expect num: %d"
%
(
topicNum
,
expectTopicNum
))
flag
=
0
while
(
1
):
tdSql
.
query
(
'show consumers;'
)
consumerNUm
=
tdSql
.
queryRows
tdLog
.
info
(
" get consumers count: %d"
%
(
consumerNUm
))
if
consumerNUm
==
expectConsumerNUm
:
flag
=
1
break
else
:
time
.
sleep
(
1
)
if
(
0
==
flag
):
tmqCom
.
g_end_insert_flag
=
1
tdLog
.
exit
(
"show consumers %d not equal expect num: %d"
%
(
topicNum
,
expectConsumerNUm
))
flag
=
0
for
i
in
range
(
10
):
tdSql
.
query
(
'show subscriptions;'
)
subscribeNum
=
tdSql
.
queryRows
tdLog
.
info
(
" get subscriptions count: %d"
%
(
subscribeNum
))
if
subscribeNum
==
expectSubscribeNum
:
flag
=
1
break
else
:
time
.
sleep
(
1
)
if
(
0
==
flag
):
tmqCom
.
g_end_insert_flag
=
1
tdLog
.
exit
(
"show subscriptions %d not equal expect num: %d"
%
(
subscribeNum
,
expectSubscribeNum
))
# get all consumer group id
tdSql
.
query
(
'show consumers;'
)
consumerNUm
=
tdSql
.
queryRows
groupIdList
=
[]
for
i
in
range
(
consumerNUm
):
groupId
=
tdSql
.
getData
(
i
,
1
)
existFlag
=
0
for
j
in
range
(
len
(
groupIdList
)):
if
(
groupId
==
groupIdList
[
j
]):
existFlag
=
1
break
if
(
0
==
existFlag
):
groupIdList
.
append
(
groupId
)
# kill taosBenchmark
os
.
system
(
"pkill -9 taosBenchmark"
)
# wait the status to "lost"
while
(
1
):
exitFlag
=
1
tdSql
.
query
(
'show consumers;'
)
consumerNUm
=
tdSql
.
queryRows
for
i
in
range
(
consumerNUm
):
status
=
tdSql
.
getData
(
i
,
3
)
if
(
status
!=
"lost"
):
exitFlag
=
0
time
.
sleep
(
2
)
break
if
(
1
==
exitFlag
):
break
# drop consumer groups
for
i
in
range
(
len
(
groupIdList
)):
for
j
in
range
(
len
(
topicNameList
)):
sqlCmd
=
f
"drop consumer group `%s` on %s"
%
(
groupIdList
[
i
],
topicNameList
[
j
])
tdSql
.
execute
(
sqlCmd
)
tmqCom
.
g_end_insert_flag
=
1
tdLog
.
debug
(
"notify sub-thread to stop insert data"
)
pThread
.
join
()
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
run
(
self
):
self
.
prepareTestEnv
()
self
.
tmqCase1
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录