Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2573dcc0
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
2573dcc0
编写于
8月 09, 2023
作者:
C
chao.feng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'td_25179_update' of
https://github.com/taosdata/TDengine
into td_25179_update
上级
c5aae97d
37c39a1e
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
253 addition
and
0 deletion
+253
-0
tests/system-test/0-others/user_privilege_multi_users.py
tests/system-test/0-others/user_privilege_multi_users.py
+126
-0
tests/system-test/7-tmq/tmqSeekAndCommit.py
tests/system-test/7-tmq/tmqSeekAndCommit.py
+127
-0
未找到文件。
tests/system-test/0-others/user_privilege_multi_users.py
0 → 100644
浏览文件 @
2573dcc0
from
itertools
import
product
import
taos
import
random
from
taos.tmq
import
*
from
util.cases
import
*
from
util.common
import
*
from
util.log
import
*
from
util.sql
import
*
from
util.sqlset
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
# init the tdsql
tdSql
.
init
(
conn
.
cursor
())
self
.
setsql
=
TDSetSql
()
# user info
self
.
userNum
=
100
self
.
basic_username
=
"user"
self
.
password
=
"pwd"
# db info
self
.
dbname
=
"user_privilege_multi_users"
self
.
stbname
=
'stb'
self
.
ctbname_num
=
100
self
.
column_dict
=
{
'ts'
:
'timestamp'
,
'col1'
:
'float'
,
'col2'
:
'int'
,
}
self
.
tag_dict
=
{
'ctbname'
:
'binary(10)'
}
self
.
privilege_list
=
[]
def
prepare_data
(
self
):
"""Create the db and data for test
"""
# create datebase
tdSql
.
execute
(
f
"create database
{
self
.
dbname
}
"
)
tdLog
.
debug
(
"sql:"
+
f
"create database
{
self
.
dbname
}
"
)
tdSql
.
execute
(
f
"use
{
self
.
dbname
}
"
)
tdLog
.
debug
(
"sql:"
+
f
"use
{
self
.
dbname
}
"
)
# create super table
tdSql
.
execute
(
self
.
setsql
.
set_create_stable_sql
(
self
.
stbname
,
self
.
column_dict
,
self
.
tag_dict
))
tdLog
.
debug
(
"Create stable {} successfully"
.
format
(
self
.
stbname
))
for
ctbIndex
in
range
(
self
.
ctbname_num
):
ctname
=
f
"ctb
{
ctbIndex
}
"
tdSql
.
execute
(
f
"create table
{
ctname
}
using
{
self
.
stbname
}
tags('
{
ctname
}
')"
)
tdLog
.
debug
(
"sql:"
+
f
"create table
{
ctname
}
using
{
self
.
stbname
}
tags('
{
ctname
}
')"
)
def
create_multiusers
(
self
):
"""Create the user for test
"""
for
userIndex
in
range
(
self
.
userNum
):
username
=
f
"
{
self
.
basic_username
}{
userIndex
}
"
tdSql
.
execute
(
f
'create user
{
username
}
pass "
{
self
.
password
}
"'
)
tdLog
.
debug
(
"sql:"
+
f
'create user
{
username
}
pass "
{
self
.
password
}
"'
)
def
grant_privilege
(
self
):
"""Add the privilege for the users
"""
try
:
for
userIndex
in
range
(
self
.
userNum
):
username
=
f
"
{
self
.
basic_username
}{
userIndex
}
"
privilege
=
random
.
choice
([
"read"
,
"write"
,
"all"
])
condition
=
f
"ctbname='ctb
{
userIndex
}
'"
self
.
privilege_list
.
append
({
"username"
:
username
,
"privilege"
:
privilege
,
"condition"
:
condition
})
tdSql
.
execute
(
f
'grant
{
privilege
}
on
{
self
.
dbname
}
.
{
self
.
stbname
}
with
{
condition
}
to
{
username
}
'
)
tdLog
.
debug
(
"sql:"
+
f
'grant
{
privilege
}
on
{
self
.
dbname
}
.
{
self
.
stbname
}
with
{
condition
}
to
{
username
}
'
)
except
Exception
as
ex
:
tdLog
.
exit
(
ex
)
def
remove_privilege
(
self
):
"""Remove the privilege for the users
"""
try
:
for
item
in
self
.
privilege_list
:
username
=
item
[
"username"
]
privilege
=
item
[
"privilege"
]
condition
=
item
[
"condition"
]
tdSql
.
execute
(
f
'revoke
{
privilege
}
on
{
self
.
dbname
}
.
{
self
.
stbname
}
with
{
condition
}
from
{
username
}
'
)
tdLog
.
debug
(
"sql:"
+
f
'revoke
{
privilege
}
on
{
self
.
dbname
}
.
{
self
.
stbname
}
with
{
condition
}
from
{
username
}
'
)
except
Exception
as
ex
:
tdLog
.
exit
(
ex
)
def
run
(
self
):
"""
Check the information from information_schema.ins_user_privileges
"""
self
.
create_multiusers
()
self
.
prepare_data
()
# grant privilege to users
self
.
grant_privilege
()
# check information_schema.ins_user_privileges
tdSql
.
query
(
"select * from information_schema.ins_user_privileges;"
)
tdLog
.
debug
(
"Current information_schema.ins_user_privileges values: {}"
.
format
(
tdSql
.
queryResult
))
if
len
(
tdSql
.
queryResult
)
>=
self
.
userNum
:
tdLog
.
debug
(
"case passed"
)
else
:
tdLog
.
exit
(
"The privilege number in information_schema.ins_user_privileges is incorrect"
)
def
stop
(
self
):
# remove the privilege
self
.
remove_privilege
()
# clear env
tdSql
.
execute
(
f
"drop database
{
self
.
dbname
}
"
)
# remove the users
for
userIndex
in
range
(
self
.
userNum
):
username
=
f
"
{
self
.
basic_username
}{
userIndex
}
"
tdSql
.
execute
(
f
'drop user
{
username
}
'
)
# close the connection
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tests/system-test/7-tmq/tmqSeekAndCommit.py
0 → 100644
浏览文件 @
2573dcc0
import
sys
import
re
import
time
import
threading
from
taos.tmq
import
*
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
self
.
db_name
=
"tmq_db"
self
.
topic_name
=
"tmq_topic"
self
.
stable_name
=
"tmqst"
def
prepareData
(
self
):
# create database
tdSql
.
execute
(
"create database if not exists %s;"
%
(
self
.
db_name
))
tdSql
.
execute
(
"use %s;"
%
(
self
.
db_name
))
# create stable
tdSql
.
execute
(
"create table %s.tmqst (ts timestamp, col0 int) tags(groupid int);"
%
(
self
.
db_name
))
# create child tables
tdSql
.
execute
(
"create table tmqct_1 using %s.%s tags(1);"
%
(
self
.
db_name
,
self
.
stable_name
))
tdSql
.
execute
(
"create table tmqct_2 using %s.%s tags(2);"
%
(
self
.
db_name
,
self
.
stable_name
))
tdSql
.
execute
(
"create table tmqct_3 using %s.%s tags(3);"
%
(
self
.
db_name
,
self
.
stable_name
))
tdSql
.
execute
(
"create table tmqct_4 using %s.%s tags(4);"
%
(
self
.
db_name
,
self
.
stable_name
))
tdSql
.
execute
(
"create table tmqct_5 using %s.%s tags(5);"
%
(
self
.
db_name
,
self
.
stable_name
))
# insert into data
ctb_list
=
[
"tmqct_1"
,
"tmqct_2"
,
"tmqct_3"
,
"tmqct_4"
,
"tmqct_5"
]
for
i
in
range
(
5
):
sql
=
"insert into %s "
%
(
ctb_list
[
i
])
sql_values
=
"values"
for
j
in
range
(
1000
*
i
,
1000
*
(
i
+
1
)):
sql_values
+=
"(%s, %s)"
%
(
"now"
if
j
==
0
else
"now+%s"
%
(
str
(
j
)
+
"s"
),
str
(
j
))
sql
+=
sql_values
+
";"
tdLog
.
info
(
sql
)
tdSql
.
execute
(
sql
)
tdLog
.
info
(
"Insert data into child tables successfully"
)
# create topic
tdSql
.
execute
(
"create topic %s as select * from %s;"
%
(
self
.
topic_name
,
self
.
stable_name
))
def
tmqSubscribe
(
self
,
inputDict
):
consumer_dict
=
{
"group.id"
:
inputDict
[
'group_id'
],
"client.id"
:
"client"
,
"td.connect.user"
:
"root"
,
"td.connect.pass"
:
"taosdata"
,
"auto.commit.interval.ms"
:
"1000"
,
"enable.auto.commit"
:
inputDict
[
'auto_commit'
],
"auto.offset.reset"
:
inputDict
[
'offset_reset'
],
"experimental.snapshot.enable"
:
"false"
,
"msg.with.table.name"
:
"false"
}
consumer
=
Consumer
(
consumer_dict
)
try
:
consumer
.
subscribe
([
inputDict
[
'topic_name'
]])
except
Exception
as
e
:
tdLog
.
info
(
"consumer.subscribe() fail "
)
tdLog
.
info
(
"%s"
%
(
e
))
tdLog
.
info
(
"create consumer success!"
)
return
consumer
def
test_seek_and_committed_position_with_autocommit
(
self
):
try
:
self
.
prepareData
()
inputDict
=
{
"topic_name"
:
self
.
topic_name
,
"group_id"
:
"1"
,
"auto_commit"
:
"true"
,
"offset_reset"
:
"earliest"
}
consumer
=
self
.
tmqSubscribe
(
inputDict
)
while
(
True
):
res
=
consumer
.
poll
(
1
)
if
not
res
:
break
err
=
res
.
error
()
if
err
is
not
None
:
raise
err
val
=
res
.
value
()
for
block
in
val
:
tdLog
.
info
(
"block.fetchall() number: %s"
%
(
len
(
block
.
fetchall
())))
partitions
=
consumer
.
assignment
()
position_partitions
=
consumer
.
position
(
partitions
)
tdLog
.
info
(
"position_partitions: %s"
%
(
position_partitions
))
for
i
in
range
(
len
(
position_partitions
)):
tdLog
.
info
(
"position_partitions[%s].offset: %s"
%
(
i
,
position_partitions
[
i
].
offset
))
committed_partitions
=
consumer
.
committed
(
partitions
)
tdLog
.
info
(
"committed_partitions: %s"
%
(
committed_partitions
))
for
i
in
range
(
len
(
committed_partitions
)):
tdLog
.
info
(
"committed_partitions[%s].offset: %s"
%
(
i
,
committed_partitions
[
i
].
offset
))
assert
(
len
(
position_partitions
)
==
len
(
committed_partitions
))
for
i
in
range
(
len
(
position_partitions
)):
assert
(
position_partitions
[
i
].
offset
==
committed_partitions
[
i
].
offset
)
# seek to the beginning of the topic
except
Exception
as
ex
:
raise
Exception
(
"Failed to test seek and committed position with autocommit with error: {}"
.
format
(
str
(
ex
)))
finally
:
consumer
.
unsubscribe
()
consumer
.
close
()
def
test_commit_by_offset
(
self
):
pass
def
run
(
self
):
self
.
test_seek_and_committed_position_with_autocommit
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录