diff --git a/tests/system-test/0-others/user_control.py b/tests/system-test/0-others/user_control.py index 87f689b3a364349fad98b5ae76d96821b2a6610a..609863e1138ca0c2fc12e5b06dc7c2b7f4523818 100644 --- a/tests/system-test/0-others/user_control.py +++ b/tests/system-test/0-others/user_control.py @@ -205,53 +205,37 @@ class TDTestCase: def __grant_user_privileges(self, privilege, dbname=None, user_name="root"): return f"GRANT {privilege} ON {self.__priv_level(dbname)} TO {user_name} " - def grant_check_read(self, user="root", passwd="taosdata"): - # sourcery skip: class-extract-method + def grant_check(self, user="root", passwd="taosdata", priv=PRIVILEGES_ALL): with taos_connect(user=user, passwd=passwd) as user: user.query("use db") user.query("show tables") - user.query("select * from ct1") - user.error("insert into t1 (ts) values (now())") - - def grant_check_write(self, user="root", passwd="taosdata"): - with taos_connect(user=user, passwd=passwd) as user: - user.query("use db") - user.query("show tables") - user.error("select * from ct1") - user.query("insert into t1 (ts) values (now())") - - def grant_check_all(self, user="root", passwd="taosdata"): - with taos_connect(user=user, passwd=passwd) as user: - user.query("use db") - user.query("show tables") - user.query("select * from ct1") - user.query("insert into t1 (ts) values (now())") - - def grant_check_none(self, user="root", passwd="taosdata"): - with taos_connect(user=user, passwd=passwd) as user: - user.query("use db") - user.query("show tables") - user.error("select * from ct1") - user.error("insert into t1 (ts) values (now())") + if priv in [PRIVILEGES_ALL, PRIVILEGES_READ]: + user.query("select * from ct1") + else: + user.error("select * from ct1") + if priv in [PRIVILEGES_ALL, PRIVILEGES_WRITE]: + user.query("insert into t1 (ts) values (now())") + else: + user.error("insert into t1 (ts) values (now())") def test_grant_current(self): tdLog.printNoPrefix("==========step 1.0: if do not grant, can not read/write") - self.grant_check_none(user=self.__user_list[0], passwd=self.__passwd_list[0]) + self.grant_check(user=self.__user_list[0], passwd=self.__passwd_list[0], priv=None) tdLog.printNoPrefix("==========step 1.1: grant read, can read, can not write") - sql = self.__grant_user_privileges(privilege=self.__privilege[1], user_name=self.__user_list[0]) + sql = self.__grant_user_privileges(privilege=PRIVILEGES_READ, user_name=self.__user_list[0]) tdLog.info(sql) tdSql.query(sql) - self.grant_check_read(user=self.__user_list[0], passwd=self.__passwd_list[0]) + self.grant_check(user=self.__user_list[0], passwd=self.__passwd_list[0], priv=PRIVILEGES_READ) tdLog.printNoPrefix("==========step 1.2: grant write, can write, can not read") - sql = self.__grant_user_privileges(privilege=self.__privilege[2], user_name=self.__user_list[1]) + sql = self.__grant_user_privileges(privilege=PRIVILEGES_WRITE, user_name=self.__user_list[1]) tdLog.info(sql) tdSql.query(sql) - self.grant_check_write(user=self.__user_list[1], passwd=self.__passwd_list[1]) + self.grant_check(user=self.__user_list[1], passwd=self.__passwd_list[1], priv=PRIVILEGES_WRITE) tdLog.printNoPrefix("==========step 1.3: grant all, can write and read") - sql = self.__grant_user_privileges(privilege=self.__privilege[0], user_name=self.__user_list[2]) + sql = self.__grant_user_privileges(privilege=PRIVILEGES_ALL, user_name=self.__user_list[2]) tdLog.info(sql) tdSql.query(sql) self.grant_check_all(user=self.__user_list[2], passwd=self.__passwd_list[2])