Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
27159fbf
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
27159fbf
编写于
7月 05, 2023
作者:
H
Hui Li
提交者:
GitHub
7月 05, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #21905 from taosdata/test_main/lihui
test: add tmq case for max groupid
上级
53263bf9
94089fab
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
854 addition
and
1 deletion
+854
-1
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+2
-0
tests/system-test/7-tmq/tmqCommon.py
tests/system-test/7-tmq/tmqCommon.py
+21
-1
tests/system-test/7-tmq/tmqConsumeDiscontinuousData.py
tests/system-test/7-tmq/tmqConsumeDiscontinuousData.py
+248
-0
tests/system-test/7-tmq/tmqDropConsumer.json
tests/system-test/7-tmq/tmqDropConsumer.json
+28
-0
tests/system-test/7-tmq/tmqDropConsumer.py
tests/system-test/7-tmq/tmqDropConsumer.py
+282
-0
tests/system-test/7-tmq/tmqMaxGroupIds.json
tests/system-test/7-tmq/tmqMaxGroupIds.json
+27
-0
tests/system-test/7-tmq/tmqMaxGroupIds.py
tests/system-test/7-tmq/tmqMaxGroupIds.py
+246
-0
未找到文件。
tests/parallel_test/cases.task
浏览文件 @
27159fbf
...
...
@@ -36,6 +36,8 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 3
...
...
tests/system-test/7-tmq/tmqCommon.py
浏览文件 @
27159fbf
...
...
@@ -37,6 +37,9 @@ from util.common import *
# INSERT_DATA = 3
class
TMQCom
:
def
__init__
(
self
):
self
.
g_end_insert_flag
=
0
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdSql
.
init
(
conn
.
cursor
())
...
...
@@ -330,8 +333,11 @@ class TMQCom:
ctbDict
[
i
]
=
0
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfCtb
=
0
rowsOfCtb
=
0
while
rowsOfCtb
<
rowsPerTbl
:
if
(
0
!=
self
.
g_end_insert_flag
):
tdLog
.
debug
(
"get signal to stop insert data"
)
break
for
i
in
range
(
ctbNum
):
sql
+=
" %s.%s%d values "
%
(
dbName
,
ctbPrefix
,
i
+
ctbStartIdx
)
rowsBatched
=
0
...
...
@@ -571,6 +577,20 @@ class TMQCom:
tdLog
.
info
(
tsql
.
queryResult
)
tdLog
.
info
(
"wait subscriptions exit for %d s"
%
wait_cnt
)
def
killProcesser
(
self
,
processerName
):
killCmd
=
(
"ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs kill -TERM > /dev/null 2>&1"
%
processerName
)
psCmd
=
"ps -ef|grep -w %s| grep -v grep | awk '{print $2}'"
%
processerName
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
while
processID
:
os
.
system
(
killCmd
)
time
.
sleep
(
1
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
def
close
(
self
):
self
.
cursor
.
close
()
...
...
tests/system-test/7-tmq/tmqConsumeDiscontinuousData.py
0 → 100644
浏览文件 @
27159fbf
import
sys
import
time
import
datetime
import
threading
from
taos.tmq
import
Consumer
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
updatecfgDict
=
{
'debugFlag'
:
135
}
def
__init__
(
self
):
self
.
vgroups
=
1
self
.
ctbNum
=
10
self
.
rowsPerTbl
=
100
self
.
tmqMaxTopicNum
=
1
self
.
tmqMaxGroups
=
1
self
.
walRetentionPeriod
=
3
self
.
actConsumeTotalRows
=
0
self
.
retryPoll
=
0
self
.
lock
=
threading
.
Lock
()
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
getPath
(
self
,
tool
=
"taosBenchmark"
):
if
(
platform
.
system
().
lower
()
==
'windows'
):
tool
=
tool
+
".exe"
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
paths
=
[]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
((
tool
)
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
paths
.
append
(
os
.
path
.
join
(
root
,
tool
))
break
if
(
len
(
paths
)
==
0
):
tdLog
.
exit
(
"taosBenchmark not found!"
)
return
else
:
tdLog
.
info
(
"taosBenchmark found in %s"
%
paths
[
0
])
return
paths
[
0
]
def
prepareTestEnv
(
self
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10
,
'batchNum'
:
1
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
10
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdSql
.
execute
(
"alter database %s wal_retention_period %d"
%
(
paraDict
[
'dbName'
],
self
.
walRetentionPeriod
))
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
# tdLog.info("insert data")
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdDnodes.stop(1)
# tdDnodes.start(1)
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
def
tmqSubscribe
(
self
,
**
inputDict
):
consumer_dict
=
{
"group.id"
:
inputDict
[
'group_id'
],
"client.id"
:
"client"
,
"td.connect.user"
:
"root"
,
"td.connect.pass"
:
"taosdata"
,
"auto.commit.interval.ms"
:
"100"
,
"enable.auto.commit"
:
"true"
,
"auto.offset.reset"
:
"earliest"
,
"experimental.snapshot.enable"
:
"false"
,
"msg.with.table.name"
:
"false"
}
consumer
=
Consumer
(
consumer_dict
)
consumer
.
subscribe
([
inputDict
[
'topic_name'
]])
onceFlag
=
0
try
:
while
True
:
if
(
1
==
self
.
retryPoll
):
time
.
sleep
(
2
)
continue
res
=
consumer
.
poll
(
inputDict
[
'pollDelay'
])
if
not
res
:
break
err
=
res
.
error
()
if
err
is
not
None
:
raise
err
val
=
res
.
value
()
for
block
in
val
:
# print(block.fetchall())
data
=
block
.
fetchall
()
for
row
in
data
:
# print("===================================")
# print(row)
self
.
actConsumeTotalRows
+=
1
if
(
0
==
onceFlag
):
onceFlag
=
1
with
self
.
lock
:
self
.
retryPoll
=
1
currentTime
=
datetime
.
now
()
print
(
"%s temp stop consume"
%
(
str
(
currentTime
)))
currentTime
=
datetime
.
now
()
print
(
"%s already consume rows: %d, and sleep for a while"
%
(
str
(
currentTime
),
self
.
actConsumeTotalRows
))
# time.sleep(self.walRetentionPeriod * 3)
finally
:
consumer
.
unsubscribe
()
consumer
.
close
()
return
def
asyncSubscribe
(
self
,
inputDict
):
pThread
=
threading
.
Thread
(
target
=
self
.
tmqSubscribe
,
kwargs
=
inputDict
)
pThread
.
start
()
return
pThread
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
100
,
'batchNum'
:
1
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
# create topic
topicNameList
=
[
'dbtstb_0001'
]
tdLog
.
info
(
"create topics from stb"
)
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
for
i
in
range
(
len
(
topicNameList
)):
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
i
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
# start consumer
inputDict
=
{
'group_id'
:
"grpid_0001"
,
'topic_name'
:
topicNameList
[
0
],
'pollDelay'
:
10
}
pThread2
=
self
.
asyncSubscribe
(
inputDict
)
pThread1
=
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
pThread1
.
join
()
tdLog
.
info
(
"firstly call to flash database"
)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
time
.
sleep
(
self
.
walRetentionPeriod
+
1
)
tdLog
.
info
(
"secondely call to flash database"
)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
# wait the consumer to complete one poll
while
(
0
==
self
.
retryPoll
):
time
.
sleep
(
1
)
continue
with
self
.
lock
:
self
.
retryPoll
=
0
currentTime
=
datetime
.
now
()
print
(
"%s restart consume"
%
(
str
(
currentTime
)))
paraDict
[
"startTs"
]
=
1640966400000
+
paraDict
[
"ctbNum"
]
*
paraDict
[
"rowsPerTbl"
]
pThread3
=
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
tdLog
.
debug
(
"wait sub-thread to end insert data"
)
pThread3
.
join
()
totalInsertRows
=
paraDict
[
"ctbNum"
]
*
paraDict
[
"rowsPerTbl"
]
*
2
tdLog
.
debug
(
"wait sub-thread to end consume data"
)
pThread2
.
join
()
tdLog
.
info
(
"act consume total rows: %d, act insert total rows: %d"
%
(
self
.
actConsumeTotalRows
,
totalInsertRows
))
if
(
self
.
actConsumeTotalRows
>=
totalInsertRows
):
tdLog
.
exit
(
"act consume rows: %d not equal expect: %d"
%
(
self
.
actConsumeTotalRows
,
totalInsertRows
))
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
run
(
self
):
self
.
prepareTestEnv
()
self
.
tmqCase1
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/7-tmq/tmqDropConsumer.json
0 → 100644
浏览文件 @
27159fbf
{
"filetype"
:
"subscribe"
,
"cfgdir"
:
"/etc/taos"
,
"host"
:
"127.0.0.1"
,
"port"
:
6030
,
"user"
:
"root"
,
"password"
:
"taosdata"
,
"result_file"
:
"tmq_res.txt"
,
"tmq_info"
:
{
"concurrent"
:
2
,
"poll_delay"
:
100000
,
"group.id"
:
""
,
"group_mode"
:
"independent"
,
"create_mode"
:
"parallel"
,
"client.id"
:
"cliid_0001"
,
"auto.offset.reset"
:
"earliest"
,
"enable.manual.commit"
:
"false"
,
"enable.auto.commit"
:
"false"
,
"auto.commit.interval.ms"
:
1000
,
"experimental.snapshot.enable"
:
"false"
,
"msg.with.table.name"
:
"false"
,
"rows_file"
:
""
,
"topic_list"
:
[
{
"name"
:
"dbtstb_0001"
,
"sql"
:
"select * from dbt.stb;"
},
{
"name"
:
"dbtstb_0002"
,
"sql"
:
"select * from dbt.stb;"
}
]
}
}
tests/system-test/7-tmq/tmqDropConsumer.py
0 → 100644
浏览文件 @
27159fbf
import
sys
import
time
import
threading
from
taos.tmq
import
Consumer
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
updatecfgDict
=
{
'debugFlag'
:
135
}
def
__init__
(
self
):
self
.
vgroups
=
2
self
.
ctbNum
=
10
self
.
rowsPerTbl
=
10
self
.
tmqMaxTopicNum
=
2
self
.
tmqMaxGroups
=
2
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
getPath
(
self
,
tool
=
"taosBenchmark"
):
if
(
platform
.
system
().
lower
()
==
'windows'
):
tool
=
tool
+
".exe"
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
paths
=
[]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
((
tool
)
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
paths
.
append
(
os
.
path
.
join
(
root
,
tool
))
break
if
(
len
(
paths
)
==
0
):
tdLog
.
exit
(
"taosBenchmark not found!"
)
return
else
:
tdLog
.
info
(
"taosBenchmark found in %s"
%
paths
[
0
])
return
paths
[
0
]
def
prepareTestEnv
(
self
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
2
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
10
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdSql
.
execute
(
"alter database %s wal_retention_period 360000"
%
(
paraDict
[
'dbName'
]))
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"restart taosd to ensure that the data falls into the disk"
)
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
return
def
tmqSubscribe
(
self
,
topicName
,
newGroupId
,
expectResult
):
# create new connector for new tdSql instance in my thread
# newTdSql = tdCom.newTdSql()
# topicName = inputDict['topic_name']
# group_id = inputDict['group_id']
consumer_dict
=
{
"group.id"
:
newGroupId
,
"client.id"
:
"client"
,
"td.connect.user"
:
"root"
,
"td.connect.pass"
:
"taosdata"
,
"auto.commit.interval.ms"
:
"1000"
,
"enable.auto.commit"
:
"true"
,
"auto.offset.reset"
:
"earliest"
,
"experimental.snapshot.enable"
:
"false"
,
"msg.with.table.name"
:
"false"
}
ret
=
'success'
consumer
=
Consumer
(
consumer_dict
)
# print("======%s"%(inputDict['topic_name']))
try
:
consumer
.
subscribe
([
topicName
])
except
Exception
as
e
:
tdLog
.
info
(
"consumer.subscribe() fail "
)
tdLog
.
info
(
"%s"
%
(
e
))
if
(
expectResult
==
"fail"
):
consumer
.
close
()
return
'success'
else
:
consumer
.
close
()
return
'fail'
tdLog
.
info
(
"consumer.subscribe() success "
)
if
(
expectResult
==
"success"
):
consumer
.
close
()
return
'success'
else
:
consumer
.
close
()
return
'fail'
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
100000000
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
topicNameList
=
[
'dbtstb_0001'
,
'dbtstb_0002'
]
tdLog
.
info
(
"create topics from stb"
)
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
for
i
in
range
(
len
(
topicNameList
)):
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
i
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
# tdSql.query('show topics;')
# topicNum = tdSql.queryRows
# tdLog.info(" topic count: %d"%(topicNum))
# if topicNum != len(topicNameList):
# tdLog.exit("show topics %d not equal expect num: %d"%(topicNum, len(topicNameList)))
pThread
=
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
# use taosBenchmark to subscribe
binPath
=
self
.
getPath
()
cmd
=
"nohup %s -f ./7-tmq/tmqDropConsumer.json > /dev/null 2>&1 & "
%
binPath
tdLog
.
info
(
"%s"
%
(
cmd
))
os
.
system
(
cmd
)
expectTopicNum
=
len
(
topicNameList
)
consumerThreadNum
=
2
expectConsumerNUm
=
expectTopicNum
*
consumerThreadNum
expectSubscribeNum
=
self
.
vgroups
*
expectTopicNum
*
consumerThreadNum
tdSql
.
query
(
'show topics;'
)
topicNum
=
tdSql
.
queryRows
tdLog
.
info
(
" get topic count: %d"
%
(
topicNum
))
if
topicNum
!=
expectTopicNum
:
tdLog
.
exit
(
"show topics %d not equal expect num: %d"
%
(
topicNum
,
expectTopicNum
))
flag
=
0
while
(
1
):
tdSql
.
query
(
'show consumers;'
)
consumerNUm
=
tdSql
.
queryRows
tdLog
.
info
(
" get consumers count: %d"
%
(
consumerNUm
))
if
consumerNUm
==
expectConsumerNUm
:
flag
=
1
break
else
:
time
.
sleep
(
1
)
if
(
0
==
flag
):
tmqCom
.
g_end_insert_flag
=
1
tdLog
.
exit
(
"show consumers %d not equal expect num: %d"
%
(
topicNum
,
expectConsumerNUm
))
flag
=
0
for
i
in
range
(
10
):
tdSql
.
query
(
'show subscriptions;'
)
subscribeNum
=
tdSql
.
queryRows
tdLog
.
info
(
" get subscriptions count: %d"
%
(
subscribeNum
))
if
subscribeNum
==
expectSubscribeNum
:
flag
=
1
break
else
:
time
.
sleep
(
1
)
if
(
0
==
flag
):
tmqCom
.
g_end_insert_flag
=
1
tdLog
.
exit
(
"show subscriptions %d not equal expect num: %d"
%
(
subscribeNum
,
expectSubscribeNum
))
# get all consumer group id
tdSql
.
query
(
'show consumers;'
)
consumerNUm
=
tdSql
.
queryRows
groupIdList
=
[]
for
i
in
range
(
consumerNUm
):
groupId
=
tdSql
.
getData
(
i
,
1
)
existFlag
=
0
for
j
in
range
(
len
(
groupIdList
)):
if
(
groupId
==
groupIdList
[
j
]):
existFlag
=
1
break
if
(
0
==
existFlag
):
groupIdList
.
append
(
groupId
)
# kill taosBenchmark
tmqCom
.
killProcesser
(
"taosBenchmark"
)
tdLog
.
info
(
"kill taosBenchmak end"
)
# wait the status to "lost"
while
(
1
):
exitFlag
=
1
tdSql
.
query
(
'show consumers;'
)
consumerNUm
=
tdSql
.
queryRows
for
i
in
range
(
consumerNUm
):
status
=
tdSql
.
getData
(
i
,
3
)
if
(
status
!=
"lost"
):
exitFlag
=
0
time
.
sleep
(
2
)
break
if
(
1
==
exitFlag
):
break
tdLog
.
info
(
"all consumers status into 'lost'"
)
# drop consumer groups
for
i
in
range
(
len
(
groupIdList
)):
for
j
in
range
(
len
(
topicNameList
)):
sqlCmd
=
f
"drop consumer group `%s` on %s"
%
(
groupIdList
[
i
],
topicNameList
[
j
])
tdLog
.
info
(
"drop consumer cmd: %s"
%
(
sqlCmd
))
tdSql
.
execute
(
sqlCmd
)
tmqCom
.
g_end_insert_flag
=
1
tdLog
.
debug
(
"notify sub-thread to stop insert data"
)
pThread
.
join
()
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
run
(
self
):
self
.
prepareTestEnv
()
self
.
tmqCase1
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/7-tmq/tmqMaxGroupIds.json
0 → 100644
浏览文件 @
27159fbf
{
"filetype"
:
"subscribe"
,
"cfgdir"
:
"/etc/taos"
,
"host"
:
"127.0.0.1"
,
"port"
:
6030
,
"user"
:
"root"
,
"password"
:
"taosdata"
,
"result_file"
:
"tmq_res.txt"
,
"tmq_info"
:
{
"concurrent"
:
99
,
"poll_delay"
:
100000
,
"group.id"
:
""
,
"group_mode"
:
"independent"
,
"create_mode"
:
"parallel"
,
"client.id"
:
"cliid_0001"
,
"auto.offset.reset"
:
"earliest"
,
"enable.manual.commit"
:
"false"
,
"enable.auto.commit"
:
"false"
,
"auto.commit.interval.ms"
:
1000
,
"experimental.snapshot.enable"
:
"false"
,
"msg.with.table.name"
:
"false"
,
"rows_file"
:
""
,
"topic_list"
:
[
{
"name"
:
"dbtstb_0001"
,
"sql"
:
"select * from dbt.stb;"
}
]
}
}
tests/system-test/7-tmq/tmqMaxGroupIds.py
0 → 100644
浏览文件 @
27159fbf
import
sys
import
time
import
threading
from
taos.tmq
import
Consumer
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
updatecfgDict
=
{
'debugFlag'
:
135
}
def
__init__
(
self
):
self
.
vgroups
=
1
self
.
ctbNum
=
10
self
.
rowsPerTbl
=
10
self
.
tmqMaxTopicNum
=
20
self
.
tmqMaxGroups
=
100
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
getPath
(
self
,
tool
=
"taosBenchmark"
):
if
(
platform
.
system
().
lower
()
==
'windows'
):
tool
=
tool
+
".exe"
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
paths
=
[]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
((
tool
)
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
paths
.
append
(
os
.
path
.
join
(
root
,
tool
))
break
if
(
len
(
paths
)
==
0
):
tdLog
.
exit
(
"taosBenchmark not found!"
)
return
else
:
tdLog
.
info
(
"taosBenchmark found in %s"
%
paths
[
0
])
return
paths
[
0
]
def
prepareTestEnv
(
self
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
10
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdSql
.
execute
(
"alter database %s wal_retention_period 360000"
%
(
paraDict
[
'dbName'
]))
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"restart taosd to ensure that the data falls into the disk"
)
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
return
def
tmqSubscribe
(
self
,
topicName
,
newGroupId
,
expectResult
):
# create new connector for new tdSql instance in my thread
# newTdSql = tdCom.newTdSql()
# topicName = inputDict['topic_name']
# group_id = inputDict['group_id']
consumer_dict
=
{
"group.id"
:
newGroupId
,
"client.id"
:
"client"
,
"td.connect.user"
:
"root"
,
"td.connect.pass"
:
"taosdata"
,
"auto.commit.interval.ms"
:
"1000"
,
"enable.auto.commit"
:
"true"
,
"auto.offset.reset"
:
"earliest"
,
"experimental.snapshot.enable"
:
"false"
,
"msg.with.table.name"
:
"false"
}
ret
=
'success'
consumer
=
Consumer
(
consumer_dict
)
# print("======%s"%(inputDict['topic_name']))
try
:
consumer
.
subscribe
([
topicName
])
except
Exception
as
e
:
tdLog
.
info
(
"consumer.subscribe() fail "
)
tdLog
.
info
(
"%s"
%
(
e
))
if
(
expectResult
==
"fail"
):
consumer
.
close
()
return
'success'
else
:
consumer
.
close
()
return
'fail'
tdLog
.
info
(
"consumer.subscribe() success "
)
if
(
expectResult
==
"success"
):
consumer
.
close
()
return
'success'
else
:
consumer
.
close
()
return
'fail'
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
100000000
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
topicNameList
=
[
'dbtstb_0001'
]
tdLog
.
info
(
"create topics from stb"
)
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
for
i
in
range
(
len
(
topicNameList
)):
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
i
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
# tdSql.query('show topics;')
# topicNum = tdSql.queryRows
# tdLog.info(" topic count: %d"%(topicNum))
# if topicNum != len(topicNameList):
# tdLog.exit("show topics %d not equal expect num: %d"%(topicNum, len(topicNameList)))
pThread
=
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
# use taosBenchmark to subscribe
binPath
=
self
.
getPath
()
cmd
=
"nohup %s -f ./7-tmq/tmqMaxGroupIds.json > /dev/null 2>&1 & "
%
binPath
tdLog
.
info
(
"%s"
%
(
cmd
))
os
.
system
(
cmd
)
expectTopicNum
=
1
expectConsumerNUm
=
99
expectSubscribeNum
=
99
tdSql
.
query
(
'show topics;'
)
topicNum
=
tdSql
.
queryRows
tdLog
.
info
(
" get topic count: %d"
%
(
topicNum
))
if
topicNum
!=
expectTopicNum
:
tdLog
.
exit
(
"show topics %d not equal expect num: %d"
%
(
topicNum
,
expectTopicNum
))
flag
=
0
while
(
1
):
tdSql
.
query
(
'show consumers;'
)
consumerNUm
=
tdSql
.
queryRows
tdLog
.
info
(
" get consumers count: %d"
%
(
consumerNUm
))
if
consumerNUm
==
expectConsumerNUm
:
flag
=
1
break
else
:
time
.
sleep
(
1
)
if
(
0
==
flag
):
tdLog
.
exit
(
"show consumers %d not equal expect num: %d"
%
(
topicNum
,
expectConsumerNUm
))
flag
=
0
for
i
in
range
(
10
):
tdSql
.
query
(
'show subscriptions;'
)
subscribeNum
=
tdSql
.
queryRows
tdLog
.
info
(
" get subscriptions count: %d"
%
(
subscribeNum
))
if
subscribeNum
==
expectSubscribeNum
:
flag
=
1
break
else
:
time
.
sleep
(
1
)
if
(
0
==
flag
):
tdLog
.
exit
(
"show subscriptions %d not equal expect num: %d"
%
(
subscribeNum
,
expectSubscribeNum
))
res
=
self
.
tmqSubscribe
(
topicNameList
[
0
],
"newGroupId_001"
,
"success"
)
if
res
!=
'success'
:
tdLog
.
exit
(
"limit max groupid fail"
)
res
=
self
.
tmqSubscribe
(
topicNameList
[
0
],
"newGroupId_002"
,
"fail"
)
if
res
!=
'success'
:
tdLog
.
exit
(
"limit max groupid fail"
)
tmqCom
.
g_end_insert_flag
=
1
tdLog
.
debug
(
"notify sub-thread to stop insert data"
)
pThread
.
join
()
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
run
(
self
):
self
.
prepareTestEnv
()
self
.
tmqCase1
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录