diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index e5e85143c265de32a8656f27918e493be3040828..c9405daed8100f9707b18c4dfce76507070f0b48 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -22,7 +22,7 @@ from cvat.apps.profiler import silk_profile from cvat.apps.dataset_manager.annotation import AnnotationIR, AnnotationManager from cvat.apps.dataset_manager.bindings import TaskData, JobData, CvatImportError from cvat.apps.dataset_manager.formats.registry import make_exporter, make_importer -from cvat.apps.dataset_manager.util import add_prefetch_fields, bulk_create +from cvat.apps.dataset_manager.util import add_prefetch_fields, bulk_create, get_cached class dotdict(OrderedDict): @@ -105,17 +105,14 @@ class JobAnnotation: def __init__(self, pk, *, is_prefetched=False, queryset=None): if queryset is None: - queryset = self.add_prefetch_info(models.Job.objects).all() + queryset = self.add_prefetch_info(models.Job.objects) if is_prefetched: self.db_job: models.Job = queryset.select_related( 'segment__task' ).select_for_update().get(id=pk) else: - try: - self.db_job: models.Job = next(job for job in queryset if job.pk == int(pk)) - except StopIteration as ex: - raise models.Job.DoesNotExist from ex + self.db_job: models.Job = get_cached(queryset, pk=int(pk)) db_segment = self.db_job.segment self.start_frame = db_segment.start_frame diff --git a/cvat/apps/dataset_manager/util.py b/cvat/apps/dataset_manager/util.py index 7359defb92dd6268a99d7ffb378f9e3a5ae7357c..387b74d21777d3d485942c9821b317ae2fd14f00 100644 --- a/cvat/apps/dataset_manager/util.py +++ b/cvat/apps/dataset_manager/util.py @@ -10,7 +10,7 @@ import os, os.path as osp import zipfile from django.conf import settings -from django.db.models import QuerySet +from django.db import models def current_function_name(depth=1): @@ -40,16 +40,35 @@ def bulk_create(db_model, objects, flt_param): return [] -def is_prefetched(queryset: QuerySet, field: str) -> bool: +def is_prefetched(queryset: models.QuerySet, field: str) -> bool: return field in queryset._prefetch_related_lookups -def add_prefetch_fields(queryset: QuerySet, fields: Sequence[str]) -> QuerySet: +def add_prefetch_fields(queryset: models.QuerySet, fields: Sequence[str]) -> models.QuerySet: for field in fields: if not is_prefetched(queryset, field): queryset = queryset.prefetch_related(field) return queryset +def get_cached(queryset: models.QuerySet, pk: int) -> models.Model: + """ + Like regular queryset.get(), but checks for the cached values first + instead of just making a request. + """ + + # Read more about caching insights: + # https://www.mattduck.com/2021-01-django-orm-result-cache.html + # The field is initialized on accessing the query results, eg. on iteration + if getattr(queryset, '_result_cache'): + result = next((obj for obj in queryset if obj.pk == pk), None) + else: + result = None + + if result is None: + result = queryset.get(id=pk) + + return result + def deepcopy_simple(v): # Default deepcopy is very slow diff --git a/cvat/apps/engine/migrations/0070_add_job_type_created_date.py b/cvat/apps/engine/migrations/0070_add_job_type_created_date.py index cd8230e04a518ba077e2c1a9140f1dcb5c88611e..034a6b275ae9e3f9f51ea1f11f97449382694bf0 100644 --- a/cvat/apps/engine/migrations/0070_add_job_type_created_date.py +++ b/cvat/apps/engine/migrations/0070_add_job_type_created_date.py @@ -11,7 +11,7 @@ def add_created_date_to_existing_jobs(apps, schema_editor): task = job.segment.task job.created_date = task.created_date - Job.objects.bulk_update(jobs, fields=['created_date']) + Job.objects.bulk_update(jobs, fields=['created_date'], batch_size=500) class Migration(migrations.Migration): diff --git a/cvat/apps/quality_control/quality_reports.py b/cvat/apps/quality_control/quality_reports.py index 78a6b09dec162c62d7697b716726d60b5b4cd9d3..29e1e2e872642b97c3b8223f6f9ab53715890cdc 100644 --- a/cvat/apps/quality_control/quality_reports.py +++ b/cvat/apps/quality_control/quality_reports.py @@ -2241,10 +2241,9 @@ class QualityReportUpdateManager: except Task.DoesNotExist: return - # Try to use shared queryset to minimize DB requests - job_queryset = Job.objects.prefetch_related("segment") - job_queryset = JobDataProvider.add_prefetch_info(job_queryset) - job_queryset = job_queryset.filter(segment__task_id=task_id).all() + # Try to use a shared queryset to minimize DB requests + job_queryset = Job.objects.select_related("segment") + job_queryset = job_queryset.filter(segment__task_id=task_id) # The GT job could have been removed during scheduling, so we need to check it exists gt_job: Job = next( @@ -2258,6 +2257,14 @@ class QualityReportUpdateManager: # - job updated -> job report is computed # old reports can be reused in this case (need to add M-1 relationship in reports) + # Add prefetch data to the shared queryset + # All the jobs / segments share the same task, so we can load it just once. + # We reuse the same object for better memory use (OOM is possible otherwise). + # Perform manual "join", since django can't do this. + gt_job = JobDataProvider.add_prefetch_info(job_queryset).get(id=gt_job.id) + for job in job_queryset: + job.segment.task = gt_job.segment.task + # Preload all the data for the computations # It must be done in a single transaction and before all the remaining computations # because the task and jobs can be changed after the beginning,