Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3a7c19a4
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
3a7c19a4
编写于
5月 19, 2022
作者:
C
cpwu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix case
上级
90269d17
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
86 addition
and
3 deletion
+86
-3
tests/system-test/0-others/user_control.py
tests/system-test/0-others/user_control.py
+86
-3
未找到文件。
tests/system-test/0-others/user_control.py
浏览文件 @
3a7c19a4
...
...
@@ -189,10 +189,94 @@ class TDTestCase:
for
sql
in
sqls
:
tdSql
.
error
(
sql
)
def
grant_user_privileges
(
self
,
privilege
,
dbname
=
None
,
user_name
=
"root"
):
def
__grant_user_privileges
(
self
,
privilege
,
dbname
=
None
,
user_name
=
"root"
):
return
f
"GRANT
{
privilege
}
ON
{
self
.
__priv_level
(
dbname
)
}
TO
{
user_name
}
"
def
grant_check_read
(
self
,
user
=
"root"
,
passwd
=
"taosdata"
):
with
taos_connect
(
user
=
user
,
passwd
=
passwd
)
as
user
:
user
.
query
(
"use db"
)
user
.
query
(
"show tables"
)
user
.
query
(
"select * from ct1"
)
user
.
error
(
"insert into t1 (ts) values (now())"
)
def
grant_check_write
(
self
,
user
=
"root"
,
passwd
=
"taosdata"
):
with
taos_connect
(
user
=
user
,
passwd
=
passwd
)
as
user
:
user
.
query
(
"use db"
)
user
.
query
(
"show tables"
)
user
.
error
(
"select * from ct1"
)
user
.
query
(
"insert into t1 (ts) values (now())"
)
def
grant_check_all
(
self
,
user
=
"root"
,
passwd
=
"taosdata"
):
with
taos_connect
(
user
=
user
,
passwd
=
passwd
)
as
user
:
user
.
query
(
"use db"
)
user
.
query
(
"show tables"
)
user
.
query
(
"select * from ct1"
)
user
.
query
(
"insert into t1 (ts) values (now())"
)
def
grant_check_none
(
self
,
user
=
"root"
,
passwd
=
"taosdata"
):
with
taos_connect
(
user
=
user
,
passwd
=
passwd
)
as
user
:
user
.
query
(
"use db"
)
user
.
query
(
"show tables"
)
user
.
error
(
"select * from ct1"
)
user
.
error
(
"insert into t1 (ts) values (now())"
)
def
grant_current
(
self
):
tdLog
.
printNoPrefix
(
"==========step 1.0: if do not grant, can not read/write"
)
self
.
grant_check_none
(
user
=
self
.
__user_list
[
0
],
passwd
=
self
.
__passwd_list
[
0
])
tdLog
.
printNoPrefix
(
"==========step 1.1: grant read, can read, can not write"
)
self
.
__grant_user_privileges
(
privilege
=
self
.
__privilege
[
1
],
user_name
=
self
.
__user_list
[
0
])
self
.
grant_check_read
(
user
=
self
.
__user_list
[
0
],
passwd
=
self
.
__passwd_list
[
0
])
tdLog
.
printNoPrefix
(
"==========step 1.2: grant write, can write, can not read"
)
self
.
__grant_user_privileges
(
privilege
=
self
.
__privilege
[
2
],
user_name
=
self
.
__user_list
[
1
])
self
.
grant_check_write
(
user
=
self
.
__user_list
[
1
],
passwd
=
self
.
__passwd_list
[
1
])
tdLog
.
printNoPrefix
(
"==========step 1.3: grant all, can write and read"
)
self
.
__grant_user_privileges
(
privilege
=
self
.
__privilege
[
0
],
user_name
=
self
.
__user_list
[
2
])
self
.
grant_check_all
(
user
=
self
.
__user_list
[
2
],
passwd
=
self
.
__passwd_list
[
2
])
tdLog
.
printNoPrefix
(
"==========step 1.4: change grant read to write, can write , can not read"
)
self
.
__grant_user_privileges
(
privilege
=
self
.
__privilege
[
2
],
user_name
=
self
.
__user_list
[
0
])
self
.
grant_check_write
(
user
=
self
.
__user_list
[
0
],
passwd
=
self
.
__passwd_list
[
0
])
tdLog
.
printNoPrefix
(
"==========step 1.5: change grant write to read, can not write , can read"
)
self
.
__grant_user_privileges
(
privilege
=
self
.
__privilege
[
1
],
user_name
=
self
.
__user_list
[
0
])
self
.
grant_check_read
(
user
=
self
.
__user_list
[
0
],
passwd
=
self
.
__passwd_list
[
0
])
tdLog
.
printNoPrefix
(
"==========step 1.6: change grant read to all, can write , can read"
)
self
.
__grant_user_privileges
(
privilege
=
self
.
__privilege
[
0
],
user_name
=
self
.
__user_list
[
0
])
self
.
grant_check_all
(
user
=
self
.
__user_list
[
0
],
passwd
=
self
.
__passwd_list
[
0
])
tdLog
.
printNoPrefix
(
"==========step 1.7: change grant all to write, can write , can not read"
)
self
.
__grant_user_privileges
(
privilege
=
self
.
__privilege
[
2
],
user_name
=
self
.
__user_list
[
0
])
self
.
grant_check_write
(
user
=
self
.
__user_list
[
0
],
passwd
=
self
.
__passwd_list
[
0
])
tdLog
.
printNoPrefix
(
"==========step 1.8: change grant write to all, can write , can read"
)
self
.
__grant_user_privileges
(
privilege
=
self
.
__privilege
[
0
],
user_name
=
self
.
__user_list
[
0
])
self
.
grant_check_all
(
user
=
self
.
__user_list
[
0
],
passwd
=
self
.
__passwd_list
[
0
])
tdLog
.
printNoPrefix
(
"==========step 1.9: change grant all to read, can not write , can read"
)
self
.
__grant_user_privileges
(
privilege
=
self
.
__privilege
[
1
],
user_name
=
self
.
__user_list
[
0
])
self
.
grant_check_read
(
user
=
self
.
__user_list
[
0
],
passwd
=
self
.
__passwd_list
[
0
])
def
__grant_err
(
self
):
return
[
self
.
__grant_user_privileges
(
privilege
=
self
.
__privilege
[
0
],
user_name
=
""
)
,
self
.
__grant_user_privileges
(
privilege
=
self
.
__privilege
[
0
],
user_name
=
"*"
)
,
self
.
__grant_user_privileges
(
privilege
=
self
.
__privilege
[
1
],
dbname
=
"not_exist_db"
,
user_name
=
self
.
__user_list
[
0
]),
self
.
__grant_user_privileges
(
privilege
=
"any_priv"
,
user_name
=
self
.
__user_list
[
0
]),
self
.
__grant_user_privileges
(
privilege
=
""
,
dbname
=
"db"
,
user_name
=
self
.
__user_list
[
0
])
,
self
.
__grant_user_privileges
(
privilege
=
" "
.
join
(
self
.
__privilege
),
user_name
=
self
.
__user_list
[
0
])
,
f
"GRANT
{
self
.
__privilege
[
0
]
}
ON * TO
{
self
.
__user_list
[
0
]
}
"
,
f
"GRANT
{
self
.
__privilege
[
0
]
}
ON db.t1 TO
{
self
.
__user_list
[
0
]
}
"
,
]
def
test_grant_err
(
self
):
for
sql
in
self
.
__grant_err
():
tdSql
.
error
(
sql
)
def
test_user_create
(
self
):
self
.
create_user_current
()
self
.
create_user_err
()
...
...
@@ -218,7 +302,6 @@ class TDTestCase:
else
:
tdLog
.
info
(
"connect successfully, user and pass matched!"
)
def
login_err
(
self
,
user
,
passwd
):
login_except
,
_
=
self
.
user_login
(
user
,
passwd
)
if
login_except
:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录