Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
dfbd7ea9
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
dfbd7ea9
编写于
6月 29, 2023
作者:
H
Hui Li
提交者:
GitHub
6月 29, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #21900 from taosdata/test_main/lihui
add tmq test case for max topics
上级
bb263b10
474efc4c
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
263 addition
and
1 deletion
+263
-1
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+1
-1
tests/system-test/7-tmq/tmqMaxTopic.py
tests/system-test/7-tmq/tmqMaxTopic.py
+262
-0
未找到文件。
tests/parallel_test/cases.task
浏览文件 @
dfbd7ea9
...
...
@@ -33,10 +33,10 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb3.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py
...
...
tests/system-test/7-tmq/tmqMaxTopic.py
0 → 100644
浏览文件 @
dfbd7ea9
import
sys
import
time
import
threading
from
taos.tmq
import
Consumer
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
updatecfgDict
=
{
'debugFlag'
:
135
}
def
__init__
(
self
):
self
.
vgroups
=
1
self
.
ctbNum
=
10
self
.
rowsPerTbl
=
10
self
.
tmqMaxTopicNum
=
20
self
.
tmqMaxGroups
=
100
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
modifyMaxTopics
(
self
,
tmqMaxTopicNum
):
# single dnode
cfgDir
=
tdDnodes
.
dnodes
[
0
].
cfgDir
# cluster dnodes
# tdDnodes[1].dataDir
# tdDnodes[1].logDir
# tdDnodes[1].cfgDir
cfgFile
=
f
"%s/taos.cfg"
%
(
cfgDir
)
shellCmd
=
'echo "tmqMaxTopicNum %d" >> %s'
%
(
tmqMaxTopicNum
,
cfgFile
)
tdLog
.
info
(
" shell cmd: %s"
%
(
shellCmd
))
os
.
system
(
shellCmd
)
tdDnodes
.
stoptaosd
(
1
)
tdDnodes
.
starttaosd
(
1
)
time
.
sleep
(
5
)
def
prepareTestEnv
(
self
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
10
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdSql
.
execute
(
"alter database %s wal_retention_period 3600"
%
(
paraDict
[
'dbName'
]))
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"restart taosd to ensure that the data falls into the disk"
)
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
return
def
tmqSubscribe
(
self
,
**
inputDict
):
# create new connector for new tdSql instance in my thread
# newTdSql = tdCom.newTdSql()
# topicName = inputDict['topic_name']
# group_id = inputDict['group_id']
consumer_dict
=
{
"group.id"
:
inputDict
[
'group_id_prefix'
],
"client.id"
:
"client"
,
"td.connect.user"
:
"root"
,
"td.connect.pass"
:
"taosdata"
,
"auto.commit.interval.ms"
:
"1000"
,
"enable.auto.commit"
:
"true"
,
"auto.offset.reset"
:
"earliest"
,
"experimental.snapshot.enable"
:
"false"
,
"msg.with.table.name"
:
"false"
}
for
j
in
range
(
self
.
tmqMaxGroups
):
consumer_dict
[
"group.id"
]
=
f
"%s_%d"
%
(
inputDict
[
'group_id_prefix'
],
j
)
consumer_dict
[
"client.id"
]
=
f
"%s_%d"
%
(
inputDict
[
'group_id_prefix'
],
j
)
print
(
"======grpid: %s"
%
(
consumer_dict
[
"group.id"
]))
consumer
=
Consumer
(
consumer_dict
)
# print("======%s"%(inputDict['topic_name']))
consumer
.
subscribe
([
inputDict
[
'topic_name'
]])
# res = consumer.poll(inputDict['pollDelay'])
return
def
asyncSubscribe
(
self
,
inputDict
):
pThread
=
threading
.
Thread
(
target
=
self
.
tmqSubscribe
,
kwargs
=
inputDict
)
pThread
.
start
()
return
pThread
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
topicNamePrefix
=
'topicname_'
tdLog
.
info
(
"create topics from stb"
)
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
for
i
in
range
(
self
.
tmqMaxTopicNum
):
sqlString
=
"create topic %s%d as %s"
%
(
topicNamePrefix
,
i
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
sqlString
=
"create topic %s%s as %s"
%
(
topicNamePrefix
,
'xyz'
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
error
(
sqlString
)
tdSql
.
query
(
'show topics;'
)
topicNum
=
tdSql
.
queryRows
tdLog
.
info
(
" topic count: %d"
%
(
topicNum
))
if
topicNum
!=
self
.
tmqMaxTopicNum
:
tdLog
.
exit
(
"show topics %d not equal expect num: %d"
%
(
topicNum
,
self
.
tmqMaxTopicNum
))
# self.updatecfgDict = {'tmqMaxTopicNum': 22}
# tdDnodes.stoptaosd(1)
# tdDnodes.deploy(1, self.updatecfgDict)
# tdDnodes.starttaosd(1)
# time.sleep(5)
newTmqMaxTopicNum
=
22
self
.
modifyMaxTopics
(
newTmqMaxTopicNum
)
sqlString
=
"create topic %s%s as %s"
%
(
topicNamePrefix
,
'x'
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
sqlString
=
"create topic %s%s as %s"
%
(
topicNamePrefix
,
'y'
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
sqlString
=
"create topic %s%s as %s"
%
(
topicNamePrefix
,
'xyz'
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
error
(
sqlString
)
tdSql
.
query
(
'show topics;'
)
topicNum
=
tdSql
.
queryRows
tdLog
.
info
(
" topic count: %d"
%
(
topicNum
))
if
topicNum
!=
newTmqMaxTopicNum
:
tdLog
.
exit
(
"show topics %d not equal expect num: %d"
%
(
topicNum
,
newTmqMaxTopicNum
))
newTmqMaxTopicNum
=
18
self
.
modifyMaxTopics
(
newTmqMaxTopicNum
)
i
=
0
sqlString
=
"drop topic %s%d"
%
(
topicNamePrefix
,
i
)
tdLog
.
info
(
"drop topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
i
=
1
sqlString
=
"drop topic %s%d"
%
(
topicNamePrefix
,
i
)
tdLog
.
info
(
"drop topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
sqlString
=
"drop topic %s%s"
%
(
topicNamePrefix
,
"x"
)
tdLog
.
info
(
"drop topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
sqlString
=
"drop topic %s%s"
%
(
topicNamePrefix
,
"y"
)
tdLog
.
info
(
"drop topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
sqlString
=
"create topic %s%s as %s"
%
(
topicNamePrefix
,
'xyz'
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
error
(
sqlString
)
# pThreadList = []
# for i in range(self.tmqMaxTopicNum):
# topic_name = f"%s%d" %(topicNamePrefix, i)
# print("======%s"%(topic_name))
# group_id_prefix = f"grp_%d"%(i)
# inputDict = {'group_id_prefix': group_id_prefix,
# 'topic_name': topic_name,
# 'pollDelay': 1
# }
# pThread = self.asyncSubscribe(inputDict)
# pThreadList.append(pThread)
# for j in range(self.tmqMaxGroups):
# pThreadList[j].join()
# time.sleep(5)
# tdSql.query('show subscriptions;')
# subscribeNum = tdSql.queryRows
# expectNum = self.tmqMaxGroups * self.tmqMaxTopicNum
# tdLog.info("loop index: %d, ======subscriptions %d and expect num: %d"%(i, subscribeNum, expectNum))
# if subscribeNum != expectNum:
# tdLog.exit("subscriptions %d not equal expect num: %d"%(subscribeNum, expectNum))
# # drop all topics
# for i in range(self.tmqMaxTopicNum):
# sqlString = "drop topic %s%d" %(topicNamePrefix, i)
# tdLog.info("drop topic sql: %s"%sqlString)
# tdSql.execute(sqlString)
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
run
(
self
):
self
.
prepareTestEnv
()
self
.
tmqCase1
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录