未验证 提交 9d4febde 编写于 作者: X XuPeng-SH 提交者: GitHub

(db/snapshot): Add some visitors (#2800)

* (db/snapshot): add more visitors
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint error
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): add some Iterators
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): update visitors
Signed-off-by: peng.xu <peng.xu@zilliz.com>
上级 5857f640
...@@ -39,23 +39,64 @@ SnapshotVisitor::SegmentsToSearch(meta::FilesHolder& files_holder) { ...@@ -39,23 +39,64 @@ SnapshotVisitor::SegmentsToSearch(meta::FilesHolder& files_holder) {
return handler->GetStatus(); return handler->GetStatus();
} }
SegmentFileVisitor::Ptr SegmentFieldElementVisitor::Ptr
SegmentFileVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_file_id) { SegmentFieldElementVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id,
snapshot::ID_TYPE field_element_id) {
if (!ss) { if (!ss) {
return nullptr; return nullptr;
} }
auto file = ss->GetResource<snapshot::SegmentFile>(segment_file_id); auto element = ss->GetResource<snapshot::FieldElement>(field_element_id);
if (!element) {
return nullptr;
}
auto visitor = std::make_shared<SegmentFieldElementVisitor>();
visitor->SetFieldElement(element);
auto segment = ss->GetResource<snapshot::Segment>(segment_id);
if (!segment) {
return nullptr;
}
auto file = ss->GetSegmentFile(segment_id, field_element_id);
if (!file) { if (!file) {
return nullptr; return nullptr;
} }
auto visitor = std::make_shared<SegmentFileVisitor>();
visitor->SetFile(file); visitor->SetFile(file);
auto field_element = ss->GetResource<snapshot::FieldElement>(file->GetFieldElementId()); return visitor;
auto field = ss->GetResource<snapshot::Field>(field_element->GetFieldId()); }
SegmentFieldVisitor::Ptr
SegmentFieldVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id, snapshot::ID_TYPE field_id) {
if (!ss) {
return nullptr;
}
auto field = ss->GetResource<snapshot::Field>(field_id);
if (!field) {
return nullptr;
}
auto visitor = std::make_shared<SegmentFieldVisitor>();
visitor->SetField(field); visitor->SetField(field);
visitor->SetFieldElement(field_element);
auto executor = [&](const snapshot::FieldElement::Ptr& field_element,
snapshot::FieldElementIterator* itr) -> Status {
if (field_element->GetFieldId() != field_id) {
return Status::OK();
}
auto element_visitor = SegmentFieldElementVisitor::Build(ss, segment_id, field_element->GetID());
if (!element_visitor) {
return Status::OK();
}
visitor->InsertElement(element_visitor);
return Status::OK();
};
auto iterator = std::make_shared<snapshot::FieldElementIterator>(ss, executor);
iterator->Iterate();
return visitor; return visitor;
} }
...@@ -72,17 +113,38 @@ SegmentVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id ...@@ -72,17 +113,38 @@ SegmentVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id
auto visitor = std::make_shared<SegmentVisitor>(); auto visitor = std::make_shared<SegmentVisitor>();
visitor->SetSegment(segment); visitor->SetSegment(segment);
auto& file_ids = ss->GetSegmentFileIds(segment_id); auto executor = [&](const snapshot::Field::Ptr& field, snapshot::FieldIterator* itr) -> Status {
for (auto id : file_ids) { auto field_visitor = SegmentFieldVisitor::Build(ss, segment_id, field->GetID());
auto file_visitor = SegmentFileVisitor::Build(ss, id); if (!field_visitor) {
if (!file_visitor) { return Status::OK();
return nullptr;
} }
visitor->InsertSegmentFile(file_visitor); visitor->InsertField(field_visitor);
}
return Status::OK();
};
auto iterator = std::make_shared<snapshot::FieldIterator>(ss, executor);
iterator->Iterate();
return visitor; return visitor;
} }
std::string
SegmentVisitor::ToString() const {
std::stringstream ss;
ss << "SegmentVisitor[" << GetSegment()->GetID() << "]: \n";
auto& field_visitors = GetFieldVisitors();
for (auto& fkv : field_visitors) {
ss << " Field[" << fkv.first << "]\n";
auto& fe_visitors = fkv.second->GetElementVistors();
for (auto& fekv : fe_visitors) {
ss << " FieldElement[" << fekv.first << "] ";
ss << "SegmentFile [" << fekv.second->GetFile()->GetID() << "]\n";
}
}
return ss.str();
}
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
...@@ -36,61 +36,106 @@ class SnapshotVisitor { ...@@ -36,61 +36,106 @@ class SnapshotVisitor {
Status status_; Status status_;
}; };
class SegmentFileVisitor { class SegmentFieldElementVisitor {
public: public:
using Ptr = std::shared_ptr<SegmentFileVisitor>; using Ptr = std::shared_ptr<SegmentFieldElementVisitor>;
static Ptr static Ptr
Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_file_id); Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id, snapshot::ID_TYPE field_element_id);
SegmentFileVisitor() = default; SegmentFieldElementVisitor() = default;
void
SetFieldElement(snapshot::FieldElementPtr field_element) {
field_element_ = field_element;
}
void
SetFile(snapshot::SegmentFilePtr file) {
file_ = file;
}
const snapshot::FieldElementPtr
GetElement() const {
return field_element_;
}
const snapshot::SegmentFilePtr const snapshot::SegmentFilePtr
GetFile() const { GetFile() const {
return file_; return file_;
} }
const snapshot::FieldPtr
protected:
snapshot::FieldElementPtr field_element_;
snapshot::SegmentFilePtr file_;
};
class SegmentFieldVisitor {
public:
using Ptr = std::shared_ptr<SegmentFieldVisitor>;
using ElementT = typename SegmentFieldElementVisitor::Ptr;
using ElementsMapT = std::map<snapshot::ID_TYPE, ElementT>;
static Ptr
Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id, snapshot::ID_TYPE field_id);
SegmentFieldVisitor() = default;
const ElementsMapT&
GetElementVistors() const {
return elements_map_;
}
const snapshot::FieldPtr&
GetField() const { GetField() const {
return field_; return field_;
} }
const snapshot::FieldElementPtr
GetFieldElement() const {
return field_element_;
}
void
SetFile(snapshot::SegmentFilePtr file) {
file_ = file;
}
void void
SetField(snapshot::FieldPtr field) { SetField(snapshot::FieldPtr field) {
field_ = field; field_ = field;
} }
void void
SetFieldElement(snapshot::FieldElementPtr field_element) { InsertElement(ElementT element) {
field_element_ = field_element; elements_map_[element->GetElement()->GetID()] = element;
} }
protected: protected:
snapshot::SegmentFilePtr file_; ElementsMapT elements_map_;
snapshot::FieldPtr field_; snapshot::FieldPtr field_;
snapshot::FieldElementPtr field_element_;
}; };
class SegmentVisitor { class SegmentVisitor {
public: public:
using Ptr = std::shared_ptr<SegmentVisitor>; using Ptr = std::shared_ptr<SegmentVisitor>;
using FileT = typename SegmentFileVisitor::Ptr; using FieldVisitorT = typename SegmentFieldVisitor::Ptr;
using FilesMapT = std::map<snapshot::ID_TYPE, FileT>; using IdMapT = std::map<snapshot::ID_TYPE, FieldVisitorT>;
using NameMapT = std::map<std::string, FieldVisitorT>;
static Ptr static Ptr
Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id); Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id);
SegmentVisitor() = default; SegmentVisitor() = default;
const FilesMapT& const IdMapT&
GetSegmentFiles() const { GetFieldVisitors() const {
return files_map_; return id_map_;
} }
// Returns the field visitor stored in id_map_ under field_id, or nullptr when
// no entry with that key exists.
FieldVisitorT
GetFieldVisitor(snapshot::ID_TYPE field_id) const {
    const auto pos = id_map_.find(field_id);
    if (pos != id_map_.end()) {
        return pos->second;
    }
    return nullptr;
}
// Returns the field visitor stored in name_map_ under field_name, or nullptr
// when no entry with that key exists.
FieldVisitorT
GetFieldVisitor(const std::string& field_name) const {
    const auto pos = name_map_.find(field_name);
    if (pos != name_map_.end()) {
        return pos->second;
    }
    return nullptr;
}
const snapshot::SegmentPtr& const snapshot::SegmentPtr&
GetSegment() const { GetSegment() const {
return segment_; return segment_;
...@@ -101,13 +146,18 @@ class SegmentVisitor { ...@@ -101,13 +146,18 @@ class SegmentVisitor {
segment_ = segment; segment_ = segment;
} }
void void
InsertSegmentFile(FileT segment_file) { InsertField(FieldVisitorT field_visitor) {
files_map_[segment_file->GetFile()->GetID()] = segment_file; id_map_[field_visitor->GetField()->GetID()] = field_visitor;
name_map_[field_visitor->GetField()->GetName()] = field_visitor;
} }
std::string
ToString() const;
protected: protected:
snapshot::SegmentPtr segment_; snapshot::SegmentPtr segment_;
FilesMapT files_map_; IdMapT id_map_;
NameMapT name_map_;
}; };
} // namespace engine } // namespace engine
......
...@@ -26,8 +26,9 @@ struct IterateHandler : public std::enable_shared_from_this<IterateHandler<T>> { ...@@ -26,8 +26,9 @@ struct IterateHandler : public std::enable_shared_from_this<IterateHandler<T>> {
using ResourceT = T; using ResourceT = T;
using ThisT = IterateHandler<ResourceT>; using ThisT = IterateHandler<ResourceT>;
using Ptr = std::shared_ptr<ThisT>; using Ptr = std::shared_ptr<ThisT>;
using ExecutorT = std::function<Status(const typename T::Ptr&, ThisT*)>;
explicit IterateHandler(ScopedSnapshotT ss) : ss_(ss) { explicit IterateHandler(ScopedSnapshotT ss, const ExecutorT& executor = {}) : ss_(ss), executor_(executor) {
} }
virtual Status virtual Status
...@@ -35,7 +36,12 @@ struct IterateHandler : public std::enable_shared_from_this<IterateHandler<T>> { ...@@ -35,7 +36,12 @@ struct IterateHandler : public std::enable_shared_from_this<IterateHandler<T>> {
return Status::OK(); return Status::OK();
} }
virtual Status virtual Status
Handle(const typename ResourceT::Ptr& resource) = 0; Handle(const typename ResourceT::Ptr& resource) {
if (executor_) {
return executor_(resource, this);
}
return Status::OK();
}
virtual Status virtual Status
PostIterate() { PostIterate() {
...@@ -59,26 +65,17 @@ struct IterateHandler : public std::enable_shared_from_this<IterateHandler<T>> { ...@@ -59,26 +65,17 @@ struct IterateHandler : public std::enable_shared_from_this<IterateHandler<T>> {
} }
ScopedSnapshotT ss_; ScopedSnapshotT ss_;
ExecutorT executor_;
Status status_; Status status_;
mutable std::mutex mtx_; mutable std::mutex mtx_;
}; };
using IterateSegmentHandler = IterateHandler<Segment>; using CollectionIterator = IterateHandler<Collection>;
using SegmentExecutorT = std::function<Status(const Segment::Ptr&, IterateSegmentHandler*)>; using PartitionIterator = IterateHandler<Partition>;
struct SegmentCollector : public IterateSegmentHandler { using SegmentIterator = IterateHandler<Segment>;
using ResourceT = Segment; using SegmentFileIterator = IterateHandler<SegmentFile>;
using BaseT = IterateSegmentHandler; using FieldIterator = IterateHandler<Field>;
using FieldElementIterator = IterateHandler<FieldElement>;
explicit SegmentCollector(ScopedSnapshotT ss, const SegmentExecutorT& executor) : BaseT(ss), executor_(executor) {
}
Status
Handle(const typename ResourceT::Ptr& segment) override {
return executor_(segment, this);
}
SegmentExecutorT executor_;
};
} // namespace snapshot } // namespace snapshot
} // namespace engine } // namespace engine
......
...@@ -164,6 +164,21 @@ Snapshot::GetFieldElement(const std::string& field_name, const std::string& fiel ...@@ -164,6 +164,21 @@ Snapshot::GetFieldElement(const std::string& field_name, const std::string& fiel
return Status::OK(); return Status::OK();
} }
// Two-level lookup in element_segfiles_map_: first by field_element_id, then by
// segment_id within that element's entry. Returns nullptr when either key is
// absent; otherwise forwards the mapped value (presumably a segment-file id) to
// GetResource<SegmentFile> and returns its result.
SegmentFilePtr
Snapshot::GetSegmentFile(ID_TYPE segment_id, ID_TYPE field_element_id) const {
    const auto element_pos = element_segfiles_map_.find(field_element_id);
    if (element_pos != element_segfiles_map_.end()) {
        const auto& per_segment = element_pos->second;
        const auto segment_pos = per_segment.find(segment_id);
        if (segment_pos != per_segment.end()) {
            return GetResource<SegmentFile>(segment_pos->second);
        }
    }
    return nullptr;
}
const std::string const std::string
Snapshot::ToString() const { Snapshot::ToString() const {
auto to_matrix_string = [](const MappingT& mappings, int line_length, size_t ident = 0) -> std::string { auto to_matrix_string = [](const MappingT& mappings, int line_length, size_t ident = 0) -> std::string {
......
...@@ -117,6 +117,9 @@ class Snapshot : public ReferenceProxy { ...@@ -117,6 +117,9 @@ class Snapshot : public ReferenceProxy {
return it->second; return it->second;
} }
SegmentFilePtr
GetSegmentFile(ID_TYPE segment_id, ID_TYPE field_element_id) const;
ID_TYPE ID_TYPE
GetLatestSchemaCommitId() const { GetLatestSchemaCommitId() const {
return latest_schema_commit_id_; return latest_schema_commit_id_;
......
...@@ -244,20 +244,16 @@ TEST_F(SSDBTest, VisitorTest) { ...@@ -244,20 +244,16 @@ TEST_F(SSDBTest, VisitorTest) {
status = Snapshots::GetInstance().GetSnapshot(ss, c1); status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok());
auto executor = [&] (const Segment::Ptr& segment, IterateSegmentHandler* handler) -> Status { auto executor = [&] (const Segment::Ptr& segment, SegmentIterator* handler) -> Status {
auto visitor = SegmentVisitor::Build(ss, segment->GetID()); auto visitor = SegmentVisitor::Build(ss, segment->GetID());
if (!visitor) { if (!visitor) {
return Status(milvus::SS_ERROR, "Cannot build segment visitor"); return Status(milvus::SS_ERROR, "Cannot build segment visitor");
} }
auto& files_map = visitor->GetSegmentFiles(); std::cout << visitor->ToString() << std::endl;
for (auto& kv : files_map) {
std::cout << "segment " << segment->GetID() << " segment_file_id " << kv.first << std::endl;
std::cout << "element name is " << kv.second->GetFieldElement()->GetName() << std::endl;
}
return Status::OK(); return Status::OK();
}; };
auto segment_handler = std::make_shared<milvus::engine::snapshot::SegmentCollector>(ss, executor); auto segment_handler = std::make_shared<SegmentIterator>(ss, executor);
segment_handler->Iterate(); segment_handler->Iterate();
std::cout << segment_handler->GetStatus().ToString() << std::endl; std::cout << segment_handler->GetStatus().ToString() << std::endl;
ASSERT_TRUE(segment_handler->GetStatus().ok()); ASSERT_TRUE(segment_handler->GetStatus().ok());
......
...@@ -70,8 +70,9 @@ using TQueue = milvus::BlockingQueue<std::tuple<ID_TYPE, ID_TYPE>>; ...@@ -70,8 +70,9 @@ using TQueue = milvus::BlockingQueue<std::tuple<ID_TYPE, ID_TYPE>>;
using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation<Collection>; using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation<Collection>;
using ParamsField = milvus::engine::snapshot::ParamsField; using ParamsField = milvus::engine::snapshot::ParamsField;
using IteratePartitionHandler = milvus::engine::snapshot::IterateHandler<Partition>; using IteratePartitionHandler = milvus::engine::snapshot::IterateHandler<Partition>;
using IterateSegmentHandler = milvus::engine::snapshot::IterateHandler<Segment>;
using IterateSegmentFileHandler = milvus::engine::snapshot::IterateHandler<SegmentFile>; using IterateSegmentFileHandler = milvus::engine::snapshot::IterateHandler<SegmentFile>;
using PartitionIterator = milvus::engine::snapshot::PartitionIterator;
using SegmentIterator = milvus::engine::snapshot::SegmentIterator;
using SSDBImpl = milvus::engine::SSDBImpl; using SSDBImpl = milvus::engine::SSDBImpl;
using Status = milvus::Status; using Status = milvus::Status;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册