diff --git a/tests/python_client/base/utility_wrapper.py b/tests/python_client/base/utility_wrapper.py index b600fa24d07d619d4afdb4a9b22ac843a9db8527..be431d124efb39c4ce4492f7d2a666d592bb67d5 100644 --- a/tests/python_client/base/utility_wrapper.py +++ b/tests/python_client/base/utility_wrapper.py @@ -247,6 +247,12 @@ class ApiUtilityWrapper: check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ).run() return res, check_result + def update_password(self, user, old_password, new_password, check_task=None, check_items=None): + func_name = sys._getframe().f_code.co_name + res, is_succ = api_request([self.ut.update_password, user, old_password, new_password]) + check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ).run() + return res, check_result + def delete_user(self, user, using="default", check_task=None, check_items=None): func_name = sys._getframe().f_code.co_name res, is_succ = api_request([self.ut.delete_user, user, using]) @@ -265,7 +271,6 @@ class ApiUtilityWrapper: def create_role(self, using="default", check_task=None, check_items=None, **kwargs): func_name = sys._getframe().f_code.co_name res, is_succ = api_request([self.role.create], **kwargs) - res = res if is_succ else None check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ, **kwargs).run() return res, check_result diff --git a/tests/python_client/check/func_check.py b/tests/python_client/check/func_check.py index eb704493d260d3ad7437bd0f1ef8ede49c5cca89..b9f05930f5bf4d05c88681ede2ae420c258a73c2 100644 --- a/tests/python_client/check/func_check.py +++ b/tests/python_client/check/func_check.py @@ -80,6 +80,10 @@ class ResponseChecker: # Collection interface response check result = self.check_role_property(self.response, self.func_name, self.check_items) + elif self.check_task == CheckTasks.check_permission_deny: + # Collection interface response check + result = self.check_permission_deny(self.response, self.succ) + # Add check_items here if something new need verify return result @@ -390,3 +394,13 @@ class ResponseChecker: if check_items.get("name", None): assert role.name == check_items["name"] return True + + @staticmethod + def check_permission_deny(res, actual=True): + assert actual is False + if isinstance(res, Error): + assert "permission deny" in res.message + else: + log.error("[CheckFunc] Response of API is not an error: %s" % str(res)) + assert False + return True diff --git a/tests/python_client/common/common_type.py b/tests/python_client/common/common_type.py index 22f17e0b19cad57134219bd2e224bf240a749256..73e27a373221bfd5aed5b88d86f2b59731af9fa5 100644 --- a/tests/python_client/common/common_type.py +++ b/tests/python_client/common/common_type.py @@ -193,6 +193,7 @@ class CheckTasks: check_delete_compact = "check_delete_compact" check_merge_compact = "check_merge_compact" check_role_property = "check_role_property" + check_permission_deny = "check_permission_deny" class BulkLoadStates: diff --git a/tests/python_client/testcases/test_utility.py b/tests/python_client/testcases/test_utility.py index 860cbed508af9caa1c75f8de37e3a8e1e5021526..c3476397cf5d59a854165aadea9a6c48aa93665d 100644 --- a/tests/python_client/testcases/test_utility.py +++ b/tests/python_client/testcases/test_utility.py @@ -12,6 +12,7 @@ from common import common_type as ct from common.common_type import CaseLabel, CheckTasks from common.milvus_sys import MilvusSys from pymilvus.grpc_gen.common_pb2 import SegmentState +import random prefix = "utility" default_schema = cf.gen_default_collection_schema() @@ -1779,7 +1780,7 @@ class TestUtilityUserPassword(TestcaseBase): """ Test case of user interface """ @pytest.mark.tags(ct.CaseLabel.L3) - def test_create_user_with_user_password(self,host, port): + def test_create_user_with_user_password(self, host, port): """ target: test the user creation with user and password method: create user with the default user and password parameter @@ -1821,21 +1822,21 @@ class TestUtilityUserPassword(TestcaseBase): method: get a list of users expected: list all users """ - #1. default user login + # 1. default user login self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password, check_task=ct.CheckTasks.ccr) - #2. create 2 users + # 2. create 2 users self.utility_wrap.create_user(user="user1", password="abc123") self.utility_wrap.create_user(user="user2", password="abc123") - #3. list all users + # 3. list all users res = self.utility_wrap.list_usernames()[0] assert "user1" and "user2" in res @pytest.mark.tags(ct.CaseLabel.L3) @pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING]) - def test_delete_user_with_username(self,host, port,connect_name): + def test_delete_user_with_username(self, host, port, connect_name): """ target: test deleting user with username method: delete user with username and connect with the wrong user then list collections @@ -2001,6 +2002,7 @@ class TestUtilityInvalidUserPassword(TestcaseBase): check_items={ct.err_code: 31}) +class TestUtilityRBAC(TestcaseBase): @pytest.mark.tags(CaseLabel.L3) def test_clear_roles(self, host, port): """ @@ -2015,6 +2017,12 @@ class TestUtilityInvalidUserPassword(TestcaseBase): user = cf.gen_unique_str(prefix) password = cf.gen_unique_str(prefix) r_name = cf.gen_unique_str(prefix) + + usernames, _ = self.utility_wrap.list_usernames() + for username in usernames: + if username != "root": + self.utility_wrap.delete_user(username) + u, _ = self.utility_wrap.create_user(user=user, password=password) self.utility_wrap.init_role(r_name) @@ -2205,10 +2213,9 @@ class TestUtilityInvalidUserPassword(TestcaseBase): password=password, check_task=ct.CheckTasks.ccr) collection_w = self.init_collection_wrap(name=c_name) data = cf.gen_default_list_data(ct.default_nb) - error = {ct.err_code: 1} - collection_w.insert(data=data, check_task=CheckTasks.err_res, check_items=error) + collection_w.insert(data=data, check_task=CheckTasks.check_permission_deny) collection_w2 = self.init_collection_wrap(name=c_name_2) - collection_w2.insert(data=data, check_task=CheckTasks.err_res, check_items=error) + collection_w2.insert(data=data, check_task=CheckTasks.check_permission_deny) # grant user collection insert privilege self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) @@ -2225,17 +2232,48 @@ class TestUtilityInvalidUserPassword(TestcaseBase): collection_w.insert(data=data) # verify grant scope - error = {ct.err_code: 1} index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}} collection_w.create_index(ct.default_float_vec_field_name, index_params, - check_task=CheckTasks.err_res, check_items=error) + check_task=CheckTasks.check_permission_deny) collection_w2 = self.init_collection_wrap(name=c_name_2) - collection_w2.insert(data=data, check_task=CheckTasks.err_res, check_items=error) + collection_w2.insert(data=data, check_task=CheckTasks.check_permission_deny) + + @pytest.mark.tags(CaseLabel.L3) + def test_revoke_public_role_privilege(self, host, port): + """ + target: revoke public role privilege + method: revoke public role privilege + expected: success to revoke + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + self.init_collection_wrap(name=c_name) + u, _ = self.utility_wrap.create_user(user=user, password=password) + + self.utility_wrap.init_role("public") + self.utility_wrap.role_add_user(user) + self.utility_wrap.role_revoke("Collection", c_name, "Insert") + data = cf.gen_default_list_data(ct.default_nb) + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + collection_w = self.init_collection_wrap(name=c_name) + collection_w.insert(data=data, check_task=CheckTasks.check_permission_deny) + + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role("public") + self.utility_wrap.role_grant("Collection", c_name, "Insert") + @pytest.mark.tags(CaseLabel.L3) - def test_role_revoke_collection_insert(self, host, port): + def test_role_revoke_collection_privilege(self, host, port): """ - target: test revoke role collection insert privilege, + target: test revoke role collection privilege, method: create role and collection, grant role insert privilege, revoke privilege expected: assert revoke privilege success """ @@ -2275,9 +2313,84 @@ class TestUtilityInvalidUserPassword(TestcaseBase): self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) self.connection_wrap.connect(host=host, port=port, user=user, password=password, check_task=ct.CheckTasks.ccr) - error = {ct.err_code: 1} collection_w = self.init_collection_wrap(name=c_name) - collection_w.insert(data=data, check_task=CheckTasks.err_res, check_items=error) + collection_w.insert(data=data, check_task=CheckTasks.check_permission_deny) + + @pytest.mark.tags(CaseLabel.L3) + def test_role_revoke_global_privilege(self, host, port): + """ + target: test revoke role global privilege, + method: create role, grant role global createcollection privilege, revoke privilege + expected: assert revoke privilege success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + c_name_2 = cf.gen_unique_str(prefix) + u, _ = self.utility_wrap.create_user(user=user, password=password) + + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + self.utility_wrap.role_add_user(user) + + # grant user Global CreateCollection privilege + self.utility_wrap.role_grant("Global", "*", "CreateCollection") + + # verify user specific Global CreateCollection privilege + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + collection_w = self.init_collection_wrap(name=c_name) + + # revoke privilege + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role(r_name) + self.utility_wrap.role_revoke("Global", "*", "CreateCollection") + + # verify revoke is success + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + collection_w = self.init_collection_wrap(name=c_name_2, + check_task=CheckTasks.check_permission_deny) + + @pytest.mark.tags(CaseLabel.L3) + def test_role_revoke_user_privilege(self, host, port): + """ + target: test revoke role user privilege, + method: create role, grant role user updateuser privilege, revoke privilege + expected: assert revoke privilege success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + u, _ = self.utility_wrap.create_user(user=user, password=password) + + user_test = cf.gen_unique_str(prefix) + password_test = cf.gen_unique_str(prefix) + self.utility_wrap.create_user(user=user_test, password=password_test) + + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + self.utility_wrap.role_add_user(user) + + # grant user User UpdateUser privilege + self.utility_wrap.role_grant("User", "*", "UpdateUser") + self.utility_wrap.role_revoke("User", "*", "UpdateUser") + + # verify revoke is success + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.reset_password(user=user_test, old_password=password_test, new_password=password, + check_task=CheckTasks.check_permission_deny) @pytest.mark.tags(CaseLabel.L3) def test_role_list_grants(self, host, port): @@ -2309,3 +2422,1218 @@ class TestUtilityInvalidUserPassword(TestcaseBase): # list grants g_list, _ = self.utility_wrap.role_list_grants() assert len(g_list.groups) == len(grant_list) + + @pytest.mark.tags(CaseLabel.L3) + def test_drop_role_which_bind_user(self, host, port): + """ + target: drop role which bind user + method: create a role, bind user to the role, drop the role + expected: drop success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + u, _ = self.utility_wrap.create_user(user=user, password=password) + + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + self.utility_wrap.role_add_user(user) + + self.utility_wrap.role_drop() + assert not self.utility_wrap.role_is_exist()[0] + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize("name", ["admin", "public"]) + def test_add_user_to_default_role(self, name, host, port): + """ + target: add user to admin role or public role + method: create a user,add user to admin role or public role + expected: add success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + u, _ = self.utility_wrap.create_user(user=user, password=password) + + self.utility_wrap.init_role(name) + self.utility_wrap.role_add_user(user) + users, _ = self.utility_wrap.role_get_users() + user_info, _ = self.utility_wrap.list_user(user, True) + user_item = user_info.groups[0] + assert name in user_item.roles + assert user in users + + @pytest.mark.tags(CaseLabel.L3) + def test_add_root_to_new_role(self, host, port): + """ + target: add root to new role + method: add root to new role + expected: add success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + self.utility_wrap.role_add_user("root") + users, _ = self.utility_wrap.role_get_users() + user_info, _ = self.utility_wrap.list_user("root", True) + user_item = user_info.groups[0] + assert r_name in user_item.roles + assert "root" in users + self.utility_wrap.role_drop() + + @pytest.mark.tags(CaseLabel.L3) + def test_list_collection_grands_by_role_and_object(self, host, port): + """ + target: list grants by role and object + method: create a new role,grant role collection privilege,list grants by role and object + expected: list success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + r_name = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + collection_w = self.init_collection_wrap(name=c_name) + + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + self.utility_wrap.role_grant("Collection", c_name, "Search") + self.utility_wrap.role_grant("Collection", c_name, "Insert") + + g_list, _ = self.utility_wrap.role_list_grant("Collection", c_name) + assert len(g_list.groups) == 2 + for g in g_list.groups: + assert g.object == "Collection" + assert g.object_name == c_name + assert g.privilege in ["Search", "Insert"] + self.utility_wrap.role_revoke(g.object, g.object_name, g.privilege) + self.utility_wrap.role_drop() + + @pytest.mark.tags(CaseLabel.L3) + def test_list_global_grands_by_role_and_object(self, host, port): + """ + target: list grants by role and object + method: create a new role,grant role global privilege,list grants by role and object + expected: list success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + r_name = cf.gen_unique_str(prefix) + + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + self.utility_wrap.role_grant("Global", "*", "CreateCollection") + self.utility_wrap.role_grant("Global", "*", "All") + + g_list, _ = self.utility_wrap.role_list_grant("Global", "*") + assert len(g_list.groups) == 2 + for g in g_list.groups: + assert g.object == "Global" + assert g.object_name == "*" + assert g.privilege in ["CreateCollection", "All"] + self.utility_wrap.role_revoke(g.object, g.object_name, g.privilege) + self.utility_wrap.role_drop() + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_admin_role_privilege(self, host, port): + """ + target: verify admin role privilege + method: create a new user, bind to admin role, crud collection + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role("admin") + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + u, _ = self.utility_wrap.create_user(user=user, password=password) + + self.utility_wrap.role_add_user(user) + + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + collection_w = self.init_collection_wrap(name=c_name) + data = cf.gen_default_list_data(ct.default_nb) + collection_w.insert(data=data) + collection_w.load() + assert collection_w.num_entities == ct.default_nb + collection_w.release() + collection_w.drop() + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_grant_collection_load_privilege(self, host, port): + """ + target: verify grant collection load privilege + method: verify grant collection load privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + self.utility_wrap.role_grant("Collection", c_name, "Load") + collection_w = self.init_collection_wrap(name=c_name) + data = cf.gen_default_list_data(ct.default_nb) + mutation_res, _ = collection_w.insert(data=data) + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + + collection_w.load() + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_grant_collection_release_privilege(self, host, port): + """ + target: verify grant collection release privilege + method: verify grant collection release privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + self.utility_wrap.role_grant("Collection", c_name, "Release") + collection_w = self.init_collection_wrap(name=c_name) + data = cf.gen_default_list_data(ct.default_nb) + mutation_res, _ = collection_w.insert(data=data) + collection_w.load() + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + + collection_w.release() + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.xfail(reason="https://github.com/milvus-io/milvus/issues/19012") + def test_verify_grant_collection_compaction_privilege(self, host, port): + """ + target: verify grant collection compaction privilege + method: verify grant collection compaction privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + collection_w = self.init_collection_wrap(name=c_name) + self.utility_wrap.role_grant("Collection", c_name, "Compaction") + + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + collection_w.compact() + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_grant_collection_insert_privilege(self, host, port): + """ + target: verify grant collection insert privilege + method: verify grant collection insert privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + collection_w = self.init_collection_wrap(name=c_name) + self.utility_wrap.role_grant("Collection", c_name, "Insert") + + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + data = cf.gen_default_list_data(ct.default_nb) + mutation_res, _ = collection_w.insert(data=data) + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_grant_collection_delete_privilege(self, host, port): + """ + target: verify grant collection delete privilege + method: verify grant collection delete privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + collection_w = self.init_collection_wrap(name=c_name) + self.utility_wrap.role_grant("Collection", c_name, "Delete") + data = cf.gen_default_list_data(ct.default_nb) + mutation_res, _ = collection_w.insert(data=data) + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + tmp_expr = f'{ct.default_int64_field_name} in {[0]}' + collection_w.delete(tmp_expr) + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_create_index_privilege(self, host, port): + """ + target: verify grant create index privilege + method: verify grant create index privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + collection_w = self.init_collection_wrap(name=c_name) + self.utility_wrap.role_grant("Collection", c_name, "CreateIndex") + self.utility_wrap.role_grant("Collection", c_name, "Flush") + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + self.index_wrap.init_index(collection_w.collection, ct.default_int64_field_name, + default_index_params) + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_drop_index_privilege(self, host, port): + """ + target: verify grant drop index privilege + method: verify grant drop index privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + collection_w = self.init_collection_wrap(name=c_name) + self.index_wrap.init_index(collection_w.collection, ct.default_int64_field_name, + default_index_params) + self.utility_wrap.role_grant("Collection", c_name, "DropIndex") + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + self.index_wrap.drop() + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_collection_search_privilege(self, host, port): + """ + target: verify grant collection search privilege + method: verify grant collection search privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + collection_w = self.init_collection_wrap(name=c_name) + data = cf.gen_default_list_data(ct.default_nb) + mutation_res, _ = collection_w.insert(data=data) + collection_w.load() + self.utility_wrap.role_grant("Collection", c_name, "Search") + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + vectors = [[random.random() for _ in range(ct.default_dim)] for _ in range(ct.default_nq)] + collection_w.search(vectors[:ct.default_nq], ct.default_float_vec_field_name, + ct.default_search_params, ct.default_limit, + "int64 >= 0", check_task=CheckTasks.check_search_results, + check_items={"nq": ct.default_nq, + "limit": ct.default_limit}) + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_collection_flush_privilege(self, host, port): + """ + target: verify grant collection flush privilege + method: verify grant collection flush privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + collection_w = self.init_collection_wrap(name=c_name) + self.utility_wrap.role_grant("Collection", c_name, "Flush") + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + collection_w.flush() + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_collection_query_privilege(self, host, port): + """ + target: verify grant collection query privilege + method: verify grant collection query privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + collection_w = self.init_collection_wrap(name=c_name) + data = cf.gen_default_list_data(ct.default_nb) + mutation_res, _ = collection_w.insert(data=data) + collection_w.load() + self.utility_wrap.role_grant("Collection", c_name, "Query") + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + default_term_expr = f'{ct.default_int64_field_name} in [0, 1]' + res, _ = collection_w.query(default_term_expr) + assert len(res) == 2 + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_global_all_privilege(self, host, port): + """ + target: verify grant global all privilege + method: verify grant global all privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + self.utility_wrap.role_grant("Global", "*", "All") + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + + collection_w = self.init_collection_wrap(name=c_name) + collection_w.drop() + user_test = cf.gen_unique_str(prefix) + password_test = cf.gen_unique_str(prefix) + self.utility_wrap.create_user(user=user_test, password=password_test) + r_test = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_test) + self.utility_wrap.create_role() + self.utility_wrap.role_add_user(user_test) + self.utility_wrap.role_grant("Collection", c_name, "Insert") + self.utility_wrap.role_revoke("Collection", c_name, "Insert") + self.utility_wrap.role_remove_user(user_test) + + self.utility_wrap.delete_user(user=user_test) + self.utility_wrap.role_drop() + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_global_create_collection_privilege(self, host, port): + """ + target: verify grant global create collection privilege + method: verify grant global create collection privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + self.utility_wrap.role_grant("Global", "*", "CreateCollection") + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + + self.init_collection_wrap(name=c_name) + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_global_drop_collection_privilege(self, host, port): + """ + target: verify grant global drop collection privilege + method: verify grant global drop collection privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + self.utility_wrap.role_grant("Global", "*", "DropCollection") + collection_w = self.init_collection_wrap(name=c_name) + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + collection_w.drop() + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_global_create_ownership_privilege(self, host, port): + """ + target: verify grant global create ownership privilege + method: verify grant global create ownership privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + self.utility_wrap.role_grant("Global", "*", "CreateOwnership") + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + user_test = cf.gen_unique_str(prefix) + password_test = cf.gen_unique_str(prefix) + self.utility_wrap.create_user(user=user_test, password=password_test) + r_test = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_test) + self.utility_wrap.create_role() + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_global_drop_ownership_privilege(self, host, port): + """ + target: verify grant global drop ownership privilege + method: verify grant global drop ownership privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + self.utility_wrap.role_grant("Global", "*", "DropOwnership") + + user_test = cf.gen_unique_str(prefix) + password_test = cf.gen_unique_str(prefix) + self.utility_wrap.create_user(user=user_test, password=password_test) + r_test = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_test) + self.utility_wrap.create_role() + + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + + self.utility_wrap.role_drop() + self.utility_wrap.delete_user(user=user_test) + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_global_select_ownership_privilege(self, host, port): + """ + target: verify grant global select ownership privilege + method: verify grant global select ownership privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + self.utility_wrap.role_grant("Global", "*", "SelectOwnership") + + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + + self.utility_wrap.list_usernames() + self.utility_wrap.role_list_grants() + self.utility_wrap.list_roles(False) + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_global_manage_ownership_privilege(self, host, port): + """ + target: verify grant global manage ownership privilege + method: verify grant global manage ownership privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + c_name = cf.gen_unique_str(prefix) + collection_w = self.init_collection_wrap(name=c_name) + user_test = cf.gen_unique_str(prefix) + password_test = cf.gen_unique_str(prefix) + self.utility_wrap.create_user(user=user_test, password=password_test) + r_test = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_test) + self.utility_wrap.create_role() + + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + self.utility_wrap.role_grant("Global", "*", "ManageOwnership") + + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + + self.utility_wrap.role_add_user(user_test) + self.utility_wrap.role_remove_user(user_test) + self.utility_wrap.role_grant("Collection", c_name, "Search") + self.utility_wrap.role_revoke("Collection", c_name, "Search") + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_user_update_privilege(self, host, port): + """ + target: verify grant user update privilege + method: verify grant user update privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user_test = cf.gen_unique_str(prefix) + password_test = cf.gen_unique_str(prefix) + self.utility_wrap.create_user(user=user_test, password=password_test) + r_test = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_test) + self.utility_wrap.create_role() + + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + self.utility_wrap.role_grant("User", "*", "UpdateUser") + + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.reset_password(user=user_test, old_password=password_test, new_password=password) + self.utility_wrap.update_password(user=user_test, old_password=password, new_password=password_test) + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_select_user_privilege(self, host, port): + """ + target: verify grant select user privilege + method: verify grant select user privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user_test = cf.gen_unique_str(prefix) + password_test = cf.gen_unique_str(prefix) + self.utility_wrap.create_user(user=user_test, password=password_test) + r_test = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_test) + self.utility_wrap.create_role() + + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + self.utility_wrap.role_grant("User", "*", "SelectUser") + + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + + self.utility_wrap.list_user(username=user_test, include_role_info=False) + self.utility_wrap.list_users(include_role_info=False) + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_grant_privilege_with_wildcard_object_name(self, host, port): + """ + target: verify grant privilege with wildcard instead of object name + method: verify grant privilege with wildcard instead of object name + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + c_name_2 = cf.gen_unique_str(prefix) + collection_w = self.init_collection_wrap(name=c_name) + collection_w2 = self.init_collection_wrap(name=c_name_2) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + self.utility_wrap.role_grant("Collection", "*", "Load") + + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + + collection_w.load() + collection_w2.load() + + @pytest.mark.tags(CaseLabel.L3) + def test_verify_grant_privilege_with_wildcard_privilege(self, host, port): + """ + target: verify grant privilege with wildcard instead of privilege + method: verify grant privilege with wildcard instead of privilege + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + collection_w = self.init_collection_wrap(name=c_name) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + u, _ = self.utility_wrap.create_user(user=user, password=password) + self.utility_wrap.role_add_user(user) + self.utility_wrap.role_grant("Collection", "*", "*") + + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + + collection_w.load() + collection_w.release() + collection_w.compact() + data = cf.gen_default_list_data(ct.default_nb) + mutation_res, _ = collection_w.insert(data=data) + tmp_expr = f'{ct.default_int64_field_name} in {[0]}' + collection_w.delete(tmp_expr) + self.index_wrap.init_index(collection_w.collection, ct.default_int64_field_name, + default_index_params) + self.index_wrap.drop(ct.default_int64_field_name) + vectors = [[random.random() for _ in range(ct.default_dim)] for _ in range(ct.default_nq)] + collection_w.load() + collection_w.search(vectors[:ct.default_nq], ct.default_float_vec_field_name, + ct.default_search_params, ct.default_limit, + "int64 >= 0") + collection_w.flush() + default_term_expr = f'{ct.default_int64_field_name} in [0, 1]' + collection_w.query(default_term_expr) + + @pytest.mark.tags(CaseLabel.L3) + def test_new_user_default_owns_public_role_permission(self, host, port): + """ + target: new user owns public role privilege + method: create a role,verify its permission + expected: verify success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user_test = cf.gen_unique_str(prefix) + password_test = cf.gen_unique_str(prefix) + self.utility_wrap.create_user(user=user_test, password=password_test) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + c_name = cf.gen_unique_str(prefix) + c_name_2 = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + u, _ = self.utility_wrap.create_user(user=user, password=password) + collection_w = self.init_collection_wrap(name=c_name) + _, _ = self.index_wrap.init_index(collection_w.collection, default_field_name, default_index_params) + self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) + self.connection_wrap.connect(host=host, port=port, user=user, + password=password, check_task=ct.CheckTasks.ccr) + + # Collection permission deny + collection_w.load(check_task=CheckTasks.check_permission_deny) + collection_w.release(check_task=CheckTasks.check_permission_deny) + collection_w.compact(check_task=CheckTasks.check_permission_deny) + data = cf.gen_default_list_data(ct.default_nb) + mutation_res, _ = collection_w.insert(data=data, check_task=CheckTasks.check_permission_deny) + tmp_expr = f'{ct.default_int64_field_name} in {[0]}' + collection_w.delete(tmp_expr, check_task=CheckTasks.check_permission_deny) + self.index_wrap.drop(ct.default_int64_field_name, check_task=CheckTasks.check_permission_deny) + self.index_wrap.init_index(collection_w.collection, ct.default_int64_field_name, + default_index_params, check_task=CheckTasks.check_permission_deny) + vectors = [[random.random() for _ in range(ct.default_dim)] for _ in range(ct.default_nq)] + collection_w.search(vectors[:ct.default_nq], ct.default_float_vec_field_name, + ct.default_search_params, ct.default_limit, + "int64 >= 0", check_task=CheckTasks.check_permission_deny) + collection_w.flush(check_task=CheckTasks.check_permission_deny) + default_term_expr = f'{ct.default_int64_field_name} in [0, 1]' + collection_w.query(default_term_expr, check_task=CheckTasks.check_permission_deny) + # self.utility_wrap.bulk_load(c_name, check_task=CheckTasks.check_permission_deny) + + # Global permission deny + self.init_collection_wrap(name=c_name_2, check_task=CheckTasks.check_permission_deny) + collection_w.drop(check_task=CheckTasks.check_permission_deny) + self.utility_wrap.create_user(user=c_name, password=password, check_task=CheckTasks.check_permission_deny) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role(check_task=CheckTasks.check_permission_deny) + self.utility_wrap.delete_user(user=user, check_task=CheckTasks.check_permission_deny) + self.utility_wrap.role_drop(check_task=CheckTasks.check_permission_deny) + self.utility_wrap.list_usernames(check_task=CheckTasks.check_permission_deny) + self.utility_wrap.role_list_grants(check_task=CheckTasks.check_permission_deny) + self.utility_wrap.list_roles(False, check_task=CheckTasks.check_permission_deny) + self.utility_wrap.role_add_user(user, check_task=CheckTasks.check_permission_deny) + self.utility_wrap.role_remove_user(user, check_task=CheckTasks.check_permission_deny) + self.utility_wrap.role_grant("Collection", c_name, "Insert", check_task=CheckTasks.check_permission_deny) + self.utility_wrap.role_revoke("Collection", c_name, "Insert", check_task=CheckTasks.check_permission_deny) + + # User permission deny + self.utility_wrap.reset_password(user=user_test, old_password=password_test, new_password=password, + check_task=CheckTasks.check_permission_deny) + self.utility_wrap.update_password(user=user_test, old_password=password, new_password=password_test, + check_task=CheckTasks.check_permission_deny) + self.utility_wrap.list_user(user_test, False, check_task=CheckTasks.check_permission_deny) + + # public role access + collection_w.index() + self.utility_wrap.list_collections() + self.utility_wrap.has_collection(c_name) + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize("name", ["admin", "public"]) + def test_remove_user_from_default_role(self, name, host, port): + """ + target: remove user from admin role or public role + method: create a user,add user to admin role or public role,remove user from role + expected: remove success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + u, _ = self.utility_wrap.create_user(user=user, password=password) + + self.utility_wrap.init_role(name) + self.utility_wrap.role_add_user(user) + users, _ = self.utility_wrap.role_get_users() + user_info, _ = self.utility_wrap.list_user(user, True) + user_item = user_info.groups[0] + assert name in user_item.roles + assert user in users + + self.utility_wrap.role_remove_user(user) + users, _ = self.utility_wrap.role_get_users() + assert user not in users + + @pytest.mark.tags(CaseLabel.L3) + def test_remove_root_from_new_role(self, host, port): + """ + target: remove root from new role + method: create a new role, bind root to role,remove root from role + expected: remove success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + assert self.utility_wrap.role_is_exist()[0] + self.utility_wrap.role_add_user("root") + users, _ = self.utility_wrap.role_get_users() + user_info, _ = self.utility_wrap.list_user("root", True) + user_item = user_info.groups[0] + assert r_name in user_item.roles + assert "root" in users + + self.utility_wrap.role_remove_user("root") + users, _ = self.utility_wrap.role_get_users() + assert "root" not in users + self.utility_wrap.role_drop() + + +class TestUtilityNegativeRbac(TestcaseBase): + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize("name", ["longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong" + "longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong" + "longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong" + "longlonglonglong", + "n%$#@!", "123n", " ", "''", "test-role", "ff ff", "中文"]) + def test_create_role_with_invalid_name(self, name, host, port): + """ + target: create role with invalid name + method: create role with invalid name + expected: create fail + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role(name) + + error = {"err_code": 5} + self.utility_wrap.create_role(check_task=CheckTasks.err_res, check_items=error) + # get roles + role_groups, _ = self.utility_wrap.list_roles(False) + + # drop roles + for role_group in role_groups.groups: + if role_group.role_name not in ['admin', 'public']: + self.utility_wrap.init_role(role_group.role_name) + g_list, _ = self.utility_wrap.role_list_grants() + for g in g_list.groups: + self.utility_wrap.role_revoke(g.object, g.object_name, g.privilege) + self.utility_wrap.role_drop() + role_groups, _ = self.utility_wrap.list_roles(False) + assert len(role_groups.groups) == 2 + + @pytest.mark.tags(CaseLabel.L3) + def test_create_exist_role(self, host, port): + """ + target: check create an exist role fail + method: double create a role with same name + expected: fail to create + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + assert self.utility_wrap.role_is_exist()[0] + error = {"err_code": 35, + "err_msg": "fail to create role"} + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role(check_task=CheckTasks.err_res, check_items=error) + self.utility_wrap.role_drop() + assert not self.utility_wrap.role_is_exist()[0] + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize("name", ["admin", "public"]) + def test_drop_admin_and_public_role(self, name, host, port): + """ + target: drop admin and public role fail + method: drop admin and public role fail + expected: fail to drop + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(name) + assert self.utility_wrap.role_is_exist()[0] + error = {"err_code": 5, + "err_msg": "the role[%s] is a default role, which can\'t be dropped" % name} + self.utility_wrap.role_drop(check_task=CheckTasks.err_res, check_items=error) + assert self.utility_wrap.role_is_exist()[0] + + @pytest.mark.tags(CaseLabel.L3) + def test_drop_role_which_not_exist(self, host, port): + """ + target: drop role which not exist fail + method: drop role which not exist + expected: fail to drop + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + assert not self.utility_wrap.role_is_exist()[0] + error = {"err_code": 36, + "err_msg": "the role isn\'t existed"} + self.utility_wrap.role_drop(check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L3) + def test_add_user_not_exist_role(self, host, port): + """ + target: add user to not exist role + method: create a user,add user to not exist role + expected: fail to add + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + u, _ = self.utility_wrap.create_user(user=user, password=password) + + self.utility_wrap.init_role(r_name) + assert not self.utility_wrap.role_is_exist()[0] + + error = {"err_code": 37, + "err_msg": "fail to check the role name"} + self.utility_wrap.role_add_user(user, check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L3) + def test_add_not_exist_user_to_role(self, host, port): + """ + target: add not exist user to role + method: create a role,add not exist user to role + expected: fail to add + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + user = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + assert self.utility_wrap.role_is_exist()[0] + + error = {"err_code": 37, + "err_msg": "fail to check the username"} + self.utility_wrap.role_remove_user(user, check_task=CheckTasks.err_res, check_items=error) + self.utility_wrap.role_add_user(user, check_task=CheckTasks.err_res, check_items=error) + self.utility_wrap.role_drop() + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize("name", ["admin", "public"]) + def test_remove_root_from_default_role(self, name, host, port): + """ + target: remove root from admin role or public role + method: remove root from admin role or public role + expected: remove success + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + + self.utility_wrap.init_role(name) + error = {"err_code": 37, + "err_msg": "fail to operate user to role"} + self.utility_wrap.role_remove_user("root", check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L3) + def test_remove_user_from_unbind_role(self, host, port): + """ + target: remove user from unbind role + method: create new role and new user, remove user from unbind role + expected: fail to remove + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + u, _ = self.utility_wrap.create_user(user=user, password=password) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + assert self.utility_wrap.role_is_exist()[0] + + error = {"err_code": 37, + "err_msg": "fail to operate user to role"} + self.utility_wrap.role_remove_user(user, check_task=CheckTasks.err_res, check_items=error) + self.utility_wrap.role_drop() + + @pytest.mark.tags(CaseLabel.L3) + def test_remove_user_from_empty_role(self, host, port): + """ + target: remove not exist user from role + method: create new role, remove not exist user from unbind role + expected: fail to remove + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + + user = cf.gen_unique_str(prefix) + password = cf.gen_unique_str(prefix) + u, _ = self.utility_wrap.create_user(user=user, password=password) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + assert not self.utility_wrap.role_is_exist()[0] + + error = {"err_code": 37, + "err_msg": "fail to check the role name"} + self.utility_wrap.role_remove_user(user, check_task=CheckTasks.err_res, check_items=error) + users, _ = self.utility_wrap.role_get_users() + assert user not in users + + @pytest.mark.tags(CaseLabel.L3) + def test_remove_not_exist_user_from_role(self, host, port): + """ + target: remove not exist user from role + method: create new role, remove not exist user from unbind role + expected: fail to remove + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + + user = cf.gen_unique_str(prefix) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + assert self.utility_wrap.role_is_exist()[0] + + error = {"err_code": 37, + "err_msg": "fail to check the username"} + self.utility_wrap.role_remove_user(user, check_task=CheckTasks.err_res, check_items=error) + users, _ = self.utility_wrap.role_get_users() + assert user not in users + self.utility_wrap.role_drop() + + @pytest.mark.tags(CaseLabel.L3) + def test_drop_role_with_bind_privilege(self, host, port): + """ + target: drop role with bind privilege + method: create a new role,grant role privilege,drop it + expected: fail to drop + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + r_name = cf.gen_unique_str(prefix) + + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + self.utility_wrap.role_grant("Collection", "*", "*") + + error = {"err_code": 36, + "err_msg": "fail to drop the role that it has privileges. Use REVOKE API to revoke privileges"} + self.utility_wrap.role_drop(check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L3) + def test_list_grant_by_not_exist_role(self, host, port): + """ + target: list grants by not exist role + method: list grants by not exist role + expected: fail to list + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + r_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + error = {"err_code": 42, + "err_msg": "there is no value on key = by-dev/meta/root-coord/credential/roles/%s" % r_name} + self.utility_wrap.role_list_grants(check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L3) + def test_list_grant_by_role_and_not_exist_object(self, host, port): + """ + target: list grants by role and not exist object + method: list grants by role and not exist object + expected: fail to list + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + r_name = cf.gen_unique_str(prefix) + o_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + error = {"err_code": 41, + "err_msg": "the object type in the object entity[name: %s] is invalid" % o_name} + self.utility_wrap.role_list_grant(o_name, "*", check_task=CheckTasks.err_res, check_items=error) + self.utility_wrap.role_drop() + + @pytest.mark.tags(CaseLabel.L3) + def test_grant_privilege_with_object_not_exist(self, host, port): + """ + target: grant privilege with not exist object + method: grant privilege with not exist object + expected: fail to grant + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + r_name = cf.gen_unique_str(prefix) + o_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + error = {"err_code": 41, + "err_msg": "the object type in the object entity[name: %s] is invalid" % o_name} + self.utility_wrap.role_grant(o_name, "*", "*", check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L3) + def test_grant_privilege_with_privilege_not_exist(self, host, port): + """ + target: grant privilege with not exist privilege + method: grant privilege with not exist privilege + expected: fail to grant + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + r_name = cf.gen_unique_str(prefix) + p_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + error = {"err_code": 41, "err_msg": "the privilege name[%s] in the privilege entity is invalid" % p_name} + self.utility_wrap.role_grant("Global", "*", p_name, check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L3) + def test_revoke_privilege_with_object_not_exist(self, host, port): + """ + target: revoke privilege with not exist object + method: revoke privilege with not exist object + expected: fail to revoke + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + r_name = cf.gen_unique_str(prefix) + o_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + error = {"err_code": 41, + "err_msg": "the object type in the object entity[name: %s] is invalid" % o_name} + self.utility_wrap.role_revoke(o_name, "*", "*", check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L3) + def test_revoke_privilege_with_privilege_not_exist(self, host, port): + """ + target: revoke privilege with not exist privilege + method: revoke privilege with not exist privilege + expected: fail to revoke + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + r_name = cf.gen_unique_str(prefix) + p_name = cf.gen_unique_str(prefix) + self.utility_wrap.init_role(r_name) + self.utility_wrap.create_role() + error = {"err_code": 41, "err_msg": "the privilege name[%s] in the privilege entity is invalid" % p_name} + self.utility_wrap.role_revoke("Global", "*", p_name, check_task=CheckTasks.err_res, check_items=error)