Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7797027d
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7797027d
编写于
10月 27, 2022
作者:
P
plum-lihui
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
test:modify test case
上级
7af4b2c5
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
50 addition
and
32 deletion
+50
-32
tests/system-test/7-tmq/tmqError.py
tests/system-test/7-tmq/tmqError.py
+50
-32
未找到文件。
tests/system-test/7-tmq/tmqError.py
浏览文件 @
7797027d
...
...
@@ -11,6 +11,8 @@ from util.log import *
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
actionType
(
Enum
):
CREATE_DATABASE
=
0
...
...
@@ -193,32 +195,41 @@ class TDTestCase:
and restart a consumption process to complete a consumption
'''
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
self
.
initConsumerTable
()
# create and start thread
parameterDict
=
{
'cfg'
:
''
,
\
'actionType'
:
0
,
\
'dbName'
:
'db3'
,
\
'dropFlag'
:
1
,
\
'vgroups'
:
4
,
\
'replica'
:
1
,
\
'stbName'
:
'stb1'
,
\
'ctbNum'
:
10
,
\
'rowsPerTbl'
:
20000
,
\
'batchNum'
:
100
,
\
'startTs'
:
1640966400000
}
# 2022-01-01 00:00:00.000
parameterDict
[
'cfg'
]
=
cfgPath
self
.
create_database
(
tdSql
,
parameterDict
[
"dbName"
])
self
.
create_stable
(
tdSql
,
parameterDict
[
"dbName"
],
parameterDict
[
"stbName"
])
self
.
create_ctables
(
tdSql
,
parameterDict
[
"dbName"
],
parameterDict
[
"stbName"
],
parameterDict
[
"ctbNum"
])
self
.
insert_data
(
tdSql
,
parameterDict
[
"dbName"
],
parameterDict
[
"stbName"
],
parameterDict
[
"ctbNum"
],
parameterDict
[
"rowsPerTbl"
],
parameterDict
[
"batchNum"
])
tmqCom
.
initConsumerTable
()
#self.initConsumerTable()
# create and start thread
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
20000
,
'batchNum'
:
1000
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
30
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
paraDict
[
'cfg'
]
=
cfgPath
self
.
create_database
(
tdSql
,
paraDict
[
"dbName"
])
self
.
create_stable
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
])
self
.
create_ctables
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
paraDict
[
"ctbNum"
])
self
.
insert_data
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
])
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
tdSql
.
execute
(
"create topic %s as select ts, c1, c2 from %s.%s"
%
(
topicFromStb1
,
para
meterDict
[
'dbName'
],
parameter
Dict
[
'stbName'
]))
tdSql
.
execute
(
"create topic %s as select ts, c1, c2 from %s.%s"
%
(
topicFromStb1
,
para
Dict
[
'dbName'
],
para
Dict
[
'stbName'
]))
consumerId
=
0
# expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
expectrowcnt
=
90000000000
...
...
@@ -229,13 +240,16 @@ class TDTestCase:
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
self
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
#self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
pollDelay
=
9000000
# Forever loop
showMsg
=
1
showRow
=
1
self
.
startTmqSimProcess
(
buildPath
,
cfgPath
,
pollDelay
,
parameterDict
[
"dbName"
],
showMsg
,
showRow
)
#self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
#time.sleep(3)
tmqCom
.
getStartConsumeNotifyFromTmqsim
()
...
...
@@ -254,17 +268,17 @@ class TDTestCase:
######### redo to consume
self
.
initConsumerTable
()
self
.
create_database
(
tdSql
,
para
meter
Dict
[
"dbName"
])
self
.
create_stable
(
tdSql
,
para
meterDict
[
"dbName"
],
parameter
Dict
[
"stbName"
])
self
.
create_ctables
(
tdSql
,
para
meterDict
[
"dbName"
],
parameterDict
[
"stbName"
],
parameter
Dict
[
"ctbNum"
])
self
.
insert_data
(
tdSql
,
para
meterDict
[
"dbName"
],
parameterDict
[
"stbName"
],
parameterDict
[
"ctbNum"
],
parameterDict
[
"rowsPerTbl"
],
parameter
Dict
[
"batchNum"
])
self
.
create_database
(
tdSql
,
paraDict
[
"dbName"
])
self
.
create_stable
(
tdSql
,
para
Dict
[
"dbName"
],
para
Dict
[
"stbName"
])
self
.
create_ctables
(
tdSql
,
para
Dict
[
"dbName"
],
paraDict
[
"stbName"
],
para
Dict
[
"ctbNum"
])
self
.
insert_data
(
tdSql
,
para
Dict
[
"dbName"
],
paraDict
[
"stbName"
],
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
para
Dict
[
"batchNum"
])
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
tdSql
.
execute
(
"create topic %s as select ts, c1, c2 from %s.%s"
%
(
topicFromStb1
,
para
meterDict
[
'dbName'
],
parameter
Dict
[
'stbName'
]))
tdSql
.
execute
(
"create topic %s as select ts, c1, c2 from %s.%s"
%
(
topicFromStb1
,
para
Dict
[
'dbName'
],
para
Dict
[
'stbName'
]))
consumerId
=
0
expectrowcnt
=
para
meterDict
[
"rowsPerTbl"
]
*
parameter
Dict
[
"ctbNum"
]
expectrowcnt
=
para
Dict
[
"rowsPerTbl"
]
*
para
Dict
[
"ctbNum"
]
topicList
=
topicFromStb1
ifcheckdata
=
0
ifManualCommit
=
0
...
...
@@ -272,13 +286,17 @@ class TDTestCase:
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
self
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
#self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
pollDelay
=
20
showMsg
=
1
showRow
=
1
self
.
startTmqSimProcess
(
buildPath
,
cfgPath
,
pollDelay
,
parameterDict
[
"dbName"
],
showMsg
,
showRow
)
paraDict
[
'pollDelay'
]
=
20
#self.startTmqSimProcess(buildPath,cfgPath,pollDelay,paraDict["dbName"],showMsg, showRow)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
expectRows
=
1
resultList
=
self
.
selectConsumeResult
(
expectRows
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录