From 9d4febde7ff8a8b263d022eec69d1a21066b605e Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Fri, 10 Jul 2020 16:49:00 +0800 Subject: [PATCH] (db/snapshot): Add some visitors (#2800) * (db/snapshot): add more visitors Signed-off-by: peng.xu * (db/snapshot): fix lint error Signed-off-by: peng.xu * (db/snapshot): add some Iterators Signed-off-by: peng.xu * (db/snapshot): update visitors Signed-off-by: peng.xu --- core/src/db/SnapshotVisitor.cpp | 90 +++++++++++++++++++---- core/src/db/SnapshotVisitor.h | 100 +++++++++++++++++++------- core/src/db/snapshot/IterateHandler.h | 33 ++++----- core/src/db/snapshot/Snapshot.cpp | 15 ++++ core/src/db/snapshot/Snapshot.h | 3 + core/unittest/ssdb/test_db.cpp | 10 +-- core/unittest/ssdb/utils.h | 3 +- 7 files changed, 189 insertions(+), 65 deletions(-) diff --git a/core/src/db/SnapshotVisitor.cpp b/core/src/db/SnapshotVisitor.cpp index 4000523a..2a71b3ba 100644 --- a/core/src/db/SnapshotVisitor.cpp +++ b/core/src/db/SnapshotVisitor.cpp @@ -39,23 +39,64 @@ SnapshotVisitor::SegmentsToSearch(meta::FilesHolder& files_holder) { return handler->GetStatus(); } -SegmentFileVisitor::Ptr -SegmentFileVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_file_id) { +SegmentFieldElementVisitor::Ptr +SegmentFieldElementVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id, + snapshot::ID_TYPE field_element_id) { if (!ss) { return nullptr; } - auto file = ss->GetResource(segment_file_id); + auto element = ss->GetResource(field_element_id); + if (!element) { + return nullptr; + } + + auto visitor = std::make_shared(); + visitor->SetFieldElement(element); + auto segment = ss->GetResource(segment_id); + if (!segment) { + return nullptr; + } + + auto file = ss->GetSegmentFile(segment_id, field_element_id); if (!file) { return nullptr; } - auto visitor = std::make_shared(); visitor->SetFile(file); - auto field_element = ss->GetResource(file->GetFieldElementId()); - auto field = ss->GetResource(field_element->GetFieldId()); + return visitor; +} + +SegmentFieldVisitor::Ptr +SegmentFieldVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id, snapshot::ID_TYPE field_id) { + if (!ss) { + return nullptr; + } + + auto field = ss->GetResource(field_id); + if (!field) { + return nullptr; + } + + auto visitor = std::make_shared(); visitor->SetField(field); - visitor->SetFieldElement(field_element); + + auto executor = [&](const snapshot::FieldElement::Ptr& field_element, + snapshot::FieldElementIterator* itr) -> Status { + if (field_element->GetFieldId() != field_id) { + return Status::OK(); + } + auto element_visitor = SegmentFieldElementVisitor::Build(ss, segment_id, field_element->GetID()); + if (!element_visitor) { + return Status::OK(); + } + visitor->InsertElement(element_visitor); + return Status::OK(); + }; + + auto iterator = std::make_shared(ss, executor); + iterator->Iterate(); + return visitor; } @@ -72,17 +113,38 @@ SegmentVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id auto visitor = std::make_shared(); visitor->SetSegment(segment); - auto& file_ids = ss->GetSegmentFileIds(segment_id); - for (auto id : file_ids) { - auto file_visitor = SegmentFileVisitor::Build(ss, id); - if (!file_visitor) { - return nullptr; + auto executor = [&](const snapshot::Field::Ptr& field, snapshot::FieldIterator* itr) -> Status { + auto field_visitor = SegmentFieldVisitor::Build(ss, segment_id, field->GetID()); + if (!field_visitor) { + return Status::OK(); } - visitor->InsertSegmentFile(file_visitor); - } + visitor->InsertField(field_visitor); + + return Status::OK(); + }; + + auto iterator = std::make_shared(ss, executor); + iterator->Iterate(); return visitor; } +std::string +SegmentVisitor::ToString() const { + std::stringstream ss; + ss << "SegmentVisitor[" << GetSegment()->GetID() << "]: \n"; + auto& field_visitors = GetFieldVisitors(); + for (auto& fkv : field_visitors) { + ss << " Field[" << fkv.first << "]\n"; + auto& fe_visitors = fkv.second->GetElementVistors(); + for (auto& fekv : fe_visitors) { + ss << " FieldElement[" << fekv.first << "] "; + ss << "SegmentFile [" << fekv.second->GetFile()->GetID() << "]\n"; + } + } + + return ss.str(); +} + } // namespace engine } // namespace milvus diff --git a/core/src/db/SnapshotVisitor.h b/core/src/db/SnapshotVisitor.h index 48cd3df5..e0f3816d 100644 --- a/core/src/db/SnapshotVisitor.h +++ b/core/src/db/SnapshotVisitor.h @@ -36,61 +36,106 @@ class SnapshotVisitor { Status status_; }; -class SegmentFileVisitor { +class SegmentFieldElementVisitor { public: - using Ptr = std::shared_ptr; + using Ptr = std::shared_ptr; static Ptr - Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_file_id); + Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id, snapshot::ID_TYPE field_element_id); - SegmentFileVisitor() = default; + SegmentFieldElementVisitor() = default; + void + SetFieldElement(snapshot::FieldElementPtr field_element) { + field_element_ = field_element; + } + void + SetFile(snapshot::SegmentFilePtr file) { + file_ = file; + } + + const snapshot::FieldElementPtr + GetElement() const { + return field_element_; + } const snapshot::SegmentFilePtr GetFile() const { return file_; } - const snapshot::FieldPtr + + protected: + snapshot::FieldElementPtr field_element_; + snapshot::SegmentFilePtr file_; +}; + +class SegmentFieldVisitor { + public: + using Ptr = std::shared_ptr; + using ElementT = typename SegmentFieldElementVisitor::Ptr; + using ElementsMapT = std::map; + + static Ptr + Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id, snapshot::ID_TYPE field_id); + + SegmentFieldVisitor() = default; + + const ElementsMapT& + GetElementVistors() const { + return elements_map_; + } + const snapshot::FieldPtr& GetField() const { return field_; } - const snapshot::FieldElementPtr - GetFieldElement() const { - return field_element_; - } - void - SetFile(snapshot::SegmentFilePtr file) { - file_ = file; - } void SetField(snapshot::FieldPtr field) { field_ = field; } + void - SetFieldElement(snapshot::FieldElementPtr field_element) { - field_element_ = field_element; + InsertElement(ElementT element) { + elements_map_[element->GetElement()->GetID()] = element; } protected: - snapshot::SegmentFilePtr file_; + ElementsMapT elements_map_; snapshot::FieldPtr field_; - snapshot::FieldElementPtr field_element_; }; class SegmentVisitor { public: using Ptr = std::shared_ptr; - using FileT = typename SegmentFileVisitor::Ptr; - using FilesMapT = std::map; + using FieldVisitorT = typename SegmentFieldVisitor::Ptr; + using IdMapT = std::map; + using NameMapT = std::map; static Ptr Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id); SegmentVisitor() = default; - const FilesMapT& - GetSegmentFiles() const { - return files_map_; + const IdMapT& + GetFieldVisitors() const { + return id_map_; } + + FieldVisitorT + GetFieldVisitor(snapshot::ID_TYPE field_id) const { + auto it = id_map_.find(field_id); + if (it == id_map_.end()) { + return nullptr; + } + return it->second; + } + FieldVisitorT + GetFieldVisitor(const std::string& field_name) const { + auto it = name_map_.find(field_name); + if (it == name_map_.end()) { + return nullptr; + } + return it->second; + } + const snapshot::SegmentPtr& GetSegment() const { return segment_; @@ -101,13 +146,18 @@ class SegmentVisitor { segment_ = segment; } void - InsertSegmentFile(FileT segment_file) { - files_map_[segment_file->GetFile()->GetID()] = segment_file; + InsertField(FieldVisitorT field_visitor) { + id_map_[field_visitor->GetField()->GetID()] = field_visitor; + name_map_[field_visitor->GetField()->GetName()] = field_visitor; } + std::string + ToString() const; + protected: snapshot::SegmentPtr segment_; - FilesMapT files_map_; + IdMapT id_map_; + NameMapT name_map_; }; } // namespace engine diff --git a/core/src/db/snapshot/IterateHandler.h b/core/src/db/snapshot/IterateHandler.h index ed39fd10..034b3299 100644 --- a/core/src/db/snapshot/IterateHandler.h +++ b/core/src/db/snapshot/IterateHandler.h @@ -26,8 +26,9 @@ struct IterateHandler : public std::enable_shared_from_this> { using ResourceT = T; using ThisT = IterateHandler; using Ptr = std::shared_ptr; + using ExecutorT = std::function; - explicit IterateHandler(ScopedSnapshotT ss) : ss_(ss) { + explicit IterateHandler(ScopedSnapshotT ss, const ExecutorT& executor = {}) : ss_(ss), executor_(executor) { } virtual Status @@ -35,7 +36,12 @@ struct IterateHandler : public std::enable_shared_from_this> { return Status::OK(); } virtual Status - Handle(const typename ResourceT::Ptr& resource) = 0; + Handle(const typename ResourceT::Ptr& resource) { + if (executor_) { + return executor_(resource, this); + } + return Status::OK(); + } virtual Status PostIterate() { @@ -59,26 +65,17 @@ struct IterateHandler : public std::enable_shared_from_this> { } ScopedSnapshotT ss_; + ExecutorT executor_; Status status_; mutable std::mutex mtx_; }; -using IterateSegmentHandler = IterateHandler; -using SegmentExecutorT = std::function; -struct SegmentCollector : public IterateSegmentHandler { - using ResourceT = Segment; - using BaseT = IterateSegmentHandler; - - explicit SegmentCollector(ScopedSnapshotT ss, const SegmentExecutorT& executor) : BaseT(ss), executor_(executor) { - } - - Status - Handle(const typename ResourceT::Ptr& segment) override { - return executor_(segment, this); - } - - SegmentExecutorT executor_; -}; +using CollectionIterator = IterateHandler; +using PartitionIterator = IterateHandler; +using SegmentIterator = IterateHandler; +using SegmentFileIterator = IterateHandler; +using FieldIterator = IterateHandler; +using FieldElementIterator = IterateHandler; } // namespace snapshot } // namespace engine diff --git a/core/src/db/snapshot/Snapshot.cpp b/core/src/db/snapshot/Snapshot.cpp index c1716d1f..39a8dbeb 100644 --- a/core/src/db/snapshot/Snapshot.cpp +++ b/core/src/db/snapshot/Snapshot.cpp @@ -164,6 +164,21 @@ Snapshot::GetFieldElement(const std::string& field_name, const std::string& fiel return Status::OK(); } +SegmentFilePtr +Snapshot::GetSegmentFile(ID_TYPE segment_id, ID_TYPE field_element_id) const { + auto it = element_segfiles_map_.find(field_element_id); + if (it == element_segfiles_map_.end()) { + return nullptr; + } + + auto its = it->second.find(segment_id); + if (its == it->second.end()) { + return nullptr; + } + + return GetResource(its->second); +} + const std::string Snapshot::ToString() const { auto to_matrix_string = [](const MappingT& mappings, int line_length, size_t ident = 0) -> std::string { diff --git a/core/src/db/snapshot/Snapshot.h b/core/src/db/snapshot/Snapshot.h index a28884f4..0c75e065 100644 --- a/core/src/db/snapshot/Snapshot.h +++ b/core/src/db/snapshot/Snapshot.h @@ -117,6 +117,9 @@ class Snapshot : public ReferenceProxy { return it->second; } + SegmentFilePtr + GetSegmentFile(ID_TYPE segment_id, ID_TYPE field_element_id) const; + ID_TYPE GetLatestSchemaCommitId() const { return latest_schema_commit_id_; diff --git a/core/unittest/ssdb/test_db.cpp b/core/unittest/ssdb/test_db.cpp index 929b915e..80ff33c3 100644 --- a/core/unittest/ssdb/test_db.cpp +++ b/core/unittest/ssdb/test_db.cpp @@ -244,20 +244,16 @@ TEST_F(SSDBTest, VisitorTest) { status = Snapshots::GetInstance().GetSnapshot(ss, c1); ASSERT_TRUE(status.ok()); - auto executor = [&] (const Segment::Ptr& segment, IterateSegmentHandler* handler) -> Status { + auto executor = [&] (const Segment::Ptr& segment, SegmentIterator* handler) -> Status { auto visitor = SegmentVisitor::Build(ss, segment->GetID()); if (!visitor) { return Status(milvus::SS_ERROR, "Cannot build segment visitor"); } - auto& files_map = visitor->GetSegmentFiles(); - for (auto& kv : files_map) { - std::cout << "segment " << segment->GetID() << " segment_file_id " << kv.first << std::endl; - std::cout << "element name is " << kv.second->GetFieldElement()->GetName() << std::endl; - } + std::cout << visitor->ToString() << std::endl; return Status::OK(); }; - auto segment_handler = std::make_shared(ss, executor); + auto segment_handler = std::make_shared(ss, executor); segment_handler->Iterate(); std::cout << segment_handler->GetStatus().ToString() << std::endl; ASSERT_TRUE(segment_handler->GetStatus().ok()); diff --git a/core/unittest/ssdb/utils.h b/core/unittest/ssdb/utils.h index 3bdc4f0b..62de725c 100644 --- a/core/unittest/ssdb/utils.h +++ b/core/unittest/ssdb/utils.h @@ -70,8 +70,9 @@ using TQueue = milvus::BlockingQueue>; using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation; using ParamsField = milvus::engine::snapshot::ParamsField; using IteratePartitionHandler = milvus::engine::snapshot::IterateHandler; -using IterateSegmentHandler = milvus::engine::snapshot::IterateHandler; using IterateSegmentFileHandler = milvus::engine::snapshot::IterateHandler; +using PartitionIterator = milvus::engine::snapshot::PartitionIterator; +using SegmentIterator = milvus::engine::snapshot::SegmentIterator; using SSDBImpl = milvus::engine::SSDBImpl; using Status = milvus::Status; -- GitLab