Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
091f4c73
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
091f4c73
编写于
6月 30, 2022
作者:
H
Hui Li
提交者:
GitHub
6月 30, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14377 from taosdata/test3.0/lihui
test: add test case
上级
45e9b239
859d6040
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
73 addition
and
25 deletion
+73
-25
tests/system-test/7-tmq/subscribeDb4.py
tests/system-test/7-tmq/subscribeDb4.py
+27
-20
tests/system-test/7-tmq/tmqCommon.py
tests/system-test/7-tmq/tmqCommon.py
+16
-0
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+1
-0
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+29
-5
未找到文件。
tests/system-test/7-tmq/subscribeDb4.py
浏览文件 @
091f4c73
...
...
@@ -15,18 +15,23 @@ from tmqCommon import *
class
TDTestCase
:
paraDict
=
{
'dbName'
:
'db12'
,
'dropFlag'
:
1
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'precision'
:
'ms'
,
'stbName'
:
'stb0'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
2
},
{
'type'
:
'binary'
,
'len'
:
16
,
'count'
:
1
},
{
'type'
:
'timestamp'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
10
,
'startTs'
:
0
,
# 1640966400000 ----> 2022-01-01 00:00:00.000
'event'
:
''
,
'columnDict'
:
{
'int'
:
2
},
'tagDict'
:
{
'int'
:
1
}
}
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
20
,
'showMsg'
:
1
,
'showRow'
:
1
}
cdbName
=
'cdb'
# some parameter to consumer processor
...
...
@@ -57,17 +62,19 @@ class TDTestCase:
tmqCom
.
initConsumerTable
(
self
.
cdbName
)
tdCom
.
create_database
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"dropFlag"
]
,
self
.
paraDict
[
'precision'
]
)
tdCom
.
create_database
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"dropFlag"
])
self
.
paraDict
[
"stbName"
]
=
'stb1'
tdCom
.
create_stable
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"stbName"
],
self
.
paraDict
[
"columnDict"
],
self
.
paraDict
[
"tagDict
"
])
tdCom
.
create_ctable
s
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"stbName"
],
self
.
paraDict
[
"ctbNum"
],
self
.
paraDict
[
"tagDict
"
])
t
dCom
.
insert_data
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"stbName"
],
self
.
paraDict
[
"ctbNum"
],
self
.
paraDict
[
"rowsPerTbl"
],
self
.
paraDict
[
"batchNum
"
])
tdCom
.
create_stable
(
tdSql
,
dbname
=
self
.
paraDict
[
"dbName"
],
stbname
=
self
.
paraDict
[
"stbName"
],
column_elm_list
=
self
.
paraDict
[
"colSchema"
],
tag_elm_list
=
self
.
paraDict
[
"tagSchema"
],
count
=
1
,
default_stbname_prefix
=
self
.
paraDict
[
"stbName
"
])
tdCom
.
create_ctable
(
tdSql
,
dbname
=
self
.
paraDict
[
"dbName"
],
stbname
=
self
.
paraDict
[
"stbName"
],
tag_elm_list
=
self
.
paraDict
[
'tagSchema'
],
count
=
self
.
paraDict
[
"ctbNum"
],
default_ctbname_prefix
=
self
.
paraDict
[
"ctbPrefix
"
])
t
mqCom
.
insert_data_2
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"ctbPrefix"
],
self
.
paraDict
[
"ctbNum"
],
self
.
paraDict
[
"rowsPerTbl"
],
self
.
paraDict
[
"batchNum"
],
self
.
paraDict
[
"startTs"
],
self
.
paraDict
[
"ctbStartIdx
"
])
tdLog
.
info
(
"22222222222222222"
)
self
.
paraDict
[
"stbName"
]
=
'stb2'
tdCom
.
create_stable
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"stbName"
],
self
.
paraDict
[
"columnDict"
],
self
.
paraDict
[
"tagDict"
])
tdCom
.
create_ctables
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"stbName"
],
self
.
paraDict
[
"ctbNum"
],
self
.
paraDict
[
"tagDict"
])
tdCom
.
insert_data
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"stbName"
],
self
.
paraDict
[
"ctbNum"
],
self
.
paraDict
[
"rowsPerTbl"
],
self
.
paraDict
[
"batchNum"
])
self
.
paraDict
[
"ctbPrefix"
]
=
'newctb'
tdCom
.
create_stable
(
tdSql
,
dbname
=
self
.
paraDict
[
"dbName"
],
stbname
=
self
.
paraDict
[
"stbName"
],
column_elm_list
=
self
.
paraDict
[
"colSchema"
],
tag_elm_list
=
self
.
paraDict
[
"tagSchema"
],
count
=
1
,
default_stbname_prefix
=
self
.
paraDict
[
"stbName"
])
tdCom
.
create_ctable
(
tdSql
,
dbname
=
self
.
paraDict
[
"dbName"
],
stbname
=
self
.
paraDict
[
"stbName"
],
tag_elm_list
=
self
.
paraDict
[
'tagSchema'
],
count
=
self
.
paraDict
[
"ctbNum"
],
default_ctbname_prefix
=
self
.
paraDict
[
"ctbPrefix"
])
tmqCom
.
insert_data_2
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"ctbPrefix"
],
self
.
paraDict
[
"ctbNum"
],
self
.
paraDict
[
"rowsPerTbl"
],
self
.
paraDict
[
"batchNum"
],
self
.
paraDict
[
"startTs"
],
self
.
paraDict
[
"ctbStartIdx"
])
tdLog
.
info
(
"create topics from db"
)
topicName1
=
'topic_%s'
%
(
self
.
paraDict
[
'dbName'
])
...
...
@@ -97,7 +104,7 @@ class TDTestCase:
tdLog
.
info
(
"act consume rows: %d, expect consume rows: between %d and %d"
%
(
totalConsumeRows
,
self
.
expectrowcnt
/
2
,
self
.
expectrowcnt
))
tdLog
.
exit
(
"tmq consume rows error!"
)
time
.
sleep
(
1
5
)
time
.
sleep
(
1
0
)
tdSql
.
query
(
"drop topic %s"
%
topicName1
)
tdLog
.
printNoPrefix
(
"======== test case 12 end ...... "
)
...
...
tests/system-test/7-tmq/tmqCommon.py
浏览文件 @
091f4c73
...
...
@@ -77,6 +77,22 @@ class TMQCom:
return
resultList
def
selectConsumeMsgResult
(
self
,
expectRows
,
cdbName
=
'cdb'
):
resultList
=
[]
while
1
:
tdSql
.
query
(
"select * from %s.consumeresult"
%
cdbName
)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if
tdSql
.
getRows
()
==
expectRows
:
break
else
:
time
.
sleep
(
5
)
for
i
in
range
(
expectRows
):
tdLog
.
info
(
"consume id: %d, consume msgs: %d, consume rows: %d"
%
(
tdSql
.
getData
(
i
,
1
),
tdSql
.
getData
(
i
,
2
),
tdSql
.
getData
(
i
,
3
)))
resultList
.
append
(
tdSql
.
getData
(
i
,
2
))
return
resultList
def
startTmqSimProcess
(
self
,
pollDelay
,
dbName
,
showMsg
=
1
,
showRow
=
1
,
cdbName
=
'cdb'
,
valgrind
=
0
,
alias
=
0
):
buildPath
=
tdCom
.
getBuildPath
()
cfgPath
=
tdCom
.
getClientCfgPath
()
...
...
tests/system-test/fulltest.sh
浏览文件 @
091f4c73
...
...
@@ -136,6 +136,7 @@ python3 ./test.py -f 7-tmq/subscribeDb0.py
python3 ./test.py
-f
7-tmq/subscribeDb1.py
python3 ./test.py
-f
7-tmq/subscribeDb2.py
python3 ./test.py
-f
7-tmq/subscribeDb3.py
#python3 ./test.py -f 7-tmq/subscribeDb4.py
python3 ./test.py
-f
7-tmq/subscribeStb.py
python3 ./test.py
-f
7-tmq/subscribeStb0.py
python3 ./test.py
-f
7-tmq/subscribeStb1.py
...
...
tests/test/c/tmqSim.c
浏览文件 @
091f4c73
...
...
@@ -635,6 +635,9 @@ void loop_consume(SThreadInfo* pInfo) {
}
}
uint64_t
lastPrintTime
=
taosGetTimestampMs
();
uint64_t
startTs
=
taosGetTimestampMs
();
int32_t
consumeDelay
=
g_stConfInfo
.
consumeDelay
==
-
1
?
-
1
:
(
g_stConfInfo
.
consumeDelay
*
1000
);
while
(
running
)
{
TAOS_RES
*
tmqMsg
=
tmq_consumer_poll
(
pInfo
->
tmq
,
consumeDelay
);
...
...
@@ -647,6 +650,14 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs
++
;
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
10
*
1000
)
{
taosFprintfFile
(
g_fp
,
"consumer id %d has currently poll total msgs: %"
PRId64
"
\n
"
,
pInfo
->
consumerId
,
totalMsgs
);
lastPrintTime
=
currentPrintTime
;
}
if
(
0
==
once_flag
)
{
once_flag
=
1
;
notifyMainScript
(
pInfo
,
NOTIFY_CMD_START_CONSUM
);
...
...
@@ -676,8 +687,6 @@ void loop_consume(SThreadInfo* pInfo) {
}
void
*
consumeThreadFunc
(
void
*
param
)
{
int32_t
totalMsgs
=
0
;
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
param
;
pInfo
->
taos
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
...
...
@@ -859,12 +868,27 @@ int main(int32_t argc, char* argv[]) {
(
void
*
)(
&
(
g_stConfInfo
.
stThreads
[
i
])));
}
int64_t
start
=
taosGetTimestampUs
();
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
i
++
)
{
taosThreadJoin
(
g_stConfInfo
.
stThreads
[
i
].
thread
,
NULL
);
taosThreadClear
(
&
g_stConfInfo
.
stThreads
[
i
].
thread
);
}
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
int64_t
end
=
taosGetTimestampUs
();
int64_t
totalMsgs
=
0
;
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
i
++
)
{
totalMsgs
+=
g_stConfInfo
.
stThreads
[
i
].
consumeMsgCnt
;
}
int64_t
t
=
end
-
start
;
if
(
0
==
t
)
t
=
1
;
double
tInMs
=
(
double
)
t
/
1000000
.
0
;
taosFprintfFile
(
g_fp
,
"Spent %.4f seconds to poll msgs: %"
PRIu64
" with %d thread(s), throughput: %.2f msgs/second
\n\n
"
,
tInMs
,
totalMsgs
,
g_stConfInfo
.
numOfThread
,
(
double
)(
totalMsgs
/
tInMs
));
taosFprintfFile
(
g_fp
,
"==== close tmqlog ====
\n
"
);
taosCloseFile
(
&
g_fp
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录