提交 69f6963d 编写于 作者: F FluorineDog 提交者: yefu.chen

Enable lock and fix row count

Signed-off-by: NFluorineDog <guilin.gou@zilliz.com>
上级 73aeedc2
...@@ -34,7 +34,8 @@ struct Plan { ...@@ -34,7 +34,8 @@ struct Plan {
const Schema& schema_; const Schema& schema_;
std::unique_ptr<VectorPlanNode> plan_node_; std::unique_ptr<VectorPlanNode> plan_node_;
std::map<std::string, FieldOffset> tag2field_; // PlaceholderName -> FieldOffset std::map<std::string, FieldOffset> tag2field_; // PlaceholderName -> FieldOffset
std::vector<std::string> target_entries_; std::vector<FieldOffset> target_entries_;
std::vector<FieldOffset> referred_fields_;
// TODO: add move extra info // TODO: add move extra info
}; };
......
...@@ -15,7 +15,8 @@ namespace milvus::segcore { ...@@ -15,7 +15,8 @@ namespace milvus::segcore {
class Naive; class Naive;
void void
SegmentInterface::FillTargetEntry(const query::Plan* plan, QueryResult& results) const { SegmentInternalInterface::FillTargetEntry(const query::Plan* plan, QueryResult& results) const {
std::shared_lock lck(mutex_);
AssertInfo(plan, "empty plan"); AssertInfo(plan, "empty plan");
auto size = results.result_distances_.size(); auto size = results.result_distances_.size();
Assert(results.internal_seg_offsets_.size() == size); Assert(results.internal_seg_offsets_.size() == size);
...@@ -42,10 +43,11 @@ SegmentInterface::FillTargetEntry(const query::Plan* plan, QueryResult& results) ...@@ -42,10 +43,11 @@ SegmentInterface::FillTargetEntry(const query::Plan* plan, QueryResult& results)
} }
QueryResult QueryResult
SegmentInterface::Search(const query::Plan* plan, SegmentInternalInterface::Search(const query::Plan* plan,
const query::PlaceholderGroup** placeholder_groups, const query::PlaceholderGroup** placeholder_groups,
const Timestamp* timestamps, const Timestamp* timestamps,
int64_t num_groups) const { int64_t num_groups) const {
std::shared_lock lck(mutex_);
Assert(num_groups == 1); Assert(num_groups == 1);
query::ExecPlanNodeVisitor visitor(*this, timestamps[0], *placeholder_groups[0]); query::ExecPlanNodeVisitor visitor(*this, timestamps[0], *placeholder_groups[0]);
auto results = visitor.get_moved_result(*plan->plan_node_); auto results = visitor.get_moved_result(*plan->plan_node_);
......
...@@ -26,14 +26,14 @@ namespace milvus::segcore { ...@@ -26,14 +26,14 @@ namespace milvus::segcore {
class SegmentInterface { class SegmentInterface {
public: public:
// fill results according to target_entries in plan // fill results according to target_entries in plan
void virtual void
FillTargetEntry(const query::Plan* plan, QueryResult& results) const; FillTargetEntry(const query::Plan* plan, QueryResult& results) const = 0;
QueryResult virtual QueryResult
Search(const query::Plan* Plan, Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_groups[], const query::PlaceholderGroup* placeholder_groups[],
const Timestamp timestamps[], const Timestamp timestamps[],
int64_t num_groups) const; int64_t num_groups) const = 0;
virtual int64_t virtual int64_t
GetMemoryUsageInBytes() const = 0; GetMemoryUsageInBytes() const = 0;
...@@ -47,13 +47,6 @@ class SegmentInterface { ...@@ -47,13 +47,6 @@ class SegmentInterface {
virtual ~SegmentInterface() = default; virtual ~SegmentInterface() = default;
protected: protected:
// calculate output[i] = Vec[seg_offsets[i]}, where Vec binds to system_type
virtual void
bulk_subscript(SystemFieldType system_type, const int64_t* seg_offsets, int64_t count, void* output) const = 0;
// calculate output[i] = Vec[seg_offsets[i]}, where Vec binds to field_offset
virtual void
bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const = 0;
}; };
// internal API for DSL calculation // internal API for DSL calculation
...@@ -77,6 +70,15 @@ class SegmentInternalInterface : public SegmentInterface { ...@@ -77,6 +70,15 @@ class SegmentInternalInterface : public SegmentInterface {
return *ptr; return *ptr;
} }
QueryResult
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_groups[],
const Timestamp timestamps[],
int64_t num_groups) const override;
void
FillTargetEntry(const query::Plan* plan, QueryResult& results) const override;
public: public:
virtual void virtual void
vector_search(int64_t vec_count, vector_search(int64_t vec_count,
...@@ -106,6 +108,17 @@ class SegmentInternalInterface : public SegmentInterface { ...@@ -106,6 +108,17 @@ class SegmentInternalInterface : public SegmentInterface {
// internal API: return chunk_index in span, support scalar index only // internal API: return chunk_index in span, support scalar index only
virtual const knowhere::Index* virtual const knowhere::Index*
chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const = 0; chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const = 0;
// calculate output[i] = Vec[seg_offsets[i]}, where Vec binds to system_type
virtual void
bulk_subscript(SystemFieldType system_type, const int64_t* seg_offsets, int64_t count, void* output) const = 0;
// calculate output[i] = Vec[seg_offsets[i]}, where Vec binds to field_offset
virtual void
bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const = 0;
protected:
mutable std::shared_mutex mutex_;
}; };
} // namespace milvus::segcore } // namespace milvus::segcore
...@@ -22,6 +22,10 @@ class SegmentSealed : public SegmentInternalInterface { ...@@ -22,6 +22,10 @@ class SegmentSealed : public SegmentInternalInterface {
LoadIndex(const LoadIndexInfo& info) = 0; LoadIndex(const LoadIndexInfo& info) = 0;
virtual void virtual void
LoadFieldData(const LoadFieldDataInfo& info) = 0; LoadFieldData(const LoadFieldDataInfo& info) = 0;
virtual void
DropIndex(const FieldId field_id) = 0;
virtual void
DropFieldData(const FieldId field_id) = 0;
}; };
using SegmentSealedPtr = std::unique_ptr<SegmentSealed>; using SegmentSealedPtr = std::unique_ptr<SegmentSealed>;
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
namespace milvus::segcore { namespace milvus::segcore {
void void
SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) { SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) {
// NOTE: lock only when data is ready to avoid starvation
auto field_id = FieldId(info.field_id); auto field_id = FieldId(info.field_id);
auto field_offset = schema_->get_offset(field_id); auto field_offset = schema_->get_offset(field_id);
...@@ -35,7 +36,7 @@ SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) { ...@@ -35,7 +36,7 @@ SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) {
void void
SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) {
// TODO // NOTE: lock only when data is ready to avoid starvation
Assert(info.row_count > 0); Assert(info.row_count > 0);
auto field_id = FieldId(info.field_id); auto field_id = FieldId(info.field_id);
Assert(info.blob); Assert(info.blob);
...@@ -54,8 +55,8 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { ...@@ -54,8 +55,8 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) {
update_row_count(info.row_count); update_row_count(info.row_count);
AssertInfo(row_ids_.empty(), "already exists"); AssertInfo(row_ids_.empty(), "already exists");
row_ids_ = std::move(vec_data); row_ids_ = std::move(vec_data);
++system_ready_count_;
++ready_count_;
} else { } else {
// prepare data // prepare data
auto field_offset = schema_->get_offset(field_id); auto field_offset = schema_->get_offset(field_id);
...@@ -105,12 +106,13 @@ SegmentSealedImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) c ...@@ -105,12 +106,13 @@ SegmentSealedImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) c
const knowhere::Index* const knowhere::Index*
SegmentSealedImpl::chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const { SegmentSealedImpl::chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const {
// TODO: support scalar index // TODO: support scalar index
return nullptr; PanicInfo("unimplemented");
} }
int64_t int64_t
SegmentSealedImpl::GetMemoryUsageInBytes() const { SegmentSealedImpl::GetMemoryUsageInBytes() const {
// TODO: add estimate for index // TODO: add estimate for index
std::shared_lock lck(mutex_);
auto row_count = row_count_opt_.value_or(0); auto row_count = row_count_opt_.value_or(0);
return schema_->get_total_sizeof() * row_count; return schema_->get_total_sizeof() * row_count;
} }
...@@ -118,8 +120,7 @@ SegmentSealedImpl::GetMemoryUsageInBytes() const { ...@@ -118,8 +120,7 @@ SegmentSealedImpl::GetMemoryUsageInBytes() const {
int64_t int64_t
SegmentSealedImpl::get_row_count() const { SegmentSealedImpl::get_row_count() const {
std::shared_lock lck(mutex_); std::shared_lock lck(mutex_);
AssertInfo(row_count_opt_.has_value(), "Data not loaded"); return row_count_opt_.value_or(0);
return row_count_opt_.value();
} }
const Schema& const Schema&
...@@ -140,6 +141,16 @@ SegmentSealedImpl::vector_search(int64_t vec_count, ...@@ -140,6 +141,16 @@ SegmentSealedImpl::vector_search(int64_t vec_count,
Assert(vec_indexings_.is_ready(field_offset)); Assert(vec_indexings_.is_ready(field_offset));
query::SearchOnSealed(*schema_, vec_indexings_, query_info, query_data, query_count, bitset, output); query::SearchOnSealed(*schema_, vec_indexings_, query_info, query_data, query_count, bitset, output);
} }
void
SegmentSealedImpl::DropFieldData(const FieldId field_id) {
std::unique_lock lck(mutex_);
PanicInfo("unimplemented");
}
void
SegmentSealedImpl::DropIndex(const FieldId field_id) {
std::unique_lock lck(mutex_);
PanicInfo("unimplemented");
}
SegmentSealedPtr SegmentSealedPtr
CreateSealedSegment(SchemaPtr schema, int64_t size_per_chunk) { CreateSealedSegment(SchemaPtr schema, int64_t size_per_chunk) {
......
...@@ -24,6 +24,10 @@ class SegmentSealedImpl : public SegmentSealed { ...@@ -24,6 +24,10 @@ class SegmentSealedImpl : public SegmentSealed {
LoadIndex(const LoadIndexInfo& info) override; LoadIndex(const LoadIndexInfo& info) override;
void void
LoadFieldData(const LoadFieldDataInfo& info) override; LoadFieldData(const LoadFieldDataInfo& info) override;
void
DropIndex(const FieldId field_id) override;
void
DropFieldData(const FieldId field_id) override;
public: public:
int64_t int64_t
...@@ -106,18 +110,27 @@ class SegmentSealedImpl : public SegmentSealed { ...@@ -106,18 +110,27 @@ class SegmentSealedImpl : public SegmentSealed {
const BitsetView& bitset, const BitsetView& bitset,
QueryResult& output) const override; QueryResult& output) const override;
bool
is_system_field_ready() const {
return system_ready_count_ == 1;
}
bool bool
is_all_ready() const { is_all_ready() const {
// TODO: optimize here // TODO: optimize here
// NOTE: including row_ids // NOTE: including row_ids
return ready_count_ == schema_->size() + 1; if (!is_system_field_ready()) {
return false;
}
return ready_count_ == schema_->size();
} }
mutable std::shared_mutex mutex_;
std::atomic_int ready_count_ = 0;
private: private:
// TOOD: generate index for scalar // segment loading state
std::atomic<int> ready_count_ = 0;
std::atomic<int> system_ready_count_ = 0;
// segment datas
// TODO: generate index for scalar
std::optional<int64_t> row_count_opt_; std::optional<int64_t> row_count_opt_;
std::map<FieldOffset, knowhere::IndexPtr> scalar_indexings_; std::map<FieldOffset, knowhere::IndexPtr> scalar_indexings_;
SealedIndexingRecord vec_indexings_; SealedIndexingRecord vec_indexings_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册