提交 011169b1 编写于 作者: C cpwu

fix case

上级 8ad2efee
from tabnanny import check
import taos
import sys
import time
import inspect
import traceback
from dataclasses import dataclass
from util.log import *
from util.sql import *
......@@ -12,6 +14,10 @@ PRIVILEGES_ALL = "ALL"
PRIVILEGES_READ = "READ"
PRIVILEGES_WRITE = "WRITE"
WEIGHT_ALL = 5
WEIGHT_READ = 2
WEIGHT_WRITE = 3
PRIMARY_COL = "ts"
INT_COL = "c1"
......@@ -94,6 +100,7 @@ class TDconnect:
self.cursor.close()
self._conn.close()
def taos_connect(
host = "127.0.0.1",
port = 6030,
......@@ -111,6 +118,15 @@ def taos_connect(
config=config
)
@dataclass
class User:
name : str = None
passwd : str = None
db_set : set = set()
priv : str = None
priv_weight : int = 0
class TDTestCase:
def init(self, conn, logSql):
......@@ -121,6 +137,21 @@ class TDTestCase:
def __user_list(self):
return [f"user_test{i}" for i in range(self.users_count) ]
def __users(self):
self.users = []
self.root_user = User()
self.root_user.name = "root"
self.root_user.passwd = "passwd"
self.root_user.db_set = set("*")
self.root_user.priv = PRIVILEGES_ALL
self.root_user.priv_weight = WEIGHT_ALL
for i in range(self.users_count):
user = User()
user.name = f"user_test{i}"
user.passwd = f"taosdata{i}"
self.users.append(user)
return self.users
@property
def __passwd_list(self):
return [f"taosdata{i}" for i in range(self.users_count) ]
......@@ -205,76 +236,166 @@ class TDTestCase:
def __grant_user_privileges(self, privilege, dbname=None, user_name="root"):
return f"GRANT {privilege} ON {self.__priv_level(dbname)} TO {user_name} "
def grant_check(self, user="root", passwd="taosdata", priv=PRIVILEGES_ALL):
with taos_connect(user=user, passwd=passwd) as user:
user.query("use db")
user.query("show tables")
if priv in [PRIVILEGES_ALL, PRIVILEGES_READ]:
user.query("select * from ct1")
else:
user.error("select * from ct1")
if priv in [PRIVILEGES_ALL, PRIVILEGES_WRITE]:
user.query("insert into t1 (ts) values (now())")
else:
user.error("insert into t1 (ts) values (now())")
def test_grant_current(self):
tdLog.printNoPrefix("==========step 1.0: if do not grant, can not read/write")
self.grant_check(user=self.__user_list[0], passwd=self.__passwd_list[0], priv=None)
tdLog.printNoPrefix("==========step 1.1: grant read, can read, can not write")
sql = self.__grant_user_privileges(privilege=PRIVILEGES_READ, user_name=self.__user_list[0])
def __revoke_user_privileges(self, privilege, dbname=None, user_name="root"):
return f"REVOKE {privilege} ON {self.__priv_level(dbname)} FROM {user_name} "
def __user_check(self, user:User=None, check_priv=PRIVILEGES_ALL):
if user is None:
user = self.root_user
with taos_connect(user=user.name, passwd=user.passwd) as use:
use.query("use db")
use.query("show tables")
if check_priv == PRIVILEGES_ALL:
use.query("select * from ct1")
use.query("insert into t1 (ts) values (now())")
elif check_priv == PRIVILEGES_READ:
use.query("select * from ct1")
use.error("insert into t1 (ts) values (now())")
elif check_priv == PRIVILEGES_WRITE:
use.error("select * from ct1")
use.query("insert into t1 (ts) values (now())")
elif check_priv is None:
use.error("select * from ct1")
use.error("insert into t1 (ts) values (now())")
def __change_user_priv(self, user: User, pre_priv, invoke=False):
if user.priv == pre_priv and invoke :
return
if pre_priv.upper() == PRIVILEGES_ALL:
pre_weight = -5 if invoke else 5
elif pre_priv.upper() == PRIVILEGES_READ:
pre_weight = -2 if invoke else 2
elif pre_priv.upper() == PRIVILEGES_WRITE:
pre_weight = -3 if invoke else 3
else:
return
pre_weight += user.priv_weight
if pre_weight >= 5:
user.priv = PRIVILEGES_ALL
user.priv_weight = 5
elif pre_weight == 3:
user.priv = PRIVILEGES_WRITE
user.priv_weight = pre_weight
elif pre_weight == 2:
user.priv_weight = pre_weight
user.priv = PRIVILEGES_READ
elif pre_weight in [1, -1]:
return
elif pre_weight <= 0:
user.priv_weight = 0
user.priv = ""
return user
def grant_user(self, user: User = None, priv=PRIVILEGES_ALL, dbname=None):
if not user:
user = self.root_user
sql = self.__grant_user_privileges(privilege=priv, dbname=dbname, user_name=user.name)
tdLog.info(sql)
if user not in self.users or user.name != "root" or priv not in (PRIVILEGES_ALL, PRIVILEGES_READ, PRIVILEGES_WRITE):
tdSql.error(sql)
tdSql.query(sql)
self.grant_check(user=self.__user_list[0], passwd=self.__passwd_list[0], priv=PRIVILEGES_READ)
self.__change_user_priv(user=user, pre_priv=priv)
user.db_set.add(dbname)
time.sleep(2)
tdLog.printNoPrefix("==========step 1.2: grant write, can write, can not read")
sql = self.__grant_user_privileges(privilege=PRIVILEGES_WRITE, user_name=self.__user_list[1])
def revoke_user(self, user: User = None, priv=PRIVILEGES_ALL, dbname=None):
sql = self.__revoke_user_privileges(privilege=priv, dbname=dbname, user_name=user.name)
tdLog.info(sql)
if not user or priv not in():
tdSql.error(sql)
tdSql.query(sql)
self.grant_check(user=self.__user_list[1], passwd=self.__passwd_list[1], priv=PRIVILEGES_WRITE)
if user.name == "root":
return
self.__change_user_priv(user=user, pre_priv=priv, invoke=True)
user.db_set.remove(dbname)
time.sleep(2)
def test_priv_change_current(self):
tdLog.printNoPrefix("==========step 1.0: if do not grant, can not read/write")
self.__user_check()
self.__user_check(user=self.users[0], check_priv=None)
tdLog.printNoPrefix("==========step 1.1: grant read, can read, can not write")
self.grant_user(user=self.users[0], priv=PRIVILEGES_READ)
self.__user_check(user=self.users[0], check_priv=PRIVILEGES_READ)
tdLog.printNoPrefix("==========step 1.2: grant write, can write")
self.grant_user(user=self.users[1], priv=PRIVILEGES_WRITE)
self.__user_check(user=self.users[1], check_priv=PRIVILEGES_WRITE)
tdLog.printNoPrefix("==========step 1.3: grant all, can write and read")
sql = self.__grant_user_privileges(privilege=PRIVILEGES_ALL, user_name=self.__user_list[2])
tdLog.info(sql)
tdSql.query(sql)
self.grant_check(user=self.__user_list[2], passwd=self.__passwd_list[2], priv=PRIVILEGES_ALL)
self.grant_user(user=self.users[2])
self.__user_check(user=self.users[2], check_priv=PRIVILEGES_ALL)
tdLog.printNoPrefix("==========step 1.4: change grant read to write, can write , can not read")
sql = self.__grant_user_privileges(privilege=PRIVILEGES_WRITE, user_name=self.__user_list[0])
tdLog.info(sql)
tdSql.query(sql)
self.grant_check(user=self.__user_list[0], passwd=self.__passwd_list[0], priv=PRIVILEGES_WRITE)
tdLog.printNoPrefix("==========step 1.4: grant read to write = all ")
self.grant_user(user=self.users[0], priv=PRIVILEGES_WRITE)
self.__user_check(user=self.users[0], check_priv=PRIVILEGES_ALL)
tdLog.printNoPrefix("==========step 1.5: change grant write to read, can not write , can read")
sql = self.__grant_user_privileges(privilege=PRIVILEGES_READ, user_name=self.__user_list[0])
tdLog.info(sql)
tdSql.query(sql)
self.grant_check(user=self.__user_list[0], passwd=self.__passwd_list[0], priv=PRIVILEGES_READ)
tdLog.printNoPrefix("==========step 1.4: revoke write from all = read ")
self.revoke_user(user=self.users[0], priv=PRIVILEGES_WRITE)
self.__user_check(user=self.users[0], check_priv=PRIVILEGES_READ)
tdLog.printNoPrefix("==========step 1.6: change grant read to all, can write , can read")
sql = self.__grant_user_privileges(privilege=PRIVILEGES_ALL, user_name=self.__user_list[0])
tdLog.info(sql)
tdSql.query(sql)
self.grant_check(user=self.__user_list[0], passwd=self.__passwd_list[0], priv=PRIVILEGES_ALL)
tdLog.printNoPrefix("==========step 1.5: grant write to read = all")
self.grant_user(user=self.users[1], priv=PRIVILEGES_READ)
self.__user_check(user=self.users[1], check_priv=PRIVILEGES_ALL)
tdLog.printNoPrefix("==========step 1.7: change grant all to write, can write , can not read")
sql = self.__grant_user_privileges(privilege=PRIVILEGES_WRITE, user_name=self.__user_list[0])
tdLog.info(sql)
tdSql.query(sql)
self.grant_check(user=self.__user_list[0], passwd=self.__passwd_list[0], priv=PRIVILEGES_WRITE)
tdLog.printNoPrefix("==========step 1.4: revoke read from all = write ")
self.revoke_user(user=self.users[1], priv=PRIVILEGES_READ)
self.__user_check(user=self.users[1], check_priv=PRIVILEGES_WRITE)
tdLog.printNoPrefix("==========step 1.8: change grant write to all, can write , can read")
sql = self.__grant_user_privileges(privilege=PRIVILEGES_ALL, user_name=self.__user_list[0])
tdLog.info(sql)
tdSql.query(sql)
self.grant_check(user=self.__user_list[0], passwd=self.__passwd_list[0], priv=PRIVILEGES_ALL)
tdLog.printNoPrefix("==========step 1.5: grant read to all = all")
self.grant_user(user=self.users[0], priv=PRIVILEGES_ALL)
self.__user_check(user=self.users[0], check_priv=PRIVILEGES_ALL)
tdLog.printNoPrefix("==========step 1.9: change grant all to read, can not write , can read")
sql = self.__grant_user_privileges(privilege=PRIVILEGES_READ, user_name=self.__user_list[0])
tdLog.info(sql)
tdSql.query(sql)
self.grant_check(user=self.__user_list[0], passwd=self.__passwd_list[0], priv=PRIVILEGES_READ)
tdLog.printNoPrefix("==========step 1.5: grant write to all = all")
self.grant_user(user=self.users[1], priv=PRIVILEGES_ALL)
self.__user_check(user=self.users[1], check_priv=PRIVILEGES_ALL)
tdLog.printNoPrefix("==========step 1.5: grant all to read = all")
self.grant_user(user=self.users[0], priv=PRIVILEGES_READ)
self.__user_check(user=self.users[0], check_priv=PRIVILEGES_ALL)
tdLog.printNoPrefix("==========step 1.5: grant all to write = all")
self.grant_user(user=self.users[1], priv=PRIVILEGES_WRITE)
self.__user_check(user=self.users[1], check_priv=PRIVILEGES_ALL)
### init user
self.revoke_user(user=self.users[0], priv=PRIVILEGES_WRITE)
self.revoke_user(user=self.users[1], priv=PRIVILEGES_READ)
tdLog.printNoPrefix("==========step 1.5: revoke read from write = no change")
self.revoke_user(user=self.users[1], priv=PRIVILEGES_READ)
self.__user_check(user=self.users[1], check_priv=PRIVILEGES_WRITE)
tdLog.printNoPrefix("==========step 1.5: revoke write from read = no change")
self.revoke_user(user=self.users[0], priv=PRIVILEGES_WRITE)
self.__user_check(user=self.users[0], check_priv=PRIVILEGES_READ)
tdLog.printNoPrefix("==========step 1.5: revoke read from read = nothing")
self.revoke_user(user=self.users[0], priv=PRIVILEGES_READ)
self.__user_check(user=self.users[0], check_priv=None)
tdLog.printNoPrefix("==========step 1.5: revoke write from write = nothing")
self.revoke_user(user=self.users[1], priv=PRIVILEGES_WRITE)
self.__user_check(user=self.users[1], check_priv=None)
### init user
self.grant_user(user=self.users[0], priv=PRIVILEGES_READ)
self.revoke_user(user=self.users[1], priv=PRIVILEGES_WRITE)
tdLog.printNoPrefix("==========step 1.5: revoke all from write = nothing")
self.revoke_user(user=self.users[1], priv=PRIVILEGES_ALL)
self.__user_check(user=self.users[1], check_priv=None)
tdLog.printNoPrefix("==========step 1.5: revoke all from read = nothing")
self.revoke_user(user=self.users[0], priv=PRIVILEGES_ALL)
self.__user_check(user=self.users[0], check_priv=None)
tdLog.printNoPrefix("==========step 1.5: revoke all from all = nothing")
self.revoke_user(user=self.users[2], priv=PRIVILEGES_ALL)
self.__user_check(user=self.users[2], check_priv=None)
def __grant_err(self):
return [
......@@ -288,13 +409,30 @@ class TDTestCase:
f"GRANT {self.__privilege[0]} ON db.t1 TO {self.__user_list[0]}" ,
]
def __revoke_err(self):
return [
self.__revoke_user_privileges(privilege=self.__privilege[0], user_name="") ,
self.__revoke_user_privileges(privilege=self.__privilege[0], user_name="*") ,
self.__revoke_user_privileges(privilege=self.__privilege[1], dbname="not_exist_db", user_name=self.__user_list[0]),
self.__revoke_user_privileges(privilege="any_priv", user_name=self.__user_list[0]),
self.__revoke_user_privileges(privilege="", dbname="db", user_name=self.__user_list[0]) ,
self.__revoke_user_privileges(privilege=" ".join(self.__privilege), user_name=self.__user_list[0]) ,
f"REVOKE {self.__privilege[0]} ON * FROM {self.__user_list[0]}" ,
f"REVOKE {self.__privilege[0]} ON db.t1 FROM {self.__user_list[0]}" ,
]
def test_grant_err(self):
for sql in self.__grant_err():
tdSql.error(sql)
def test_grant(self):
def test_revoke_err(self):
for sql in self.__revoke_err():
tdSql.error(sql)
def test_change_priv(self):
self.test_grant_err()
self.test_grant_current()
self.test_revoke_err()
self.test_priv_change_current()
def test_user_create(self):
self.create_user_current()
......@@ -482,7 +620,7 @@ class TDTestCase:
self.login_err(self.__user_list[0], f"new{self.__passwd_list[0]}")
# 用户权限设置
self.test_grant()
self.test_change_priv()
# 修改密码
tdLog.printNoPrefix("==========step3: alter user pass test")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册