From 05e6143a2138dd5d846cd299ad27b3cf0449c409 Mon Sep 17 00:00:00 2001 From: Kirill Sizov Date: Thu, 7 Jul 2022 11:35:43 +0300 Subject: [PATCH] REST API tests: project backup and patch labels (#104) --- tests/rest_api/fixtures/data.py | 8 ++ tests/rest_api/test_projects.py | 208 +++++++++++++++++++++++++++++++- 2 files changed, 215 insertions(+), 1 deletion(-) diff --git a/tests/rest_api/fixtures/data.py b/tests/rest_api/fixtures/data.py index 7a84f8906..4dcb6220b 100644 --- a/tests/rest_api/fixtures/data.py +++ b/tests/rest_api/fixtures/data.py @@ -95,6 +95,14 @@ def jobs_by_org(tasks, jobs): data[''] = data.pop(None, []) return data +@pytest.fixture(scope='session') +def projects_by_org(projects): + data = {} + for project in projects: + data.setdefault(project['organization'], []).append(project) + data[''] = data.pop(None, []) + return data + @pytest.fixture(scope='session') def tasks_by_org(tasks): data = {} diff --git a/tests/rest_api/test_projects.py b/tests/rest_api/test_projects.py index 48356f4ee..331f92782 100644 --- a/tests/rest_api/test_projects.py +++ b/tests/rest_api/test_projects.py @@ -6,9 +6,14 @@ import tempfile from http import HTTPStatus from itertools import groupby, product +from time import sleep + import pytest +from copy import deepcopy +from deepdiff import DeepDiff -from .utils.config import get_method, post_files_method, post_method +from .utils.config import (get_method, patch_method, post_files_method, + post_method) @pytest.mark.usefixtures('dontchangedb') @@ -113,6 +118,109 @@ class TestGetProjects: self._test_response_200(user_in_project['username'], project_id, org_id=user_in_project['org']) +class TestGetProjectBackup: + def _test_can_get_project_backup(self, username, pid, **kwargs): + for _ in range(30): + response = get_method(username, f"projects/{pid}/backup", **kwargs) + response.raise_for_status() + if response.status_code == HTTPStatus.CREATED: + break + sleep(1) + response = get_method(username, f"projects/{pid}/backup", action="download", **kwargs) + assert response.status_code == HTTPStatus.OK + + def _test_cannot_get_project_backup(self, username, pid, **kwargs): + response = get_method(username, f"projects/{pid}/backup", **kwargs) + assert response.status_code == HTTPStatus.FORBIDDEN + + def test_admin_can_get_project_backup(self, projects): + project = list(projects)[0] + self._test_can_get_project_backup('admin1', project['id']) + + # User that not in [project:owner, project:assignee] cannot get project backup. + def test_user_cannot_get_project_backup(self, find_users, projects, is_project_staff): + users = find_users(exclude_privilege='admin') + + user, project = next( + (user, project) + for user, project in product(users, projects) + if not is_project_staff(user['id'], project['id']) + ) + + self._test_cannot_get_project_backup(user['username'], project['id']) + + # Org worker that not in [project:owner, project:assignee] cannot get project backup. + def test_org_worker_cannot_get_project_backup(self, find_users, projects, is_project_staff, is_org_member): + users = find_users(role='worker', exclude_privilege='admin') + + user, project = next( + (user, project) + for user, project in product(users, projects) + if not is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization']) + ) + + self._test_cannot_get_project_backup(user['username'], project['id'], org_id=project['organization']) + + # Org worker that in [project:owner, project:assignee] can get project backup. + def test_org_worker_can_get_project_backup(self, find_users, projects, is_project_staff, is_org_member): + users = find_users(role='worker', exclude_privilege='admin') + + user, project = next( + (user, project) + for user, project in product(users, projects) + if is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization']) + ) + + self._test_can_get_project_backup(user['username'], project['id'], org_id=project['organization']) + + # Org supervisor that in [project:owner, project:assignee] can get project backup. + def test_org_supervisor_can_get_project_backup(self, find_users, projects, is_project_staff, is_org_member): + users = find_users(role='supervisor', exclude_privilege='admin') + + user, project = next( + (user, project) + for user, project in product(users, projects) + if is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization']) + ) + + self._test_can_get_project_backup(user['username'], project['id'], org_id=project['organization']) + + # Org supervisor that not in [project:owner, project:assignee] cannot get project backup. + def test_org_supervisor_cannot_get_project_backup(self, find_users, projects, is_project_staff, is_org_member): + users = find_users(role='supervisor', exclude_privilege='admin') + + user, project = next( + (user, project) + for user, project in product(users, projects) + if not is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization']) + ) + + self._test_cannot_get_project_backup(user['username'], project['id'], org_id=project['organization']) + + # Org maintainer that not in [project:owner, project:assignee] can get project backup. + def test_org_maintainer_can_get_project_backup(self, find_users, projects, is_project_staff, is_org_member): + users = find_users(role='maintainer', exclude_privilege='admin') + + user, project = next( + (user, project) + for user, project in product(users, projects) + if not is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization']) + ) + + self._test_can_get_project_backup(user['username'], project['id'], org_id=project['organization']) + + # Org owner that not in [project:owner, project:assignee] can get project backup. + def test_org_owner_can_get_project_backup(self, find_users, projects, is_project_staff, is_org_member): + users = find_users(role='owner', exclude_privilege='admin') + + user, project = next( + (user, project) + for user, project in product(users, projects) + if not is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization']) + ) + + self._test_can_get_project_backup(user['username'], project['id'], org_id=project['organization']) + @pytest.mark.usefixtures('changedb') class TestPostProjects: def _test_create_project_201(self, user, spec, **kwargs): @@ -243,3 +351,101 @@ class TestImportExportDatasetProject: } self._test_import_project(username, project_id, 'CVAT 1.1', import_data) + +@pytest.mark.usefixtures('changedb') +class TestPatchProjectLabel: + def test_admin_can_delete_label(self, projects): + project = deepcopy(list(projects)[0]) + labels = project['labels'][0] + labels.update({'deleted': True}) + response = patch_method('admin1', f'/projects/{project["id"]}', {'labels': [labels]}) + assert response.status_code == HTTPStatus.OK + assert len(response.json()['labels']) == len(project['labels']) - 1 + + def test_admin_can_rename_label(self, projects): + project = deepcopy(list(projects)[0]) + labels = project['labels'][0] + labels.update({'name': 'new name'}) + response = patch_method('admin1', f'/projects/{project["id"]}', {'labels': [labels]}) + assert response.status_code == HTTPStatus.OK + assert DeepDiff(response.json()['labels'], project['labels'], ignore_order=True) == {} + + def test_admin_can_add_label(self, projects): + project = list(projects)[0] + labels = {'name': 'new name'} + response = patch_method('admin1', f'/projects/{project["id"]}', {'labels': [labels]}) + assert response.status_code == HTTPStatus.OK + assert len(response.json()['labels']) == len(project['labels']) + 1 + + # Org maintainer can add label even he is not in [project:owner, project:assignee] + def test_org_maintainer_can_add_label(self, find_users, projects, is_project_staff, is_org_member): + users = find_users(role='maintainer', exclude_privilege='admin') + + user, project = next( + (user, project) + for user, project in product(users, projects) + if not is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization']) + ) + + labels = {'name': 'new name'} + response = patch_method(user['username'], f'/projects/{project["id"]}', {'labels': [labels]}, org_id=project['organization']) + assert response.status_code == HTTPStatus.OK + assert len(response.json()['labels']) == len(project['labels']) + 1 + + # Org supervisor cannot add label + def test_org_supervisor_can_add_label(self, find_users, projects, is_project_staff, is_org_member): + users = find_users(role='supervisor', exclude_privilege='admin') + + user, project = next( + (user, project) + for user, project in product(users, projects) + if not is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization']) + ) + + labels = {'name': 'new name'} + response = patch_method(user['username'], f'/projects/{project["id"]}', {'labels': [labels]}, org_id=project['organization']) + assert response.status_code == HTTPStatus.FORBIDDEN + + # Org worker cannot add label + def test_org_worker_cannot_add_label(self, find_users, projects, is_project_staff, is_org_member): + users = find_users(role='worker', exclude_privilege='admin') + + user, project = next( + (user, project) + for user, project in product(users, projects) + if not is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization']) + ) + + labels = {'name': 'new name'} + response = patch_method(user['username'], f'/projects/{project["id"]}', {'labels': [labels]}, org_id=project['organization']) + assert response.status_code == HTTPStatus.FORBIDDEN + + # Org worker that in [project:owner, project:assignee] can add label + def test_org_worker_can_add_label(self, find_users, projects, is_project_staff, is_org_member): + users = find_users(role='worker', exclude_privilege='admin') + + user, project = next( + (user, project) + for user, project in product(users, projects) + if is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization']) + ) + + labels = {'name': 'new name'} + response = patch_method(user['username'], f'/projects/{project["id"]}', {'labels': [labels]}, org_id=project['organization']) + assert response.status_code == HTTPStatus.OK + assert len(response.json()['labels']) == len(project['labels']) + 1 + + # Org owner can add label even he is not in [project:owner, project:assignee] + def test_org_owner_can_add_label(self, find_users, projects, is_project_staff, is_org_member): + users = find_users(role='owner', exclude_privilege='admin') + + user, project = next( + (user, project) + for user, project in product(users, projects) + if not is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization']) + ) + + labels = {'name': 'new name'} + response = patch_method(user['username'], f'/projects/{project["id"]}', {'labels': [labels]}, org_id=project['organization']) + assert response.status_code == HTTPStatus.OK + assert len(response.json()['labels']) == len(project['labels']) + 1 -- GitLab