Unverified commit f7fd06c6 authored by Maria Khrustaleva, committed by GitHub

Fixed resources import (#5909)

Fixed:
- wrong location of tmp file when importing job annotations
- ```
  Traceback (most recent call last):
    File "/home/maya/Documents/cvat/.env/lib/python3.8/site-packages/rq/worker.py", line 795, in work
      self.execute_job(job, queue)
    File "/home/maya/Documents/cvat/cvat/rqworker.py", line 37, in execute_job
      return self.perform_job(*args, **kwargs)
    File "/home/maya/Documents/cvat/.env/lib/python3.8/site-packages/rq/worker.py", line 1389, in perform_job
      self.handle_exception(job, *exc_info)
    File "/home/maya/Documents/cvat/.env/lib/python3.8/site-packages/rq/worker.py", line 1438, in handle_exception
      fallthrough = handler(job, *exc_info)
    File "/home/maya/Documents/cvat/cvat/apps/engine/views.py", line 2233, in rq_exception_handler
      rq_job.exc_info = "".join(
  AttributeError: can't set attribute
  ```
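
The `AttributeError` above happens because recent RQ versions expose `Job.exc_info` as a read-only property, so the custom exception handler can no longer assign to it. A minimal sketch of the workaround, assuming the handler stores the formatted traceback in `job.meta` instead (the export views below read it back via `rq_job.meta.get('formatted_exception', ...)`); the handler body is illustrative, not copied from the collapsed `views.py` diff:

```python
import traceback


def rq_exception_handler(rq_job, exc_type, exc_value, tb):
    # job.meta is a plain dict persisted in Redis, so it can be written freely,
    # unlike the read-only Job.exc_info property.
    rq_job.meta['formatted_exception'] = ''.join(
        traceback.format_exception(exc_type, exc_value, tb)
    )
    rq_job.save_meta()
    return True  # fall through to the next registered handler
```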

Resolves https://github.com/opencv/cvat/issues/5773
Resolves https://github.com/opencv/cvat/issues/5563

- Root causes of the issues:
  - the annotation file was uploaded to the server via the TUS protocol and an RQ job was created, but no subsequent status-check requests were ever made (e.g. the user closed the browser tab); see the cleanup sketch after this list
  - the annotation file was uploaded to the server via the TUS protocol, but the RQ job had not yet been created (e.g. the CVAT instance was restarted)
  - tasks/projects were created from backups with the same name at the same time by different users
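
The first two causes are addressed by a new `cleaning` RQ queue: when a file upload finishes, a delayed cleanup job is scheduled, and it removes the file if no import job has consumed it in the meantime. A minimal sketch of that approach, assuming django-rq with rq-scheduler (the actual implementation lives in the `UploadMixin`/`handlers.py` changes below; the queue name, delay, and function names here are illustrative):

```python
from datetime import timedelta
from pathlib import Path

import django_rq


def remove_if_still_present(path: Path) -> None:
    # A no-op when the import job has already consumed and removed the file.
    if path.is_file():
        path.unlink()


def schedule_import_cleanup(file_path: str, delay: timedelta = timedelta(hours=2)) -> None:
    # Schedule a delayed job on the dedicated cleaning queue that removes the
    # uploaded file if it is still lying around when the job runs.
    scheduler = django_rq.get_scheduler('cleaning')
    scheduler.enqueue_in(delay, remove_if_still_present, Path(file_path))
```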
Co-authored-by: Roman Donchenko <roman@cvat.ai>
Co-authored-by: Maxim Zhiltsov <zhiltsov.max35@gmail.com>
Parent: 2584b963
......@@ -270,6 +270,28 @@
"env": {},
"console": "internalConsole"
},
{
"name": "server: RQ - cleaning",
"type": "python",
"request": "launch",
"stopOnEntry": false,
"justMyCode": false,
"python": "${command:python.interpreterPath}",
"program": "${workspaceRoot}/manage.py",
"args": [
"rqworker",
"cleaning",
"--worker-class",
"cvat.rqworker.SimpleWorker"
],
"django": true,
"cwd": "${workspaceFolder}",
"env": {
"DJANGO_LOG_SERVER_HOST": "localhost",
"DJANGO_LOG_SERVER_PORT": "8282"
},
"console": "internalConsole"
},
{
"name": "server: migrate",
"type": "python",
......@@ -433,6 +455,7 @@
"server: RQ - annotation",
"server: RQ - webhooks",
"server: RQ - scheduler",
"server: RQ - cleaning",
"server: git",
]
}
......
......@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Support task creation with any type of data supported by the server by default from cloud storage
without use_cache option (<https://github.com/opencv/cvat/pull/6074>)
- Support task creation with cloud storage data and without use_cache option (<https://github.com/opencv/cvat/pull/6074>)
- A cleaning worker that checks whether the uploaded resource has been deleted and deletes it otherwise (<https://github.com/opencv/cvat/pull/5909>)
### Changed
- Resource links are opened from any organization/sandbox if available for user (<https://github.com/opencv/cvat/pull/5892>)
......@@ -31,6 +32,12 @@ without use_cache option (<https://github.com/opencv/cvat/pull/6074>)
### Fixed
- Skeletons dumping on created tasks/projects (<https://github.com/opencv/cvat/pull/6157>)
- Fix saving annotations for skeleton tracks (<https://github.com/opencv/cvat/pull/6075>)
- Wrong location of tmp file when importing job annotations (<https://github.com/opencv/cvat/pull/5909>)
- Removing the uploaded annotations/backup file when the RQ job was created
but no subsequent status-check requests were made (<https://github.com/opencv/cvat/pull/5909>)
- Removing the uploaded annotations/backup file after it was uploaded to the server via the TUS protocol
but the RQ job had not yet been created (<https://github.com/opencv/cvat/pull/5909>)
- Creating tasks/projects from backups with the same name at the same time by different users (<https://github.com/opencv/cvat/pull/5909>)
### Security
- TBD
......
......@@ -783,13 +783,14 @@ async function importDataset(
};
const url = `${backendAPI}/projects/${id}/dataset`;
let rqId: string;
async function wait() {
return new Promise<void>((resolve, reject) => {
async function requestStatus() {
try {
const response = await Axios.get(url, {
params: { ...params, action: 'import_status' },
params: { ...params, action: 'import_status', rq_id: rqId },
});
if (response.status === 202) {
if (response.data.message) {
......@@ -812,10 +813,11 @@ async function importDataset(
if (isCloudStorage) {
try {
await Axios.post(url,
const response = await Axios.post(url,
new FormData(), {
params,
});
rqId = response.data.rq_id;
} catch (errorData) {
throw generateError(errorData);
}
......@@ -837,11 +839,12 @@ async function importDataset(
headers: { 'Upload-Start': true },
});
await chunkUpload(file, uploadConfig);
await Axios.post(url,
const response = await Axios.post(url,
new FormData(), {
params,
headers: { 'Upload-Finish': true },
});
rqId = response.data.rq_id;
} catch (errorData) {
throw generateError(errorData);
}
......@@ -1617,6 +1620,7 @@ async function uploadAnnotations(
filename: typeof file === 'string' ? file : file.name,
conv_mask_to_poly: options.convMaskToPoly,
};
let rqId: string;
const url = `${backendAPI}/${session}s/${id}/annotations`;
async function wait() {
......@@ -1627,7 +1631,7 @@ async function uploadAnnotations(
url,
new FormData(),
{
params,
params: { ...params, rq_id: rqId },
},
);
if (response.status === 202) {
......@@ -1646,10 +1650,11 @@ async function uploadAnnotations(
if (isCloudStorage) {
try {
await Axios.post(url,
const response = await Axios.post(url,
new FormData(), {
params,
});
rqId = response.data.rq_id;
} catch (errorData) {
throw generateError(errorData);
}
......@@ -1667,11 +1672,12 @@ async function uploadAnnotations(
headers: { 'Upload-Start': true },
});
await chunkUpload(file, uploadConfig);
await Axios.post(url,
const response = await Axios.post(url,
new FormData(), {
params,
headers: { 'Upload-Finish': true },
});
rqId = response.data.rq_id;
} catch (errorData) {
throw generateError(errorData);
}
......
......@@ -450,6 +450,6 @@ class TasksRepo(
)
task_id = json.loads(response.data)["id"]
self._client.logger.info(f"Task has been imported sucessfully. Task ID: {task_id}")
self._client.logger.info(f"Task has been imported successfully. Task ID: {task_id}")
return self.retrieve(task_id)
......@@ -4,6 +4,7 @@
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple
......@@ -89,6 +90,7 @@ class _MyTusUploader(_TusUploader):
headers["upload-length"] = str(self.file_size)
headers["upload-metadata"] = ",".join(self.encode_metadata())
resp = self._api_client.rest_client.POST(self.client.url, headers=headers)
self.real_filename = resp.headers.get("Upload-Filename")
url = resp.headers.get("location")
if url is None:
msg = "Attempt to retrieve create file url with status {}".format(resp.status_code)
......@@ -179,9 +181,10 @@ class Uploader:
assert meta["filename"]
self._tus_start_upload(url, query_params=query_params)
self._upload_file_data_with_tus(
real_filename = self._upload_file_data_with_tus(
url=url, filename=filename, meta=meta, pbar=pbar, logger=logger
)
query_params["filename"] = real_filename
return self._tus_finish_upload(url, query_params=query_params, fields=fields)
def _wait_for_completion(
......@@ -216,7 +219,9 @@ class Uploader:
return _MyTusUploader(client=client, api_client=api_client, **kwargs)
def _upload_file_data_with_tus(self, url, filename, *, meta=None, pbar=None, logger=None):
def _upload_file_data_with_tus(
self, url, filename, *, meta=None, pbar=None, logger=None
) -> str:
file_size = filename.stat().st_size
if pbar is None:
pbar = NullProgressReporter()
......@@ -233,6 +238,7 @@ class Uploader:
log_func=logger,
)
tus_uploader.upload()
return tus_uploader.real_filename
def _tus_start_upload(self, url, *, query_params=None):
response = self._client.api_client.rest_client.POST(
......@@ -273,17 +279,21 @@ class AnnotationUploader(Uploader):
):
url = self._client.api_map.make_endpoint_url(endpoint.path, kwsub=url_params)
params = {"format": format_name, "filename": filename.name}
self.upload_file(
response = self.upload_file(
url, filename, pbar=pbar, query_params=params, meta={"filename": params["filename"]}
)
rq_id = json.loads(response.data).get("rq_id")
assert rq_id, "The rq_id was not found in the response"
params["rq_id"] = rq_id
self._wait_for_completion(
url,
success_status=201,
positive_statuses=[202],
status_check_period=status_check_period,
query_params=params,
method="POST",
method="PUT",
)
......@@ -301,12 +311,17 @@ class DatasetUploader(Uploader):
):
url = self._client.api_map.make_endpoint_url(upload_endpoint.path, kwsub=url_params)
params = {"format": format_name, "filename": filename.name}
self.upload_file(
response = self.upload_file(
url, filename, pbar=pbar, query_params=params, meta={"filename": params["filename"]}
)
rq_id = json.loads(response.data).get("rq_id")
assert rq_id, "The rq_id was not found in the response"
url = self._client.api_map.make_endpoint_url(retrieve_endpoint.path, kwsub=url_params)
params = {"action": "import_status"}
params = {
"action": "import_status",
"rq_id": rq_id,
}
self._wait_for_completion(
url,
success_status=201,
......
......@@ -7,6 +7,7 @@ import os
from tempfile import TemporaryDirectory
import rq
from typing import Any, Callable, List, Mapping, Tuple
from datumaro.components.errors import DatasetError, DatasetImportError, DatasetNotFoundError
from django.db import transaction
......@@ -16,7 +17,7 @@ from cvat.apps.engine.task import _create_thread as create_task
from cvat.apps.dataset_manager.task import TaskAnnotation
from .annotation import AnnotationIR
from .bindings import ProjectData, load_dataset_data
from .bindings import ProjectData, load_dataset_data, CvatImportError
from .formats.registry import make_exporter, make_importer
def export_project(project_id, dst_file, format_name,
......@@ -160,7 +161,7 @@ class ProjectAnnotationAndData:
raise NotImplementedError()
@transaction.atomic
def import_dataset_as_project(project_id, dataset_file, format_name, conv_mask_to_poly):
def import_dataset_as_project(src_file, project_id, format_name, conv_mask_to_poly):
rq_job = rq.get_current_job()
rq_job.meta['status'] = 'Dataset import has been started...'
rq_job.meta['progress'] = 0.
......@@ -170,5 +171,8 @@ def import_dataset_as_project(project_id, dataset_file, format_name, conv_mask_t
project.init_from_db()
importer = make_importer(format_name)
with open(dataset_file, 'rb') as f:
project.import_dataset(f, importer, conv_mask_to_poly=conv_mask_to_poly)
with open(src_file, 'rb') as f:
try:
project.import_dataset(f, importer, conv_mask_to_poly=conv_mask_to_poly)
except (DatasetError, DatasetImportError, DatasetNotFoundError) as ex:
raise CvatImportError(str(ex))
......@@ -8,6 +8,7 @@ from collections import OrderedDict
from copy import deepcopy
from enum import Enum
from tempfile import TemporaryDirectory
from datumaro.components.errors import DatasetError, DatasetImportError, DatasetNotFoundError
from django.db import transaction
from django.db.models.query import Prefetch
......@@ -19,11 +20,10 @@ from cvat.apps.engine.plugins import plugin_decorator
from cvat.apps.profiler import silk_profile
from .annotation import AnnotationIR, AnnotationManager
from .bindings import JobData, TaskData
from .bindings import JobData, TaskData, CvatImportError
from .formats.registry import make_exporter, make_importer
from .util import bulk_create
class dotdict(OrderedDict):
"""dot.notation access to dictionary attributes"""
__getattr__ = OrderedDict.get
......@@ -853,19 +853,25 @@ def export_task(task_id, dst_file, format_name, server_url=None, save_images=Fal
task.export(f, exporter, host=server_url, save_images=save_images)
@transaction.atomic
def import_task_annotations(task_id, src_file, format_name, conv_mask_to_poly):
def import_task_annotations(src_file, task_id, format_name, conv_mask_to_poly):
task = TaskAnnotation(task_id)
task.init_from_db()
importer = make_importer(format_name)
with open(src_file, 'rb') as f:
task.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly)
try:
task.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly)
except (DatasetError, DatasetImportError, DatasetNotFoundError) as ex:
raise CvatImportError(str(ex))
@transaction.atomic
def import_job_annotations(job_id, src_file, format_name, conv_mask_to_poly):
def import_job_annotations(src_file, job_id, format_name, conv_mask_to_poly):
job = JobAnnotation(job_id)
job.init_from_db()
importer = make_importer(format_name)
with open(src_file, 'rb') as f:
job.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly)
try:
job.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly)
except (DatasetError, DatasetImportError, DatasetNotFoundError) as ex:
raise CvatImportError(str(ex))
......@@ -923,8 +923,7 @@ class TaskAnnotationsImportTest(_DbTestBase):
expected_ann = TaskAnnotation(task["id"])
expected_ann.init_from_db()
dm.task.import_task_annotations(task["id"],
file_path, import_format, True)
dm.task.import_task_annotations(file_path, task["id"], import_format, True)
actual_ann = TaskAnnotation(task["id"])
actual_ann.init_from_db()
......@@ -976,6 +975,6 @@ class TaskAnnotationsImportTest(_DbTestBase):
task.update()
task = self._create_task(task, images)
dm.task.import_task_annotations(task['id'], dataset_path, format_name, True)
dm.task.import_task_annotations(dataset_path, task['id'], format_name, True)
self._test_can_import_annotations(task, format_name)
......@@ -44,7 +44,6 @@ TASK_CACHE_TTL = DEFAULT_CACHE_TTL
PROJECT_CACHE_TTL = DEFAULT_CACHE_TTL / 3
JOB_CACHE_TTL = DEFAULT_CACHE_TTL
def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=None, save_images=False):
try:
if task_id is not None:
......
......@@ -33,8 +33,10 @@ from cvat.apps.engine import models
from cvat.apps.engine.log import slogger
from cvat.apps.engine.serializers import (AttributeSerializer, DataSerializer, LabelSerializer,
LabeledDataSerializer, SegmentSerializer, SimpleJobSerializer, TaskReadSerializer,
ProjectReadSerializer, ProjectFileSerializer, TaskFileSerializer)
from cvat.apps.engine.utils import av_scan_paths, process_failed_job, configure_dependent_job, get_rq_job_meta
ProjectReadSerializer, ProjectFileSerializer, TaskFileSerializer, RqIdSerializer)
from cvat.apps.engine.utils import (
av_scan_paths, process_failed_job, configure_dependent_job, get_rq_job_meta, get_import_rq_id, import_resource_with_clean_up_after
)
from cvat.apps.engine.models import (
StorageChoice, StorageMethodChoice, DataChoice, Task, Project, Location)
from cvat.apps.engine.task import JobFileMapping, _create_thread
......@@ -47,7 +49,6 @@ from cvat.apps.dataset_manager.bindings import CvatImportError
class Version(Enum):
V1 = '1.0'
def _get_label_mapping(db_labels):
label_mapping = {db_label.id: db_label.name for db_label in db_labels}
for db_label in db_labels:
......@@ -869,7 +870,7 @@ def export(db_instance, request, queue_name):
if os.path.exists(file_path):
return Response(status=status.HTTP_201_CREATED)
elif rq_job.is_failed:
exc_info = str(rq_job.exc_info)
exc_info = rq_job.meta.get('formatted_exception', str(rq_job.exc_info))
rq_job.delete()
return Response(exc_info,
status=status.HTTP_500_INTERNAL_SERVER_ERROR)
......@@ -896,6 +897,9 @@ def _download_file_from_bucket(db_storage, filename, key):
def _import(importer, request, queue, rq_id, Serializer, file_field_name, location_conf, filename=None):
rq_job = queue.fetch_job(rq_id)
if (user_id_from_meta := getattr(rq_job, 'meta', {}).get('user', {}).get('id')) and user_id_from_meta != request.user.id:
return Response(status=status.HTTP_403_FORBIDDEN)
if not rq_job:
org_id = getattr(request.iam_context['organization'], 'id', None)
dependent_job = None
......@@ -939,22 +943,25 @@ def _import(importer, request, queue, rq_id, Serializer, file_field_name, locati
filename=filename,
key=key,
request=request,
result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(),
failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds()
)
rq_job = queue.enqueue_call(
func=importer,
args=(filename, request.user.id, org_id),
func=import_resource_with_clean_up_after,
args=(importer, filename, request.user.id, org_id),
job_id=rq_id,
meta={
'tmp_file': filename,
**get_rq_job_meta(request=request, db_obj=None)
},
depends_on=dependent_job
depends_on=dependent_job,
result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(),
failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds()
)
else:
if rq_job.is_finished:
project_id = rq_job.return_value
os.remove(rq_job.meta['tmp_file'])
rq_job.delete()
return Response({'id': project_id}, status=status.HTTP_201_CREATED)
elif rq_job.is_failed or \
......@@ -971,7 +978,10 @@ def _import(importer, request, queue, rq_id, Serializer, file_field_name, locati
return Response(data=exc_info,
status=status.HTTP_500_INTERNAL_SERVER_ERROR)
return Response({'rq_id': rq_id}, status=status.HTTP_202_ACCEPTED)
serializer = RqIdSerializer(data={'rq_id': rq_id})
serializer.is_valid(raise_exception=True)
return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
def get_backup_dirname():
return settings.TMP_FILES_ROOT
......@@ -980,7 +990,7 @@ def import_project(request, queue_name, filename=None):
if 'rq_id' in request.data:
rq_id = request.data['rq_id']
else:
rq_id = f"import:project.{uuid.uuid4()}-by-{request.user}"
rq_id = get_import_rq_id('project', uuid.uuid4(), 'backup', request.user)
Serializer = ProjectFileSerializer
file_field_name = 'project_file'
......@@ -1003,10 +1013,8 @@ def import_project(request, queue_name, filename=None):
)
def import_task(request, queue_name, filename=None):
if 'rq_id' in request.data:
rq_id = request.data['rq_id']
else:
rq_id = f"import:task.{uuid.uuid4()}-by-{request.user}"
rq_id = request.data.get('rq_id', get_import_rq_id('task', uuid.uuid4(), 'backup', request.user))
Serializer = TaskFileSerializer
file_field_name = 'task_file'
......
# Copyright (C) 2023 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT
from pathlib import Path
from time import time
from django.conf import settings
from cvat.apps.engine.log import slogger
def clear_import_cache(path: Path, creation_time: float) -> None:
"""
This function checks whether an uploaded import file is still present and removes it if it was not cleaned up by an RQ import job.
If the file still exists, it means that for some reason it was uploaded to the CVAT server but the RQ import job was never created.
Args:
path (Path): path to file
creation_time (float): file creation time
"""
if path.is_file() and (time() - creation_time + 1) >= settings.IMPORT_CACHE_CLEAN_DELAY.total_seconds():
path.unlink()
slogger.glob.warning(f"The file {str(path)} was removed by the cleaning job.")
......@@ -5,7 +5,7 @@
from enum import Enum
from typing import Any, Dict
from cvat.apps.engine.models import Location
from cvat.apps.engine.models import Location, Job
class StorageType(str, Enum):
TARGET = 'target_storage'
......@@ -20,7 +20,7 @@ def get_location_configuration(obj, field_name: str, use_settings: bool = False)
}
if use_settings:
storage = getattr(obj, field_name)
storage = getattr(obj, field_name) if not isinstance(obj, Job) else getattr(obj.segment.task, field_name)
if storage is None:
location_conf['location'] = Location.LOCAL
else:
......
......@@ -9,15 +9,21 @@ import os
import uuid
from dataclasses import asdict, dataclass
from distutils.util import strtobool
from pathlib import Path
from tempfile import NamedTemporaryFile
from unittest import mock
import django_rq
from django.conf import settings
from rest_framework import mixins, status
from rest_framework.response import Response
from cvat.apps.engine.location import StorageType, get_location_configuration
from cvat.apps.engine.log import slogger
from cvat.apps.engine.models import Location
from cvat.apps.engine.serializers import DataSerializer
from cvat.apps.engine.handlers import clear_import_cache
from cvat.apps.engine.utils import get_import_rq_id
class TusFile:
......@@ -221,7 +227,27 @@ class UploadMixin:
if message_id:
metadata["message_id"] = base64.b64decode(message_id)
file_exists = os.path.lexists(os.path.join(self.get_upload_dir(), filename))
import_type = request.path.strip('/').split('/')[-1]
if import_type == 'backup':
# we need to create unique temp file here because
# users can try to import backups with the same name at the same time
with NamedTemporaryFile(prefix=f'cvat-backup-{filename}-by-{request.user}', suffix='.zip', dir=self.get_upload_dir()) as tmp_file:
filename = os.path.relpath(tmp_file.name, self.get_upload_dir())
metadata['filename'] = filename
file_path = os.path.join(self.get_upload_dir(), filename)
file_exists = os.path.lexists(file_path) and import_type != 'backup'
if file_exists:
# check whether the rq_job is in progress or has been finished/failed
object_class_name = self._object.__class__.__name__.lower()
template = get_import_rq_id(object_class_name, self._object.pk, import_type, request.user)
queue = django_rq.get_queue(settings.CVAT_QUEUES.IMPORT_DATA.value)
finished_job_ids = queue.finished_job_registry.get_job_ids()
failed_job_ids = queue.failed_job_registry.get_job_ids()
if template in finished_job_ids or template in failed_job_ids:
os.remove(file_path)
file_exists = False
if file_exists:
return self._tus_response(status=status.HTTP_409_CONFLICT,
data="File with same name already exists")
......@@ -231,11 +257,27 @@ class UploadMixin:
return self._tus_response(status=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
data="File size exceeds max limit of {} bytes".format(self._tus_max_file_size))
tus_file = TusFile.create_file(metadata, file_size, self.get_upload_dir())
location = request.build_absolute_uri()
if 'HTTP_X_FORWARDED_HOST' not in request.META:
location = request.META.get('HTTP_ORIGIN') + request.META.get('PATH_INFO')
if import_type in ('backup', 'annotations', 'datasets'):
scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.CLEANING.value)
path = Path(self.get_upload_dir()) / tus_file.filename
cleaning_job = scheduler.enqueue_in(time_delta=settings.IMPORT_CACHE_CLEAN_DELAY,
func=clear_import_cache,
path=path,
creation_time=Path(tus_file.file_path).stat().st_ctime
)
slogger.glob.info(
f'The cleaning job {cleaning_job.id} is queued. '
f'The check that the file {path} is deleted will be carried out after '
f'{settings.IMPORT_CACHE_CLEAN_DELAY}.'
)
return self._tus_response(
status=status.HTTP_201_CREATED,
extra_headers={'Location': '{}{}'.format(location, tus_file.file_id),
......@@ -330,7 +372,7 @@ class AnnotationMixin:
data = get_data(self._object.pk)
return Response(data)
def import_annotations(self, request, db_obj, import_func, rq_func, rq_id):
def import_annotations(self, request, db_obj, import_func, rq_func, rq_id_template):
is_tus_request = request.headers.get('Upload-Length', None) is not None or \
request.method == 'OPTIONS'
if is_tus_request:
......@@ -352,7 +394,7 @@ class AnnotationMixin:
return import_func(
request=request,
rq_id=rq_id,
rq_id_template=rq_id_template,
rq_func=rq_func,
db_obj=self._object,
format_name=format_name,
......
......@@ -22,12 +22,10 @@ from cvat.apps.dataset_manager.formats.utils import get_label_color
from cvat.apps.engine import models
from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials, Status
from cvat.apps.engine.log import slogger
from cvat.apps.engine.utils import parse_specific_attributes
from cvat.apps.engine.utils import parse_specific_attributes, build_field_filter_params, get_list_view_name, reverse
from drf_spectacular.utils import OpenApiExample, extend_schema_field, extend_schema_serializer
from cvat.apps.engine.utils import build_field_filter_params, get_list_view_name, reverse
class WriteOnceMixin:
"""
......@@ -667,6 +665,9 @@ class RqStatusSerializer(serializers.Serializer):
message = serializers.CharField(allow_blank=True, default="")
progress = serializers.FloatField(max_value=100, default=0)
class RqIdSerializer(serializers.Serializer):
rq_id = serializers.CharField()
class JobFiles(serializers.ListField):
"""
......
......@@ -1836,9 +1836,9 @@ class ProjectImportExportAPITestCase(ApiTestBase):
response = self.client.post("/api/projects/{}/dataset?format={}".format(pid, f), data=data, format="multipart")
return response
def _run_api_v2_projects_id_dataset_import_status(self, pid, user):
def _run_api_v2_projects_id_dataset_import_status(self, pid, user, rq_id):
with ForceLogin(user, self.client):
response = self.client.get("/api/projects/{}/dataset?action=import_status".format(pid), format="json")
response = self.client.get("/api/projects/{}/dataset?action=import_status&rq_id={}".format(pid, rq_id), format="json")
return response
def test_api_v2_projects_id_export_import(self):
......@@ -1867,7 +1867,8 @@ class ProjectImportExportAPITestCase(ApiTestBase):
response = self._run_api_v2_projects_id_dataset_import(pid_import, self.owner, import_data, "CVAT 1.1")
self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
response = self._run_api_v2_projects_id_dataset_import_status(pid_import, self.owner)
rq_id = response.data.get('rq_id')
response = self._run_api_v2_projects_id_dataset_import_status(pid_import, self.owner, rq_id)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
def tearDown(self):
......
......@@ -9,17 +9,21 @@ import hashlib
import importlib
import sys
import traceback
from typing import Any, Dict, Optional
from contextlib import suppress
from typing import Any, Dict, Optional, Callable, Union
import subprocess
import os
import urllib.parse
import logging
import platform
from rq.job import Job
from django_rq.queues import DjangoRQ
from pathlib import Path
from django.http.request import HttpRequest
from django.utils import timezone
from django.utils.http import urlencode
from rest_framework.reverse import reverse as _reverse
from av import VideoFrame
......@@ -133,17 +137,29 @@ def parse_exception_message(msg):
pass
return parsed_msg
def process_failed_job(rq_job):
if os.path.exists(rq_job.meta['tmp_file']):
os.remove(rq_job.meta['tmp_file'])
exc_info = str(rq_job.exc_info or rq_job.dependency.exc_info)
def process_failed_job(rq_job: Job):
exc_info = str(rq_job.exc_info or getattr(rq_job.dependency, 'exc_info', None) or '')
if rq_job.dependency:
rq_job.dependency.delete()
rq_job.delete()
return parse_exception_message(exc_info)
def configure_dependent_job(queue, rq_id, rq_func, db_storage, filename, key, request):
msg = parse_exception_message(exc_info)
log = logging.getLogger('cvat.server.engine')
log.error(msg)
return msg
def configure_dependent_job(
queue: DjangoRQ,
rq_id: str,
rq_func: Callable[[Any, str, str], None],
db_storage: Any,
filename: str,
key: str,
request: HttpRequest,
result_ttl: float,
failure_ttl: float
) -> Job:
rq_job_id_download_file = rq_id + f'?action=download_{filename}'
rq_job_download_file = queue.fetch_job(rq_job_id_download_file)
if not rq_job_download_file:
......@@ -153,6 +169,8 @@ def configure_dependent_job(queue, rq_id, rq_func, db_storage, filename, key, re
args=(db_storage, filename, key),
job_id=rq_job_id_download_file,
meta=get_rq_job_meta(request=request, db_obj=db_storage),
result_ttl=result_ttl,
failure_ttl=failure_ttl
)
return rq_job_download_file
......@@ -218,6 +236,27 @@ def get_list_view_name(model):
'model_name': model._meta.object_name.lower()
}
def get_import_rq_id(
resource_type: str,
resource_id: int,
subresource_type: str,
user: str,
) -> str:
# import:<task|project|job>-<id|uuid>-<annotations|dataset|backup>-by-<user>
return f"import:{resource_type}-{resource_id}-{subresource_type}-by-{user}"
def import_resource_with_clean_up_after(
func: Union[Callable[[str, int, int], int], Callable[[str, int, str, bool], None]],
filename: str,
*args,
**kwargs,
) -> Any:
try:
result = func(filename, *args, **kwargs)
finally:
with suppress(FileNotFoundError):
os.remove(filename)
return result
def get_cpu_number() -> int:
cpu_number = None
......
This diff has been collapsed.
......@@ -148,7 +148,7 @@ def export(request, filter_query, queue_name):
if os.path.exists(file_path):
return Response(status=status.HTTP_201_CREATED)
elif rq_job.is_failed:
exc_info = str(rq_job.exc_info)
exc_info = rq_job.meta.get('formatted_exception', str(rq_job.exc_info))
rq_job.delete()
return Response(exc_info,
status=status.HTTP_500_INTERNAL_SERVER_ERROR)
......
......@@ -50,7 +50,7 @@ diskcache==5.4.0
boto3==1.17.61
azure-storage-blob==12.13.0
google-cloud-storage==1.42.0
git+https://github.com/cvat-ai/datumaro.git@0817144ade1ddc514e182ca1835e322cb9af00a0
git+https://github.com/cvat-ai/datumaro.git@ff83c00c2c1bc4b8fdfcc55067fcab0a9b5b6b11
urllib3>=1.26.5 # not directly required, pinned by Snyk to avoid a vulnerability
natsort==8.0.0
mistune>=2.0.1 # not directly required, pinned by Snyk to avoid a vulnerability
......
# SHA1:53feeaa402abed516aad4a640244c5fd1bff765a
# SHA1:d1435558d66ec49d0c691492b2f3798960ca3bba
#
# This file is autogenerated by pip-compile-multi
# To update, run:
......@@ -66,7 +66,7 @@ cryptography==40.0.2
# pyjwt
cycler==0.11.0
# via matplotlib
datumaro @ git+https://github.com/cvat-ai/datumaro.git@0817144ade1ddc514e182ca1835e322cb9af00a0
datumaro @ git+https://github.com/cvat-ai/datumaro.git@ff83c00c2c1bc4b8fdfcc55067fcab0a9b5b6b11
# via -r cvat/requirements/base.in
defusedxml==0.7.1
# via
......
......@@ -1681,7 +1681,13 @@ paths:
description: Format is not available
post:
operationId: jobs_create_annotations
summary: Method allows to upload job annotations
description: |2
The request POST /api/jobs/id/annotations will initiate file upload and will create
the rq job on the server in which the process of annotations uploading from file
will be carried out. Please, use the PUT /api/jobs/id/annotations endpoint for checking status of the process.
summary: Method allows to initialize the process of the job annotation upload
from a local file or a cloud storage
parameters:
- in: query
name: cloud_storage_id
......@@ -1742,12 +1748,24 @@ paths:
'201':
description: Uploading has finished
'202':
content:
application/vnd.cvat+json:
schema:
$ref: '#/components/schemas/RqId'
description: Uploading has been started
'405':
description: Format is not available
put:
operationId: jobs_update_annotations
summary: Method performs an update of all annotations in a specific job
description: |2
To check the status of the process of uploading job annotations from a file:
After initiating the annotations upload, you will receive an rq_id parameter.
Make sure to include this parameter as a query parameter in your subsequent
PUT /api/jobs/id/annotations requests to track the status of the annotations upload.
summary: Method performs an update of all annotations in a specific job or used
for uploading annotations from a file
parameters:
- in: query
name: format
......@@ -1763,6 +1781,11 @@ paths:
type: integer
description: A unique integer value identifying this job.
required: true
- in: query
name: rq_id
schema:
type: string
description: rq id
tags:
- jobs
requestBody:
......@@ -3059,6 +3082,14 @@ paths:
/api/projects/{id}/dataset/:
get:
operationId: projects_retrieve_dataset
description: |2
To check the status of the process of importing a project dataset from a file:
After initiating the dataset upload, you will receive an rq_id parameter.
Make sure to include this parameter as a query parameter in your subsequent
GET /api/projects/id/dataset requests to track the status of the dataset import.
Also you should specify action parameter: action=import_status.
summary: Export project as a dataset in a specific format
parameters:
- in: query
......@@ -3102,6 +3133,11 @@ paths:
- cloud_storage
- local
description: Where need to save downloaded dataset
- in: query
name: rq_id
schema:
type: string
description: rq id
- in: query
name: use_default_location
schema:
......@@ -3132,7 +3168,13 @@ paths:
description: Format is not available
post:
operationId: projects_create_dataset
summary: Import dataset in specific format as a project
description: |2
The request POST /api/projects/id/dataset will initiate file upload and will create
the rq job on the server in which the process of dataset import from a file
will be carried out. Please, use the GET /api/projects/id/dataset endpoint for checking status of the process.
summary: Import dataset in specific format as a project or check status of dataset
import process
parameters:
- in: query
name: cloud_storage_id
......@@ -3191,6 +3233,10 @@ paths:
- basicAuth: []
responses:
'202':
content:
application/vnd.cvat+json:
schema:
$ref: '#/components/schemas/RqId'
description: Importing has been started
'400':
description: Failed to import dataset
......@@ -3223,6 +3269,18 @@ paths:
/api/projects/backup/:
post:
operationId: projects_create_backup
description: |2
The backup import process is as follows:
The first request POST /api/projects/backup will initiate file upload and will create
the rq job on the server in which the process of creating a project from the uploaded backup
will be carried out.
After initiating the backup upload, you will receive an rq_id parameter.
Make sure to include this parameter as a query parameter in your subsequent requests
to track the status of the project creation.
Once the project has been successfully created, the server will return the id of the newly created project.
summary: Methods create a project from a backup
parameters:
- in: header
......@@ -3259,6 +3317,11 @@ paths:
schema:
type: integer
description: Organization identifier
- in: query
name: rq_id
schema:
type: string
description: rq id
tags:
- projects
requestBody:
......@@ -3279,6 +3342,10 @@ paths:
'201':
description: The project has been imported
'202':
content:
application/vnd.cvat+json:
schema:
$ref: '#/components/schemas/RqId'
description: Importing a backup file has been started
/api/schema/:
get:
......@@ -3835,8 +3902,13 @@ paths:
description: Format is not available
post:
operationId: tasks_create_annotations
summary: Method allows to upload task annotations from a local file or a cloud
storage
description: |2
The request POST /api/tasks/id/annotations will initiate file upload and will create
the rq job on the server in which the process of annotations uploading from file
will be carried out. Please, use the PUT /api/tasks/id/annotations endpoint for checking status of the process.
summary: Method allows to initialize the process of uploading task annotations
from a local file or a cloud storage
parameters:
- in: query
name: cloud_storage_id
......@@ -3896,12 +3968,23 @@ paths:
'201':
description: Uploading has finished
'202':
content:
application/vnd.cvat+json:
schema:
$ref: '#/components/schemas/RqId'
description: Uploading has been started
'405':
description: Format is not available
put:
operationId: tasks_update_annotations
summary: Method allows to upload task annotations
description: |2
To check the status of the process of uploading task annotations from a file:
After initiating the annotations upload, you will receive an rq_id parameter.
Make sure to include this parameter as a query parameter in your subsequent
PUT /api/tasks/id/annotations requests to track the status of the annotations upload.
summary: Method allows to upload task annotations or edit existing annotations
parameters:
- in: query
name: format
......@@ -3917,6 +4000,11 @@ paths:
type: integer
description: A unique integer value identifying this task.
required: true
- in: query
name: rq_id
schema:
type: string
description: rq id
tags:
- tasks
requestBody:
......@@ -4341,6 +4429,18 @@ paths:
/api/tasks/backup/:
post:
operationId: tasks_create_backup
description: |2
The backup import process is as follows:
The first request POST /api/tasks/backup will initiate file upload and will create
the rq job on the server in which the process of creating a task from the uploaded backup
will be carried out.
After initiating the backup upload, you will receive an rq_id parameter.
Make sure to include this parameter as a query parameter in your subsequent requests
to track the status of the task creation.
Once the task has been successfully created, the server will return the id of the newly created task.
summary: Method recreates a task from an attached task backup file
parameters:
- in: header
......@@ -4377,6 +4477,11 @@ paths:
schema:
type: integer
description: Organization identifier
- in: query
name: rq_id
schema:
type: string
description: rq id
tags:
- tasks
requestBody:
......@@ -4398,6 +4503,10 @@ paths:
'201':
description: The task has been imported
'202':
content:
application/vnd.cvat+json:
schema:
$ref: '#/components/schemas/RqId'
description: Importing a backup file has been started
/api/users:
get:
......@@ -7339,6 +7448,13 @@ components:
* `supervisor` - Supervisor
* `maintainer` - Maintainer
* `owner` - Owner
RqId:
type: object
properties:
rq_id:
type: string
required:
- rq_id
RqStatus:
type: object
properties:
......
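
Taken together, the endpoint descriptions above define the new client-side flow: start an import with a POST, read the `rq_id` from the 202 response, and poll with that `rq_id` (PUT for annotations, GET with `action=import_status` for project datasets) until a 201 is returned. A minimal sketch of that flow for task annotations using plain `requests`; the paths and query parameters come from the schema above, while the multipart field name and the polling interval are assumptions:

```python
import time

import requests


def import_task_annotations(base_url: str, auth, task_id: int, archive_path: str,
                            fmt: str = "COCO 1.0") -> None:
    url = f"{base_url}/api/tasks/{task_id}/annotations"
    params = {"format": fmt, "filename": archive_path.rsplit("/", 1)[-1]}

    # Start the import; the 202 response carries the rq_id of the server-side RQ job.
    with open(archive_path, "rb") as f:
        response = requests.post(
            url, params=params, auth=auth,
            files={"annotation_file": f},  # field name is an assumption
        )
    response.raise_for_status()
    rq_id = response.json()["rq_id"]

    # Poll the same endpoint with the rq_id until the import finishes (201).
    while True:
        response = requests.put(url, params={**params, "rq_id": rq_id}, auth=auth)
        if response.status_code == 201:
            return
        assert response.status_code == 202, response.text
        time.sleep(1)
```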
......@@ -21,6 +21,7 @@ import os
import shutil
import subprocess
import sys
from datetime import timedelta
from distutils.util import strtobool
from enum import Enum
......@@ -294,6 +295,7 @@ class CVAT_QUEUES(Enum):
AUTO_ANNOTATION = 'annotation'
WEBHOOKS = 'webhooks'
NOTIFICATIONS = 'notifications'
CLEANING = 'cleaning'
RQ_QUEUES = {
CVAT_QUEUES.IMPORT_DATA.value: {
......@@ -326,6 +328,12 @@ RQ_QUEUES = {
'DB': 0,
'DEFAULT_TIMEOUT': '1h'
},
CVAT_QUEUES.CLEANING.value: {
'HOST': 'localhost',
'PORT': 6379,
'DB': 0,
'DEFAULT_TIMEOUT': '1h'
},
}
NUCLIO = {
......@@ -346,7 +354,6 @@ RQ_EXCEPTION_HANDLERS = [
'cvat.apps.events.handlers.handle_rq_exception',
]
# JavaScript and CSS compression
# https://django-compressor.readthedocs.io
......@@ -667,3 +674,7 @@ DATABASES = {
}
BUCKET_CONTENT_MAX_PAGE_SIZE = 500
IMPORT_CACHE_FAILED_TTL = timedelta(days=90)
IMPORT_CACHE_SUCCESS_TTL = timedelta(hours=1)
IMPORT_CACHE_CLEAN_DELAY = timedelta(hours=2)
......@@ -10,3 +10,5 @@ from cvat.settings.production import *
PASSWORD_HASHERS = [
"django.contrib.auth.hashers.MD5PasswordHasher",
]
IMPORT_CACHE_CLEAN_DELAY = timedelta(seconds=30)
......@@ -77,6 +77,7 @@ services:
DJANGO_LOG_SERVER_HOST: vector
DJANGO_LOG_SERVER_PORT: 80
no_proxy: clickhouse,grafana,vector,nuclio,opa,${no_proxy:-}
NUMPROCS: 1
command: -c supervisord/utils.conf
volumes:
- cvat_data:/home/django/data
......
......@@ -41,3 +41,12 @@ command=%(ENV_HOME)s/wait-for-it.sh %(ENV_CVAT_REDIS_HOST)s:6379 -t 0 -- bash -i
"
environment=VECTOR_EVENT_HANDLER="SynchronousLogstashHandler"
numprocs=1
[program:rqworker_cleaning]
command=%(ENV_HOME)s/wait-for-it.sh %(ENV_CVAT_REDIS_HOST)s:6379 -t 0 -- bash -ic " \
exec python3 %(ENV_HOME)s/manage.py rqworker -v 3 cleaning \
--worker-class cvat.rqworker.DefaultWorker \
"
environment=SSH_AUTH_SOCK="/tmp/ssh-agent.sock",VECTOR_EVENT_HANDLER="SynchronousLogstashHandler"
numprocs=%(ENV_NUMPROCS)s
process_name=rqworker_cleaning_%(process_num)s
\ No newline at end of file
......@@ -8,4 +8,3 @@ timeout = 15
markers =
with_external_services: The test requires services external to the default CVAT deployment, e.g. a Git server etc.
......@@ -473,11 +473,13 @@ class TestImportExportDatasetProject:
_content_type="multipart/form-data",
)
assert response.status == HTTPStatus.ACCEPTED
rq_id = json.loads(response.data).get("rq_id")
assert rq_id, "The rq_id was not found in the response"
while True:
# TODO: it would be better to refactor this into a separate endpoint for getting the request status
(_, response) = api_client.projects_api.retrieve_dataset(
project_id, action="import_status"
project_id, action="import_status", rq_id=rq_id
)
if response.status == HTTPStatus.CREATED:
break
......
......@@ -11,8 +11,10 @@ from copy import deepcopy
from functools import partial
from http import HTTPStatus
from itertools import chain, product
from math import ceil
from pathlib import Path
from tempfile import TemporaryDirectory
from tempfile import NamedTemporaryFile, TemporaryDirectory
from time import sleep, time
from typing import List, Optional
import pytest
......@@ -21,10 +23,12 @@ from cvat_sdk.api_client import models
from cvat_sdk.api_client.api_client import ApiClient, Endpoint
from cvat_sdk.core.helpers import get_paginated_collection
from cvat_sdk.core.proxies.tasks import ResourceType, Task
from cvat_sdk.core.uploading import Uploader
from deepdiff import DeepDiff
from PIL import Image
import shared.utils.s3 as s3
from shared.fixtures.init import docker_exec_cvat, kube_exec_cvat
from shared.utils.config import (
BASE_URL,
USER_PASS,
......@@ -1726,3 +1730,110 @@ def test_can_report_correct_completed_jobs_count(tasks, jobs, admin_user):
task, _ = api_client.tasks_api.retrieve(task["id"])
assert task.jobs.completed == 1
class TestImportTaskAnnotations:
def _make_client(self) -> Client:
return Client(BASE_URL, config=Config(status_check_period=0.01))
@pytest.fixture(autouse=True)
def setup(self, restore_db_per_function, tmp_path: Path, admin_user: str):
self.tmp_dir = tmp_path
self.client = self._make_client()
self.user = admin_user
self.format = "COCO 1.0"
with self.client:
self.client.login((self.user, USER_PASS))
def _check_annotations(self, task_id):
with make_api_client(self.user) as api_client:
(_, response) = api_client.tasks_api.retrieve_annotations(id=task_id)
assert response.status == HTTPStatus.OK
annotations = json.loads(response.data)["shapes"]
assert len(annotations) > 0
def _delete_annotations(self, task_id):
with make_api_client(self.user) as api_client:
(_, response) = api_client.tasks_api.destroy_annotations(id=task_id)
assert response.status == HTTPStatus.NO_CONTENT
@pytest.mark.timeout(64)
@pytest.mark.parametrize("successful_upload", [True, False])
def test_can_import_annotations_after_previous_unclear_import(
self, successful_upload: bool, tasks_with_shapes
):
task_id = tasks_with_shapes[0]["id"]
self._check_annotations(task_id)
with NamedTemporaryFile() as f:
filename = self.tmp_dir / f"task_{task_id}_{Path(f.name).name}_coco.zip"
task = self.client.tasks.retrieve(task_id)
task.export_dataset(self.format, filename, include_images=False)
self._delete_annotations(task_id)
params = {"format": self.format, "filename": filename.name}
url = self.client.api_map.make_endpoint_url(
self.client.api_client.tasks_api.create_annotations_endpoint.path
).format(id=task_id)
uploader = Uploader(self.client)
if successful_upload:
# measure the time required to upload the annotations file
start_time = time()
task.import_annotations(self.format, filename)
required_time = ceil(time() - start_time) * 2
self._delete_annotations(task_id)
response = uploader.upload_file(
url, filename, meta=params, query_params=params, logger=self.client.logger.debug
)
rq_id = json.loads(response.data)["rq_id"]
assert rq_id
else:
required_time = 54
uploader._tus_start_upload(url, query_params=params)
uploader._upload_file_data_with_tus(
url, filename, meta=params, logger=self.client.logger.debug
)
sleep(required_time)
if successful_upload:
self._check_annotations(task_id)
self._delete_annotations(task_id)
task.import_annotations(self.format, filename)
self._check_annotations(task_id)
@pytest.mark.timeout(64)
def test_check_import_cache_after_previous_interrupted_upload(self, tasks_with_shapes, request):
task_id = tasks_with_shapes[0]["id"]
with NamedTemporaryFile() as f:
filename = self.tmp_dir / f"task_{task_id}_{Path(f.name).name}_coco.zip"
task = self.client.tasks.retrieve(task_id)
task.export_dataset(self.format, filename, include_images=False)
params = {"format": self.format, "filename": filename.name}
url = self.client.api_map.make_endpoint_url(
self.client.api_client.tasks_api.create_annotations_endpoint.path
).format(id=task_id)
uploader = Uploader(self.client)
uploader._tus_start_upload(url, query_params=params)
uploader._upload_file_data_with_tus(
url, filename, meta=params, logger=self.client.logger.debug
)
number_of_files = 1
sleep(30)  # wait until the cleaning job has been picked up by the rq worker
command = ["/bin/bash", "-c", f"ls data/tasks/{task_id}/tmp | wc -l"]
platform = request.config.getoption("--platform")
assert platform in ("kube", "local")
func = docker_exec_cvat if platform == "local" else kube_exec_cvat
for _ in range(12):
sleep(2)
result, _ = func(command)
number_of_files = int(result)
if not number_of_files:
break
assert not number_of_files
......@@ -367,7 +367,7 @@ class TestTaskUsecases:
assert task.id
assert task.id != fxt_new_task.id
assert task.size == fxt_new_task.size
assert "imported sucessfully" in self.logger_stream.getvalue()
assert "imported successfully" in self.logger_stream.getvalue()
assert "100%" in pbar_out.getvalue().strip("\r").split("\r")[-1]
assert self.stdout.getvalue() == ""
......
......@@ -8,6 +8,7 @@ from http import HTTPStatus
from pathlib import Path
from subprocess import PIPE, CalledProcessError, run
from time import sleep
from typing import List, Union
import pytest
import requests
......@@ -23,7 +24,6 @@ PREFIX = "test"
CONTAINER_NAME_FILES = ["docker-compose.tests.yml"]
DC_FILES = [
"docker-compose.dev.yml",
"tests/docker-compose.file_share.yml",
......@@ -85,7 +85,7 @@ def _run(command, capture_output=True):
proc = run(_command, check=True) # nosec
return stdout, stderr
except CalledProcessError as exc:
stderr = exc.stderr.decode() if capture_output else "see above"
stderr = exc.stderr.decode() or exc.stdout.decode() if capture_output else "see above"
pytest.exit(
f"Command failed: {command}.\n"
f"Error message: {stderr}.\n"
......@@ -120,13 +120,17 @@ def kube_cp(source, target):
_run(f"kubectl cp {source} {target}")
def docker_exec_cvat(command):
_run(f"docker exec {PREFIX}_cvat_server_1 {command}")
def docker_exec_cvat(command: Union[List[str], str]):
base = f"docker exec {PREFIX}_cvat_server_1"
_command = f"{base} {command}" if isinstance(command, str) else base.split() + command
return _run(_command)
def kube_exec_cvat(command):
def kube_exec_cvat(command: Union[List[str], str]):
pod_name = _kube_get_server_pod_name()
_run(f"kubectl exec {pod_name} -- {command}")
base = f"kubectl exec {pod_name} --"
_command = f"{base} {command}" if isinstance(command, str) else base.split() + command
return _run(_command)
def docker_exec_cvat_db(command):
......@@ -211,7 +215,7 @@ def create_compose_files(container_name_files):
for service_name, service_config in dc_config["services"].items():
service_config.pop("container_name", None)
if service_name == "cvat_server":
if service_name in ("cvat_server", "cvat_utils"):
service_env = service_config["environment"]
service_env["DJANGO_SETTINGS_MODULE"] = "cvat.settings.testing_rest"
......
......@@ -58,6 +58,10 @@ def post_files_method(username, endpoint, data, files, **kwargs):
)
def put_method(username, endpoint, data, **kwargs):
return requests.put(get_api_url(endpoint, **kwargs), json=data, auth=(username, USER_PASS))
def server_get(username, endpoint, **kwargs):
return requests.get(get_server_url(endpoint, **kwargs), auth=(username, USER_PASS))
......
......@@ -9,7 +9,7 @@ import pytest
T = TypeVar("T")
from shared.utils.config import get_method, post_method
from shared.utils.config import get_method, post_method, put_method
FILENAME_TEMPLATE = "cvat/{}/{}.zip"
EXPORT_FORMAT = "CVAT for images 1.1"
......@@ -117,9 +117,16 @@ class _CloudStorageResourceTest(ABC):
response = post_method(user, url, data=None, **kwargs)
status = response.status_code
# Only the first POST request contains rq_id in response.
# Exclude cases with 403 expected status.
rq_id = None
if status == HTTPStatus.ACCEPTED:
rq_id = response.json().get("rq_id")
assert rq_id, "The rq_id was not found in the response"
while status != _expect_status:
assert status == HTTPStatus.ACCEPTED
response = post_method(user, url, data=None, **kwargs)
response = put_method(user, url, data=None, rq_id=rq_id, **kwargs)
status = response.status_code
if _check_uploaded:
......@@ -154,9 +161,16 @@ class _CloudStorageResourceTest(ABC):
response = post_method(user, url, data=None, **kwargs)
status = response.status_code
# Only the first POST request contains rq_id in response.
# Exclude cases with 403 expected status.
rq_id = None
if status == HTTPStatus.ACCEPTED:
rq_id = response.json().get("rq_id")
assert rq_id, "The rq_id was not found in the response"
while status != _expect_status:
assert status == HTTPStatus.ACCEPTED
response = get_method(user, url, action="import_status")
response = get_method(user, url, action="import_status", rq_id=rq_id)
status = response.status_code
def _import_resource(self, cloud_storage: Dict[str, Any], resource_type: str, *args, **kwargs):
......
......@@ -14,6 +14,10 @@ cvat:
- mountPath: /home/django/share
name: cvat-backend-data
subPath: share
utils:
additionalEnv:
- name: DJANGO_SETTINGS_MODULE
value: cvat.settings.testing_rest
# Images are already present in the node
imagePullPolicy: Never
frontend:
......