Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9ff243a8
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9ff243a8
编写于
8月 18, 2023
作者:
C
chao.feng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add tmq test case to support data precision 'us' and 'ns' by charles
上级
a9d88b74
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
140 addition
and
0 deletion
+140
-0
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+1
-0
tests/system-test/7-tmq/tmqDataPrecisionUnit.py
tests/system-test/7-tmq/tmqDataPrecisionUnit.py
+139
-0
未找到文件。
tests/parallel_test/cases.task
浏览文件 @
9ff243a8
...
@@ -128,6 +128,7 @@
...
@@ -128,6 +128,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmqDataPrecisionUnit.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
...
...
tests/system-test/7-tmq/tmqDataPrecisionUnit.py
0 → 100644
浏览文件 @
9ff243a8
import
sys
import
re
import
time
import
threading
from
taos.tmq
import
*
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
self
.
db_name
=
"tmq_db"
self
.
topic_name
=
"tmq_topic"
self
.
stable_name
=
"stb"
self
.
rows_per_table
=
1000
self
.
ctb_num
=
100
def
prepareData
(
self
,
precisionUnit
=
"ms"
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
startTS
=
1672502400000
if
precisionUnit
==
"us"
:
startTS
=
1672502400000000
elif
precisionUnit
==
"ns"
:
startTS
=
1672502400000000000
paraDict
=
{
'dbName'
:
self
.
db_name
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
self
.
stable_name
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
self
.
ctb_num
,
'rowsPerTbl'
:
self
.
rows_per_table
,
'batchNum'
:
100
,
'startTs'
:
startTS
,
# 2023-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
# init the consumer database
tmqCom
.
initConsumerTable
()
# create testing database、stable、ctables
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
self
.
replicaVar
,
precision
=
precisionUnit
)
tdLog
.
info
(
"create database %s successfully"
%
paraDict
[
"dbName"
])
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create stable %s successfully"
%
paraDict
[
"stbName"
])
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"create child tables successfully"
)
# insert data into tables and wait the async thread exit
tdLog
.
info
(
"insert data into tables"
)
pThread
=
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
pThread
.
join
()
def
run
(
self
):
"""Check tmq feature for different data precision unit like "ms、us、ns"
"""
precision_unit
=
[
"ms"
,
"us"
,
"ns"
]
for
unit
in
precision_unit
:
tdLog
.
info
(
f
"start to test precision unit
{
unit
}
"
)
self
.
prepareData
(
precisionUnit
=
unit
)
# drop database if exists
tdSql
.
execute
(
f
"drop database if exists
{
self
.
db_name
}
"
)
self
.
prepareData
(
unit
)
# create topic
tdLog
.
info
(
"create topic from %s"
%
self
.
stable_name
)
queryString
=
"select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "
%
(
self
.
db_name
,
self
.
stable_name
)
sqlString
=
"create topic %s as %s"
%
(
self
.
topic_name
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
# save consumer info
consumerId
=
0
expectrowcnt
=
self
.
rows_per_table
*
self
.
ctb_num
topicList
=
self
.
topic_name
ifcheckdata
=
0
ifManualCommit
=
0
keyList
=
'group.id:cgrp1,
\
enable.auto.commit:false,
\
auto.commit.interval.ms:6000,
\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
# start consume processor
paraDict
=
{
'pollDelay'
:
15
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
self
.
db_name
,
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdSql
.
query
(
queryString
)
totalRowsFromQuery
=
tdSql
.
getRows
()
tdLog
.
info
(
"act consume rows: %d, act query rows: %d "
%
(
totalConsumeRows
,
totalRowsFromQuery
))
if
totalConsumeRows
<
totalRowsFromQuery
:
tdLog
.
exit
(
"tmq consume rows error!"
)
tmqCom
.
waitSubscriptionExit
(
tdSql
,
self
.
topic_name
)
tdSql
.
query
(
"drop topic %s"
%
self
.
topic_name
)
tdSql
.
execute
(
"drop database %s"
%
self
.
db_name
)
def
stop
(
self
):
tdSql
.
execute
(
f
"drop database if exists
{
self
.
db_name
}
"
)
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录