未验证 提交 aadfd881 编写于 作者: R Roman Donchenko 提交者: GitHub

Fix TUS offset queries in production environments (#5204)

* Fix TUS offset queries in production environments

Previously, `mod_wsgi` would convert `HEAD` requests into `GET`, which
would be rejected, so clients were unable to resume an upload that failed
midway through.

To make use of this, update the SDK code to enable upload resumption.

* Add a test that forces a chunked TUS upload

* test_can_create_from_backup_in_chunks: make sure the upload is actually chunked
上级 46b88c11
......@@ -70,6 +70,8 @@ non-ascii paths while adding files from "Connected file share" (issue #4428)
- Create manifest with cvat/server docker container command (<https://github.com/opencv/cvat/pull/5172>)
- Cannot assign a resource to a user who has an organization (<https://github.com/opencv/cvat/pull/5218>)
- Oriented bounding boxes broken with COCO format (<https://github.com/opencv/cvat/pull/5219>)
- Fixed upload resumption in production environments
(<https://github.com/opencv/cvat/issues/4839>)
### Security
- TBD
......
......@@ -13,6 +13,7 @@ import requests
import urllib3
from cvat_sdk.api_client.api_client import ApiClient, Endpoint
from cvat_sdk.api_client.exceptions import ApiException
from cvat_sdk.api_client.rest import RESTClientObject
from cvat_sdk.core.helpers import StreamWithProgress, expect_status
from cvat_sdk.core.progress import NullProgressReporter, ProgressReporter
......@@ -20,14 +21,123 @@ from cvat_sdk.core.progress import NullProgressReporter, ProgressReporter
if TYPE_CHECKING:
from cvat_sdk.core.client import Client
import tusclient.uploader as tus_uploader
from tusclient.client import TusClient as _TusClient
from tusclient.client import Uploader as _TusUploader
from tusclient.request import TusRequest as _TusRequest
from tusclient.request import TusUploadFailed as _TusUploadFailed
MAX_REQUEST_SIZE = 100 * 2**20
class _RestClientAdapter:
# Provides requests.Session-like interface for REST client
# only patch is called in the tus client
def __init__(self, rest_client: RESTClientObject):
self.rest_client = rest_client
def _request(self, method, url, data=None, json=None, **kwargs):
raw = self.rest_client.request(
method=method,
url=url,
headers=kwargs.get("headers"),
query_params=kwargs.get("params"),
post_params=json,
body=data,
_parse_response=False,
_request_timeout=kwargs.get("timeout"),
_check_status=False,
)
result = requests.Response()
result._content = raw.data
result.raw = raw
result.headers.update(raw.headers)
result.status_code = raw.status
result.reason = raw.msg
return result
def patch(self, *args, **kwargs):
return self._request("PATCH", *args, **kwargs)
class _MyTusUploader(_TusUploader):
    """tus uploader adjusted for the CVAT server.

    All HTTP traffic goes through the SDK ``ApiClient`` passed at construction,
    which allows the existing session to be reused.
    """

    def __init__(self, *_args, api_client: ApiClient, **_kwargs):
        # Stored before delegating to the base initializer.
        # NOTE(review): presumably the base initializer may already call
        # create_url()/get_offset(), which need the client - confirm against tuspy.
        self._api_client = api_client
        super().__init__(*_args, **_kwargs)

    def _do_request(self):
        """Perform a single upload request through the REST client adapter.

        A ``TusUploadFailed`` error is passed to the base class
        ``_retry_or_cry`` handler instead of propagating directly.
        """
        self.request = _TusRequest(self)
        # Swap in our adapter so the request goes through the SDK's REST client.
        self.request.handle = _RestClientAdapter(self._api_client.rest_client)
        try:
            self.request.perform()
            self.verify_upload()
        except _TusUploadFailed as error:
            self._retry_or_cry(error)

    @tus_uploader._catch_requests_error
    def create_url(self):
        """
        Return upload url.

        Makes request to tus server to create a new upload url for the required file upload.

        Raises ``TusCommunicationError`` if the response carries no
        ``location`` header.
        """
        headers = self.headers
        headers["upload-length"] = str(self.file_size)
        headers["upload-metadata"] = ",".join(self.encode_metadata())
        resp = self._api_client.rest_client.POST(self.client.url, headers=headers)
        url = resp.headers.get("location")
        if url is None:
            # REST client responses expose `status` / `data` (as used in
            # get_offset), not the requests-style `status_code` / `content`.
            msg = f"Attempt to retrieve create file url with status {resp.status}"
            raise tus_uploader.TusCommunicationError(msg, resp.status, resp.data)
        return tus_uploader.urljoin(self.client.url, url)

    @tus_uploader._catch_requests_error
    def get_offset(self):
        """
        Return offset from tus server.

        This is different from the instance attribute 'offset' because this makes an
        http request to the tus server to retrieve the offset.

        Returns 0 when the server answers 405. Raises ``TusCommunicationError``
        on any other API error or when the ``upload-offset`` header is missing.
        """
        try:
            resp = self._api_client.rest_client.HEAD(self.url, headers=self.headers)
        except ApiException as ex:
            if ex.status == 405:  # Method Not Allowed
                # In CVAT up to version 2.2.0, HEAD requests were internally
                # converted to GET by mod_wsgi, and subsequently rejected by the server.
                # For compatibility with old servers, we'll handle such rejections by
                # restarting the upload from the beginning.
                return 0

            raise tus_uploader.TusCommunicationError(
                f"Attempt to retrieve offset failed with status {ex.status}",
                ex.status,
                ex.body,
            ) from ex

        offset = resp.headers.get("upload-offset")
        if offset is None:
            raise tus_uploader.TusCommunicationError(
                f"Attempt to retrieve offset failed with status {resp.status}",
                resp.status,
                resp.data,
            )

        return int(offset)
class Uploader:
"""
Implements common uploading protocols
"""
_CHUNK_SIZE = 10 * 2**20
def __init__(self, client: Client):
self._client = client
......@@ -132,110 +242,16 @@ class Uploader:
@staticmethod
def _make_tus_uploader(api_client: ApiClient, url: str, **kwargs):
import tusclient.uploader as tus_uploader
from tusclient.client import TusClient, Uploader
from tusclient.request import TusRequest, TusUploadFailed
class RestClientAdapter:
# Provides requests.Session-like interface for REST client
# only patch is called in the tus client
def __init__(self, rest_client: RESTClientObject):
self.rest_client = rest_client
def _request(self, method, url, data=None, json=None, **kwargs):
raw = self.rest_client.request(
method=method,
url=url,
headers=kwargs.get("headers"),
query_params=kwargs.get("params"),
post_params=json,
body=data,
_parse_response=False,
_request_timeout=kwargs.get("timeout"),
_check_status=False,
)
result = requests.Response()
result._content = raw.data
result.raw = raw
result.headers.update(raw.headers)
result.status_code = raw.status
result.reason = raw.msg
return result
def patch(self, *args, **kwargs):
return self._request("PATCH", *args, **kwargs)
class MyTusUploader(Uploader):
# Adjusts the library code for CVAT server
# Allows to reuse session
def __init__(self, *_args, api_client: ApiClient, **_kwargs):
self._api_client = api_client
super().__init__(*_args, **_kwargs)
def _do_request(self):
self.request = TusRequest(self)
self.request.handle = RestClientAdapter(self._api_client.rest_client)
try:
self.request.perform()
self.verify_upload()
except TusUploadFailed as error:
self._retry_or_cry(error)
@tus_uploader._catch_requests_error
def create_url(self):
"""
Return upload url.
Makes request to tus server to create a new upload url for the required file upload.
"""
headers = self.headers
headers["upload-length"] = str(self.file_size)
headers["upload-metadata"] = ",".join(self.encode_metadata())
resp = self._api_client.rest_client.POST(self.client.url, headers=headers)
url = resp.headers.get("location")
if url is None:
msg = "Attempt to retrieve create file url with status {}".format(
resp.status_code
)
raise tus_uploader.TusCommunicationError(msg, resp.status_code, resp.content)
return tus_uploader.urljoin(self.client.url, url)
@tus_uploader._catch_requests_error
def get_offset(self):
"""
Return offset from tus server.
This is different from the instance attribute 'offset' because this makes an
http request to the tus server to retrieve the offset.
"""
# FIXME: traefik changes HEAD to GET for some reason, and it breaks the protocol
# Assume we are starting from scratch. This effectively disallows us to resume
# old file uploading
return 0
# resp = self._api_client.rest_client.HEAD(self.url, headers=self.headers)
# offset = resp.headers.get("upload-offset")
# if offset is None:
# msg = "Attempt to retrieve offset fails with status {}".format(resp.status_code)
# raise tus_uploader.TusCommunicationError(msg, resp.status_code, resp.content)
# return int(offset)
# Add headers required by CVAT server
headers = {}
headers["Origin"] = api_client.configuration.host
headers.update(api_client.get_common_headers())
client = TusClient(url, headers=headers)
client = _TusClient(url, headers=headers)
return MyTusUploader(client=client, api_client=api_client, **kwargs)
return _MyTusUploader(client=client, api_client=api_client, **kwargs)
def _upload_file_data_with_tus(self, url, filename, *, meta=None, pbar=None, logger=None):
CHUNK_SIZE = 10 * 2**20
file_size = os.stat(filename).st_size
if pbar is None:
pbar = NullProgressReporter()
......@@ -248,7 +264,7 @@ class Uploader:
url=url.rstrip("/") + "/",
metadata=meta,
file_stream=input_file_with_progress,
chunk_size=CHUNK_SIZE,
chunk_size=Uploader._CHUNK_SIZE,
log_func=logger,
)
tus_uploader.upload()
......
......@@ -2,3 +2,15 @@ LoadModule xsendfile_module /usr/lib/apache2/modules/mod_xsendfile.so
XSendFile On
XSendFilePath ${HOME}/data/
XSendFilePath ${HOME}/static/
# The presence of an Apache output filter (mod_xsendfile) causes mod_wsgi
# to internally convert HEAD requests to GET before passing them to the
# application, for reasons explained here:
# <http://blog.dscpl.com.au/2009/10/wsgi-issues-with-http-head-requests.html>.
# However, we need HEAD requests passed through as-is, because the TUS
# protocol requires them. It should be safe to disable this functionality in
# our case, because mod_xsendfile does not examine the response body (it
# either passes it through or discards it entirely based on the headers),
# so it shouldn't matter whether the application omits the body in response
# to a HEAD request.
WSGIMapHEADToGET Off
......@@ -12,6 +12,7 @@ import pytest
from cvat_sdk import Client, models
from cvat_sdk.api_client import exceptions
from cvat_sdk.core.proxies.tasks import ResourceType, Task
from cvat_sdk.core.uploading import Uploader, _MyTusUploader
from PIL import Image
from shared.utils.helpers import generate_image_files
......@@ -279,7 +280,7 @@ class TestTaskUsecases:
assert "100%" in pbar_out.getvalue().strip("\r").split("\r")[-1]
assert self.stdout.getvalue() == ""
def test_can_create_from_backup(self, fxt_new_task: Task, fxt_backup_file: Path):
def _test_can_create_from_backup(self, fxt_new_task: Task, fxt_backup_file: Path):
pbar_out = io.StringIO()
pbar = make_pbar(file=pbar_out)
......@@ -292,6 +293,29 @@ class TestTaskUsecases:
assert "100%" in pbar_out.getvalue().strip("\r").split("\r")[-1]
assert self.stdout.getvalue() == ""
def test_can_create_from_backup(self, fxt_new_task: Task, fxt_backup_file: Path):
    """Run the shared create-from-backup scenario without altering the upload chunk size."""
    self._test_can_create_from_backup(
        fxt_new_task=fxt_new_task,
        fxt_backup_file=fxt_backup_file,
    )
def test_can_create_from_backup_in_chunks(
    self, monkeypatch: pytest.MonkeyPatch, fxt_new_task: Task, fxt_backup_file: Path
):
    """Run the shared create-from-backup scenario with a 100-byte upload chunk size.

    Every call to ``_MyTusUploader._do_request`` is counted, and the test
    asserts that more than one was made.
    """
    # The small chunk size is meant to force several upload requests;
    # the final assertion checks that it actually did.
    monkeypatch.setattr(Uploader, "_CHUNK_SIZE", 100)

    wrapped_do_request = _MyTusUploader._do_request
    observed_calls = []

    def recording_do_request(uploader):
        # Note the call, then defer to the real implementation.
        observed_calls.append(uploader)
        wrapped_do_request(uploader)

    monkeypatch.setattr(_MyTusUploader, "_do_request", recording_do_request)

    self._test_can_create_from_backup(fxt_new_task, fxt_backup_file)

    # make sure the upload was actually chunked
    assert len(observed_calls) > 1
def test_can_get_jobs(self, fxt_new_task: Task):
jobs = fxt_new_task.get_jobs()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册