From 69f6963d45517ff33ed9da13164b93ca9b0298c4 Mon Sep 17 00:00:00 2001 From: FluorineDog Date: Mon, 25 Jan 2021 11:20:25 +0800 Subject: [PATCH] Enable lock and fix row count Signed-off-by: FluorineDog --- internal/core/src/query/PlanImpl.h | 3 +- .../core/src/segcore/SegmentInterface.cpp | 12 ++++--- internal/core/src/segcore/SegmentInterface.h | 35 +++++++++++++------ internal/core/src/segcore/SegmentSealed.h | 4 +++ .../core/src/segcore/SegmentSealedImpl.cpp | 21 ++++++++--- internal/core/src/segcore/SegmentSealedImpl.h | 23 +++++++++--- 6 files changed, 71 insertions(+), 27 deletions(-) diff --git a/internal/core/src/query/PlanImpl.h b/internal/core/src/query/PlanImpl.h index 25d519e46..8fe71c8cc 100644 --- a/internal/core/src/query/PlanImpl.h +++ b/internal/core/src/query/PlanImpl.h @@ -34,7 +34,8 @@ struct Plan { const Schema& schema_; std::unique_ptr plan_node_; std::map tag2field_; // PlaceholderName -> FieldOffset - std::vector target_entries_; + std::vector target_entries_; + std::vector referred_fields_; // TODO: add move extra info }; diff --git a/internal/core/src/segcore/SegmentInterface.cpp b/internal/core/src/segcore/SegmentInterface.cpp index 0066ee360..0fa5fcbb6 100644 --- a/internal/core/src/segcore/SegmentInterface.cpp +++ b/internal/core/src/segcore/SegmentInterface.cpp @@ -15,7 +15,8 @@ namespace milvus::segcore { class Naive; void -SegmentInterface::FillTargetEntry(const query::Plan* plan, QueryResult& results) const { +SegmentInternalInterface::FillTargetEntry(const query::Plan* plan, QueryResult& results) const { + std::shared_lock lck(mutex_); AssertInfo(plan, "empty plan"); auto size = results.result_distances_.size(); Assert(results.internal_seg_offsets_.size() == size); @@ -42,10 +43,11 @@ SegmentInterface::FillTargetEntry(const query::Plan* plan, QueryResult& results) } QueryResult -SegmentInterface::Search(const query::Plan* plan, - const query::PlaceholderGroup** placeholder_groups, - const Timestamp* timestamps, - int64_t num_groups) const { +SegmentInternalInterface::Search(const query::Plan* plan, + const query::PlaceholderGroup** placeholder_groups, + const Timestamp* timestamps, + int64_t num_groups) const { + std::shared_lock lck(mutex_); Assert(num_groups == 1); query::ExecPlanNodeVisitor visitor(*this, timestamps[0], *placeholder_groups[0]); auto results = visitor.get_moved_result(*plan->plan_node_); diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index b68a9c993..ad31bfd70 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -26,14 +26,14 @@ namespace milvus::segcore { class SegmentInterface { public: // fill results according to target_entries in plan - void - FillTargetEntry(const query::Plan* plan, QueryResult& results) const; + virtual void + FillTargetEntry(const query::Plan* plan, QueryResult& results) const = 0; - QueryResult + virtual QueryResult Search(const query::Plan* Plan, const query::PlaceholderGroup* placeholder_groups[], const Timestamp timestamps[], - int64_t num_groups) const; + int64_t num_groups) const = 0; virtual int64_t GetMemoryUsageInBytes() const = 0; @@ -47,13 +47,6 @@ class SegmentInterface { virtual ~SegmentInterface() = default; protected: - // calculate output[i] = Vec[seg_offsets[i]}, where Vec binds to system_type - virtual void - bulk_subscript(SystemFieldType system_type, const int64_t* seg_offsets, int64_t count, void* output) const = 0; - - // calculate output[i] = Vec[seg_offsets[i]}, where Vec binds to field_offset - virtual void - bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const = 0; }; // internal API for DSL calculation @@ -77,6 +70,15 @@ class SegmentInternalInterface : public SegmentInterface { return *ptr; } + QueryResult + Search(const query::Plan* Plan, + const query::PlaceholderGroup* placeholder_groups[], + const Timestamp timestamps[], + int64_t num_groups) const override; + + void + FillTargetEntry(const query::Plan* plan, QueryResult& results) const override; + public: virtual void vector_search(int64_t vec_count, @@ -106,6 +108,17 @@ class SegmentInternalInterface : public SegmentInterface { // internal API: return chunk_index in span, support scalar index only virtual const knowhere::Index* chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const = 0; + + // calculate output[i] = Vec[seg_offsets[i]}, where Vec binds to system_type + virtual void + bulk_subscript(SystemFieldType system_type, const int64_t* seg_offsets, int64_t count, void* output) const = 0; + + // calculate output[i] = Vec[seg_offsets[i]}, where Vec binds to field_offset + virtual void + bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const = 0; + + protected: + mutable std::shared_mutex mutex_; }; } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h index 97104bdb6..6d1e5d7ce 100644 --- a/internal/core/src/segcore/SegmentSealed.h +++ b/internal/core/src/segcore/SegmentSealed.h @@ -22,6 +22,10 @@ class SegmentSealed : public SegmentInternalInterface { LoadIndex(const LoadIndexInfo& info) = 0; virtual void LoadFieldData(const LoadFieldDataInfo& info) = 0; + virtual void + DropIndex(const FieldId field_id) = 0; + virtual void + DropFieldData(const FieldId field_id) = 0; }; using SegmentSealedPtr = std::unique_ptr; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 49935905d..7558259e6 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -14,6 +14,7 @@ namespace milvus::segcore { void SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) { + // NOTE: lock only when data is ready to avoid starvation auto field_id = FieldId(info.field_id); auto field_offset = schema_->get_offset(field_id); @@ -35,7 +36,7 @@ SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) { void SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { - // TODO + // NOTE: lock only when data is ready to avoid starvation Assert(info.row_count > 0); auto field_id = FieldId(info.field_id); Assert(info.blob); @@ -54,8 +55,8 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { update_row_count(info.row_count); AssertInfo(row_ids_.empty(), "already exists"); row_ids_ = std::move(vec_data); + ++system_ready_count_; - ++ready_count_; } else { // prepare data auto field_offset = schema_->get_offset(field_id); @@ -105,12 +106,13 @@ SegmentSealedImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) c const knowhere::Index* SegmentSealedImpl::chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const { // TODO: support scalar index - return nullptr; + PanicInfo("unimplemented"); } int64_t SegmentSealedImpl::GetMemoryUsageInBytes() const { // TODO: add estimate for index + std::shared_lock lck(mutex_); auto row_count = row_count_opt_.value_or(0); return schema_->get_total_sizeof() * row_count; } @@ -118,8 +120,7 @@ SegmentSealedImpl::GetMemoryUsageInBytes() const { int64_t SegmentSealedImpl::get_row_count() const { std::shared_lock lck(mutex_); - AssertInfo(row_count_opt_.has_value(), "Data not loaded"); - return row_count_opt_.value(); + return row_count_opt_.value_or(0); } const Schema& @@ -140,6 +141,16 @@ SegmentSealedImpl::vector_search(int64_t vec_count, Assert(vec_indexings_.is_ready(field_offset)); query::SearchOnSealed(*schema_, vec_indexings_, query_info, query_data, query_count, bitset, output); } +void +SegmentSealedImpl::DropFieldData(const FieldId field_id) { + std::unique_lock lck(mutex_); + PanicInfo("unimplemented"); +} +void +SegmentSealedImpl::DropIndex(const FieldId field_id) { + std::unique_lock lck(mutex_); + PanicInfo("unimplemented"); +} SegmentSealedPtr CreateSealedSegment(SchemaPtr schema, int64_t size_per_chunk) { diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 5871fe67b..faf1a9f49 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -24,6 +24,10 @@ class SegmentSealedImpl : public SegmentSealed { LoadIndex(const LoadIndexInfo& info) override; void LoadFieldData(const LoadFieldDataInfo& info) override; + void + DropIndex(const FieldId field_id) override; + void + DropFieldData(const FieldId field_id) override; public: int64_t @@ -106,18 +110,27 @@ class SegmentSealedImpl : public SegmentSealed { const BitsetView& bitset, QueryResult& output) const override; + bool + is_system_field_ready() const { + return system_ready_count_ == 1; + } + bool is_all_ready() const { // TODO: optimize here // NOTE: including row_ids - return ready_count_ == schema_->size() + 1; + if (!is_system_field_ready()) { + return false; + } + return ready_count_ == schema_->size(); } - mutable std::shared_mutex mutex_; - std::atomic_int ready_count_ = 0; - private: - // TOOD: generate index for scalar + // segment loading state + std::atomic ready_count_ = 0; + std::atomic system_ready_count_ = 0; + // segment datas + // TODO: generate index for scalar std::optional row_count_opt_; std::map scalar_indexings_; SealedIndexingRecord vec_indexings_; -- GitLab