// Patch summary (flattened unified diff; the original line breaks were lost):
//  - PlanImpl.h: adds a referred_fields_ member to Plan.
//  - SegmentInterface.{h,cpp}: Search and FillTargetEntry become pure virtual on
//    SegmentInterface and are implemented by SegmentInternalInterface, each taking
//    a shared lock on mutex_; the bulk_subscript declarations and mutex_ move into
//    SegmentInternalInterface.
//  - SegmentSealed.h / SegmentSealedImpl.{h,cpp}: adds DropIndex and DropFieldData
//    (both currently take a unique lock and PanicInfo("unimplemented")), counts the
//    row_ids_ load in system_ready_count_ instead of ready_count_, makes
//    get_row_count return 0 when no row count is set, and replaces the nullptr
//    return of chunk_index_impl with PanicInfo("unimplemented").
// NOTE(review): comment typos are corrected only on added ('+') lines; context and
// removed lines are left untouched so they still match the pre-image files.
diff --git a/internal/core/src/query/PlanImpl.h b/internal/core/src/query/PlanImpl.h index 25d519e466b4c2d523503878ff9bbab923f3c4e1..8fe71c8cc56c3549dc5376cf8572968be2ec6b4d 100644 --- a/internal/core/src/query/PlanImpl.h +++ b/internal/core/src/query/PlanImpl.h @@ -34,7 +34,8 @@ struct Plan { const Schema& schema_; std::unique_ptr plan_node_; std::map tag2field_; // PlaceholderName -> FieldOffset - std::vector target_entries_; + std::vector target_entries_; + std::vector referred_fields_; // TODO: add move extra info }; diff --git a/internal/core/src/segcore/SegmentInterface.cpp b/internal/core/src/segcore/SegmentInterface.cpp index 0066ee360f4d25ad1ddf7ca8a1e71674fef4877d..0fa5fcbb606f1c8d35cd0c6191bf2e13a20082b3 100644 --- a/internal/core/src/segcore/SegmentInterface.cpp +++ b/internal/core/src/segcore/SegmentInterface.cpp @@ -15,7 +15,8 @@ namespace milvus::segcore { class Naive; void -SegmentInterface::FillTargetEntry(const query::Plan* plan, QueryResult& results) const { +SegmentInternalInterface::FillTargetEntry(const query::Plan* plan, QueryResult& results) const { + std::shared_lock lck(mutex_); AssertInfo(plan, "empty plan"); auto size = results.result_distances_.size(); Assert(results.internal_seg_offsets_.size() == size); @@ -42,10 +43,11 @@ SegmentInterface::FillTargetEntry(const query::Plan* plan, QueryResult& results) } QueryResult -SegmentInterface::Search(const query::Plan* plan, - const query::PlaceholderGroup** placeholder_groups, - const Timestamp* timestamps, - int64_t num_groups) const { +SegmentInternalInterface::Search(const query::Plan* plan, + const query::PlaceholderGroup** placeholder_groups, + const Timestamp* timestamps, + int64_t num_groups) const { + std::shared_lock lck(mutex_); Assert(num_groups == 1); query::ExecPlanNodeVisitor visitor(*this, timestamps[0], *placeholder_groups[0]); auto results = visitor.get_moved_result(*plan->plan_node_); diff --git a/internal/core/src/segcore/SegmentInterface.h 
b/internal/core/src/segcore/SegmentInterface.h index b68a9c99352875d01cce717fefabe2029841baa0..ad31bfd70dd936d397ccce1ceb004594b12fc5a6 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -26,14 +26,14 @@ namespace milvus::segcore { class SegmentInterface { public: // fill results according to target_entries in plan - void - FillTargetEntry(const query::Plan* plan, QueryResult& results) const; + virtual void + FillTargetEntry(const query::Plan* plan, QueryResult& results) const = 0; - QueryResult + virtual QueryResult Search(const query::Plan* Plan, const query::PlaceholderGroup* placeholder_groups[], const Timestamp timestamps[], - int64_t num_groups) const; + int64_t num_groups) const = 0; virtual int64_t GetMemoryUsageInBytes() const = 0; @@ -47,13 +47,6 @@ class SegmentInterface { virtual ~SegmentInterface() = default; protected: - // calculate output[i] = Vec[seg_offsets[i]}, where Vec binds to system_type - virtual void - bulk_subscript(SystemFieldType system_type, const int64_t* seg_offsets, int64_t count, void* output) const = 0; - - // calculate output[i] = Vec[seg_offsets[i]}, where Vec binds to field_offset - virtual void - bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const = 0; }; // internal API for DSL calculation @@ -77,6 +70,15 @@ class SegmentInternalInterface : public SegmentInterface { return *ptr; } + QueryResult + Search(const query::Plan* Plan, + const query::PlaceholderGroup* placeholder_groups[], + const Timestamp timestamps[], + int64_t num_groups) const override; + + void + FillTargetEntry(const query::Plan* plan, QueryResult& results) const override; + public: virtual void vector_search(int64_t vec_count, @@ -106,6 +108,17 @@ class SegmentInternalInterface : public SegmentInterface { // internal API: return chunk_index in span, support scalar index only virtual const knowhere::Index* chunk_index_impl(FieldOffset field_offset, 
int64_t chunk_id) const = 0; + + // calculate output[i] = Vec[seg_offsets[i]], where Vec binds to system_type + virtual void + bulk_subscript(SystemFieldType system_type, const int64_t* seg_offsets, int64_t count, void* output) const = 0; + + // calculate output[i] = Vec[seg_offsets[i]], where Vec binds to field_offset + virtual void + bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const = 0; + + protected: + mutable std::shared_mutex mutex_; }; } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h index 97104bdb670240ba9e7c8c8e89eb56c4c823fe06..6d1e5d7ce9bf0a4374cb59c51c7e62ed3690667a 100644 --- a/internal/core/src/segcore/SegmentSealed.h +++ b/internal/core/src/segcore/SegmentSealed.h @@ -22,6 +22,10 @@ class SegmentSealed : public SegmentInternalInterface { LoadIndex(const LoadIndexInfo& info) = 0; virtual void LoadFieldData(const LoadFieldDataInfo& info) = 0; + virtual void + DropIndex(const FieldId field_id) = 0; + virtual void + DropFieldData(const FieldId field_id) = 0; }; using SegmentSealedPtr = std::unique_ptr; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 49935905d5dd2527d61326f8509c26dd931798b1..7558259e6b436e88428d65263ad8e4743f409fda 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -14,6 +14,7 @@ namespace milvus::segcore { void SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) { + // NOTE: lock only when data is ready to avoid starvation auto field_id = FieldId(info.field_id); auto field_offset = schema_->get_offset(field_id); @@ -35,7 +36,7 @@ SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) { void SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { - // TODO + // NOTE: lock only when data is ready to avoid starvation Assert(info.row_count > 0); auto field_id = 
FieldId(info.field_id); Assert(info.blob); @@ -54,8 +55,8 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { update_row_count(info.row_count); AssertInfo(row_ids_.empty(), "already exists"); row_ids_ = std::move(vec_data); + ++system_ready_count_; - ++ready_count_; } else { // prepare data auto field_offset = schema_->get_offset(field_id); @@ -105,12 +106,13 @@ SegmentSealedImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) c const knowhere::Index* SegmentSealedImpl::chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const { // TODO: support scalar index - return nullptr; + PanicInfo("unimplemented"); } int64_t SegmentSealedImpl::GetMemoryUsageInBytes() const { // TODO: add estimate for index + std::shared_lock lck(mutex_); auto row_count = row_count_opt_.value_or(0); return schema_->get_total_sizeof() * row_count; } @@ -118,8 +120,7 @@ SegmentSealedImpl::GetMemoryUsageInBytes() const { int64_t SegmentSealedImpl::get_row_count() const { std::shared_lock lck(mutex_); - AssertInfo(row_count_opt_.has_value(), "Data not loaded"); - return row_count_opt_.value(); + return row_count_opt_.value_or(0); } const Schema& @@ -140,6 +141,16 @@ SegmentSealedImpl::vector_search(int64_t vec_count, Assert(vec_indexings_.is_ready(field_offset)); query::SearchOnSealed(*schema_, vec_indexings_, query_info, query_data, query_count, bitset, output); } +void +SegmentSealedImpl::DropFieldData(const FieldId field_id) { + std::unique_lock lck(mutex_); + PanicInfo("unimplemented"); +} +void +SegmentSealedImpl::DropIndex(const FieldId field_id) { + std::unique_lock lck(mutex_); + PanicInfo("unimplemented"); +} SegmentSealedPtr CreateSealedSegment(SchemaPtr schema, int64_t size_per_chunk) { diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 5871fe67bce7c4b64936e1f3ec1c6fe1c6e4296d..faf1a9f498664fd3749c133b88e61755e19acd69 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ 
b/internal/core/src/segcore/SegmentSealedImpl.h @@ -24,6 +24,10 @@ class SegmentSealedImpl : public SegmentSealed { LoadIndex(const LoadIndexInfo& info) override; void LoadFieldData(const LoadFieldDataInfo& info) override; + void + DropIndex(const FieldId field_id) override; + void + DropFieldData(const FieldId field_id) override; public: int64_t @@ -106,18 +110,27 @@ class SegmentSealedImpl : public SegmentSealed { const BitsetView& bitset, QueryResult& output) const override; + bool + is_system_field_ready() const { + return system_ready_count_ == 1; + } + bool is_all_ready() const { // TODO: optimize here // NOTE: including row_ids - return ready_count_ == schema_->size() + 1; + if (!is_system_field_ready()) { + return false; + } + return ready_count_ == schema_->size(); } - mutable std::shared_mutex mutex_; - std::atomic_int ready_count_ = 0; - private: - // TOOD: generate index for scalar + // segment loading state + std::atomic ready_count_ = 0; + std::atomic system_ready_count_ = 0; + // segment data + // TODO: generate index for scalar std::optional row_count_opt_; std::map scalar_indexings_; SealedIndexingRecord vec_indexings_;