Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
721a7e64
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
721a7e64
编写于
7月 12, 2022
作者:
H
Hui Li
提交者:
GitHub
7月 12, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14821 from taosdata/test3.0/lihui
Test3.0/lihui
上级
12f9027b
3801d746
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
304 addition
and
146 deletion
+304
-146
tests/system-test/7-tmq/tmqAutoCreateTbl.py
tests/system-test/7-tmq/tmqAutoCreateTbl.py
+49
-144
tests/system-test/7-tmq/tmqDnodeRestart.py
tests/system-test/7-tmq/tmqDnodeRestart.py
+253
-0
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+2
-2
未找到文件。
tests/system-test/7-tmq/tmqAutoCreateTbl.py
浏览文件 @
721a7e64
...
@@ -16,9 +16,9 @@ from tmqCommon import *
...
@@ -16,9 +16,9 @@ from tmqCommon import *
class
TDTestCase
:
class
TDTestCase
:
def
__init__
(
self
):
def
__init__
(
self
):
self
.
vgroups
=
2
self
.
vgroups
=
4
self
.
ctbNum
=
100
self
.
ctbNum
=
100
0
self
.
rowsPerTbl
=
1000
0
self
.
rowsPerTbl
=
1000
def
init
(
self
,
conn
,
logSql
):
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
...
@@ -29,7 +29,7 @@ class TDTestCase:
...
@@ -29,7 +29,7 @@ class TDTestCase:
paraDict
=
{
'dbName'
:
'dbt'
,
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'dropFlag'
:
1
,
'event'
:
''
,
'event'
:
''
,
'vgroups'
:
3
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'tagPrefix'
:
't'
,
...
@@ -37,14 +37,14 @@ class TDTestCase:
...
@@ -37,14 +37,14 @@ class TDTestCase:
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
5
00
,
'ctbNum'
:
10
00
,
'rowsPerTbl'
:
1000
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
5
00
,
'batchNum'
:
4
00
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showMsg'
:
1
,
'showRow'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
...
@@ -54,20 +54,21 @@ class TDTestCase:
...
@@ -54,20 +54,21 @@ class TDTestCase:
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
#
tdLog.info("create ctb")
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
#
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
#
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog
.
info
(
"insert data"
)
#
tdLog.info("insert data")
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
#
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
#
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
#
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
t
dLog
.
info
(
"restart taosd to ensure that the data falls into the disk"
)
t
mqCom
.
insert_data_with_autoCreateTbl
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
"ctbx"
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
]
)
# tdDnodes.stop(1)
# td
Dnodes.start(1)
# td
Log.info("restart taosd to ensure that the data falls into the disk")
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
#
tdSql.query("flush database %s"%(paraDict['dbName']))
return
return
# 自动建表完成数据插入,启动消费
def
tmqCase1
(
self
):
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
paraDict
=
{
'dbName'
:
'dbt'
,
...
@@ -90,21 +91,16 @@ class TDTestCase:
...
@@ -90,21 +91,16 @@ class TDTestCase:
'showRow'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
'snapshot'
:
1
}
#
paraDict['vgroups'] = self.vgroups
paraDict
[
'vgroups'
]
=
self
.
vgroups
#
paraDict['ctbNum'] = self.ctbNum
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
#
paraDict['rowsPerTbl'] = self.rowsPerTbl
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
# tmqCom.initConsumerTable()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
# tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog
.
info
(
"create stb"
)
# tdLog.info("create stb")
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
# tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog
.
info
(
"create ctb"
)
# tdLog.info("insert data by auto create ctb")
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
# tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"create topics from stb1"
)
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
topicFromStb1
=
'topic_stb1'
...
@@ -120,20 +116,13 @@ class TDTestCase:
...
@@ -120,20 +116,13 @@ class TDTestCase:
ifManualCommit
=
0
ifManualCommit
=
0
keyList
=
'group.id:cgrp1,\
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
enable.auto.commit:true,\
auto.commit.interval.ms:
5
00,\
auto.commit.interval.ms:
10
00,\
auto.offset.reset:earliest'
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
# time.sleep(3)
tmqCom
.
getStartCommitNotifyFromTmqsim
()
tdLog
.
info
(
"================= restart dnode ==========================="
)
tdDnodes
.
stop
(
1
)
tdDnodes
.
start
(
1
)
time
.
sleep
(
5
)
tdLog
.
info
(
"insert process end, and start to check consume result"
)
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
...
@@ -172,105 +161,23 @@ class TDTestCase:
...
@@ -172,105 +161,23 @@ class TDTestCase:
'pollDelay'
:
5
,
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showMsg'
:
1
,
'showRow'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
'snapshot'
:
0
}
# paraDict['vgroups'] = self.vgroups
# paraDict['ctbNum'] = self.ctbNum
# paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
queryString
=
"select ts, c1, c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicFromStb1
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
consumerId
=
0
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
topicList
=
topicFromStb1
ifcheckdata
=
0
ifManualCommit
=
0
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"create some new child table and insert data "
)
tmqCom
.
insert_data_with_autoCreateTbl
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
"ctb"
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
])
tmqCom
.
getStartCommitNotifyFromTmqsim
()
tdLog
.
info
(
"================= restart dnode ==========================="
)
tdDnodes
.
stop
(
1
)
tdDnodes
.
start
(
1
)
time
.
sleep
(
5
)
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdSql
.
query
(
queryString
)
totalRowsInserted
=
tdSql
.
getRows
()
if
totalConsumeRows
!=
totalRowsInserted
:
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d"
%
(
totalConsumeRows
,
totalRowsInserted
))
tdLog
.
exit
(
"tmq consume rows error!"
)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
# 自动建表完成数据插入,启动消费
def
tmqCase3
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 3: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
1000
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
400
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
# tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog
.
info
(
"create stb"
)
# tdLog.info("create stb")
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
# tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog
.
info
(
"insert data by auto create ctb"
)
# tdLog.info("create ctb")
tmqCom
.
insert_data_with_autoCreateTbl
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
"ctb"
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
])
# tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
# ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("insert data")
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog
.
info
(
"create topics from stb1"
)
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
topicFromStb1
=
'topic_stb1'
queryString
=
"select ts, c1, c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
queryString
=
"select ts, c1, c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
...
@@ -279,7 +186,7 @@ class TDTestCase:
...
@@ -279,7 +186,7 @@ class TDTestCase:
tdSql
.
execute
(
sqlString
)
tdSql
.
execute
(
sqlString
)
consumerId
=
0
consumerId
=
0
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
topicList
=
topicFromStb1
topicList
=
topicFromStb1
ifcheckdata
=
0
ifcheckdata
=
0
ifManualCommit
=
0
ifManualCommit
=
0
...
@@ -292,10 +199,8 @@ class TDTestCase:
...
@@ -292,10 +199,8 @@ class TDTestCase:
tdLog
.
info
(
"start consume processor"
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
# tdLog.info("================= restart dnode ===========================")
tdLog
.
info
(
"create some new child table and insert data "
)
# tdDnodes.stop(1)
tmqCom
.
insert_data_with_autoCreateTbl
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
"ctby"
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
])
# tdDnodes.start(1)
# time.sleep(2)
tdLog
.
info
(
"insert process end, and start to check consume result"
)
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
expectRows
=
1
...
@@ -313,15 +218,15 @@ class TDTestCase:
...
@@ -313,15 +218,15 @@ class TDTestCase:
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdLog
.
printNoPrefix
(
"======== test case
3
end ...... "
)
tdLog
.
printNoPrefix
(
"======== test case
2
end ...... "
)
def
run
(
self
):
def
run
(
self
):
tdSql
.
prepare
()
tdSql
.
prepare
()
self
.
prepareTestEnv
()
self
.
tmqCase1
()
# self.tmqCase2() TD-17267
# self.tmqCase1()
# self.tmqCase2()
self
.
tmqCase3
()
def
stop
(
self
):
def
stop
(
self
):
tdSql
.
close
()
tdSql
.
close
()
...
...
tests/system-test/7-tmq/tmqDnodeRestart.py
0 → 100644
浏览文件 @
721a7e64
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
enum
import
Enum
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
__init__
(
self
):
self
.
vgroups
=
2
self
.
ctbNum
=
100
self
.
rowsPerTbl
=
10000
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
prepareTestEnv
(
self
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
3
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
500
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
500
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"restart taosd to ensure that the data falls into the disk"
)
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
return
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
1000
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
400
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
# paraDict['vgroups'] = self.vgroups
# paraDict['ctbNum'] = self.ctbNum
# paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
queryString
=
"select ts, c1, c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicFromStb1
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
consumerId
=
0
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
topicList
=
topicFromStb1
ifcheckdata
=
0
ifManualCommit
=
0
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:500,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
# time.sleep(3)
tmqCom
.
getStartCommitNotifyFromTmqsim
()
tdLog
.
info
(
"================= restart dnode ==========================="
)
tdDnodes
.
stop
(
1
)
tdDnodes
.
start
(
1
)
time
.
sleep
(
5
)
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdSql
.
query
(
queryString
)
totalRowsInserted
=
tdSql
.
getRows
()
if
totalConsumeRows
!=
totalRowsInserted
:
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d"
%
(
totalConsumeRows
,
totalRowsInserted
))
tdLog
.
exit
(
"tmq consume rows error!"
)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
tmqCase2
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 2: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
1000
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
1000
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
# paraDict['vgroups'] = self.vgroups
# paraDict['ctbNum'] = self.ctbNum
# paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
queryString
=
"select ts, c1, c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicFromStb1
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
consumerId
=
0
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
topicList
=
topicFromStb1
ifcheckdata
=
0
ifManualCommit
=
0
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"create some new child table and insert data "
)
tmqCom
.
insert_data_with_autoCreateTbl
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
"ctb"
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
])
tmqCom
.
getStartCommitNotifyFromTmqsim
()
tdLog
.
info
(
"================= restart dnode ==========================="
)
tdDnodes
.
stop
(
1
)
tdDnodes
.
start
(
1
)
time
.
sleep
(
5
)
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdSql
.
query
(
queryString
)
totalRowsInserted
=
tdSql
.
getRows
()
if
totalConsumeRows
!=
totalRowsInserted
:
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d"
%
(
totalConsumeRows
,
totalRowsInserted
))
tdLog
.
exit
(
"tmq consume rows error!"
)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
def
run
(
self
):
tdSql
.
prepare
()
self
.
tmqCase1
()
self
.
tmqCase2
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/fulltest.sh
浏览文件 @
721a7e64
...
@@ -176,8 +176,8 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py
...
@@ -176,8 +176,8 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
python3 ./test.py
-f
7-tmq/tmqAutoCreateTbl.py
#python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
#------------querPolicy 2-----------
#------------querPolicy 2-----------
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录