未验证 提交 af573f93 编写于 作者: X XuPeng-SH 提交者: GitHub

(db/snapshot): Add some helper visitors (#2777)

* (db/snapshot): add Segment related visitors
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint error
Signed-off-by: peng.xu <peng.xu@zilliz.com>
上级 b7d9c2a4
......@@ -10,6 +10,7 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/SnapshotVisitor.h"
#include <sstream>
#include "db/SnapshotHandlers.h"
#include "db/meta/MetaTypes.h"
#include "db/snapshot/Snapshots.h"
......@@ -38,5 +39,50 @@ SnapshotVisitor::SegmentsToSearch(meta::FilesHolder& files_holder) {
return handler->GetStatus();
}
// Build a visitor describing one segment file of the given snapshot.
//
// The visitor bundles the segment file resource together with the field
// element it belongs to and the field owning that field element.
//
// @param ss               snapshot the resources are looked up in
// @param segment_file_id  id of the segment file to visit
// @return the populated visitor, or nullptr when the snapshot is empty, the
//         segment file is not found, or its field element is not found.
SegmentFileVisitor::Ptr
SegmentFileVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_file_id) {
    if (!ss) {
        return nullptr;
    }

    auto file = ss->GetResource<snapshot::SegmentFile>(segment_file_id);
    if (!file) {
        return nullptr;
    }

    auto field_element = ss->GetResource<snapshot::FieldElement>(file->GetFieldElementId());
    if (!field_element) {
        // The field element is dereferenced just below to resolve the field;
        // a failed lookup must not be followed.
        return nullptr;
    }

    // NOTE(review): the field lookup result is stored as-is, exactly like the
    // original code did; callers of GetField() may want to check it.
    auto field = ss->GetResource<snapshot::Field>(field_element->GetFieldId());

    auto visitor = std::make_shared<SegmentFileVisitor>();
    visitor->SetFile(file);
    visitor->SetField(field);
    visitor->SetFieldElement(field_element);
    return visitor;
}
// Build a visitor for one segment of the given snapshot, including one
// SegmentFileVisitor per segment file id the snapshot reports for it.
//
// @param ss          snapshot the resources are looked up in
// @param segment_id  id of the segment to visit
// @return the populated visitor, or nullptr when the snapshot is empty, the
//         segment is not found, or any of its file visitors cannot be built.
SegmentVisitor::Ptr
SegmentVisitor::Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id) {
    if (!ss) {
        return nullptr;
    }

    auto target_segment = ss->GetResource<snapshot::Segment>(segment_id);
    if (!target_segment) {
        return nullptr;
    }

    auto result = std::make_shared<SegmentVisitor>();
    result->SetSegment(target_segment);

    // A single file visitor that fails to build aborts the whole segment visitor.
    for (auto segment_file_id : ss->GetSegmentFileIds(segment_id)) {
        auto per_file = SegmentFileVisitor::Build(ss, segment_file_id);
        if (!per_file) {
            return nullptr;
        }
        result->InsertSegmentFile(per_file);
    }

    return result;
}
} // namespace engine
} // namespace milvus
......@@ -14,7 +14,9 @@
#include "db/meta/FilesHolder.h"
#include "db/snapshot/Snapshot.h"
#include <map>
#include <memory>
#include <set>
#include <string>
namespace milvus {
......@@ -34,5 +36,79 @@ class SnapshotVisitor {
Status status_;
};
// Holder for a segment file together with its field element and field.
// Instances are normally produced by Build(); the setters allow manual assembly.
class SegmentFileVisitor {
 public:
    using Ptr = std::shared_ptr<SegmentFileVisitor>;

    // Look up the segment file with the given id in `ss` and wrap it.
    static Ptr
    Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_file_id);

    SegmentFileVisitor() = default;

    // ---- mutators -------------------------------------------------------

    // Replace the stored segment file.
    void
    SetFile(snapshot::SegmentFilePtr file) {
        file_ = file;
    }

    // Replace the stored field.
    void
    SetField(snapshot::FieldPtr field) {
        field_ = field;
    }

    // Replace the stored field element.
    void
    SetFieldElement(snapshot::FieldElementPtr field_element) {
        field_element_ = field_element;
    }

    // ---- accessors ------------------------------------------------------

    // Stored segment file (whatever was last passed to SetFile).
    const snapshot::SegmentFilePtr
    GetFile() const {
        return file_;
    }

    // Stored field (whatever was last passed to SetField).
    const snapshot::FieldPtr
    GetField() const {
        return field_;
    }

    // Stored field element (whatever was last passed to SetFieldElement).
    const snapshot::FieldElementPtr
    GetFieldElement() const {
        return field_element_;
    }

 protected:
    snapshot::SegmentFilePtr file_;
    snapshot::FieldPtr field_;
    snapshot::FieldElementPtr field_element_;
};
class SegmentVisitor {
public:
using Ptr = std::shared_ptr<SegmentVisitor>;
using FileT = typename SegmentFileVisitor::Ptr;
using FilesMapT = std::map<snapshot::ID_TYPE, FileT>;
static Ptr
Build(snapshot::ScopedSnapshotT ss, snapshot::ID_TYPE segment_id);
SegmentVisitor() = default;
const FilesMapT&
GetSegmentFiles() const {
return files_map_;
}
const snapshot::SegmentPtr&
GetSegment() const {
return segment_;
}
void
SetSegment(snapshot::SegmentPtr segment) {
segment_ = segment;
}
void
InsertSegmentFile(FileT segment_file) {
files_map_[segment_file->GetFile()->GetID()] = segment_file;
}
protected:
snapshot::SegmentPtr segment_;
FilesMapT files_map_;
};
} // namespace engine
} // namespace milvus
......@@ -33,9 +33,9 @@ namespace engine {
namespace snapshot {
using CheckStaleFunc = std::function<Status(ScopedSnapshotT&)>;
using StepsHolderT = std::tuple<CollectionCommit::SetT, Collection::SetT, SchemaCommit::SetT, FieldCommit::SetT,
Field::SetT, FieldElement::SetT, PartitionCommit::SetT, Partition::SetT,
SegmentCommit::SetT, Segment::SetT, SegmentFile::SetT>;
using StepsHolderT = std::tuple<SegmentFile::SetT, SegmentCommit::SetT, Segment::SetT, PartitionCommit::SetT,
Partition::SetT, FieldElement::SetT, FieldCommit::SetT, Field::SetT, SchemaCommit::SetT,
CollectionCommit::SetT, Collection::SetT>;
enum OperationsType { Invalid, W_Leaf, O_Leaf, W_Compound, O_Compound };
......
......@@ -95,6 +95,7 @@ Snapshot::Snapshot(ID_TYPE ss_id) {
AddResource<FieldElement>(field_element);
AddResource<SegmentFile>(segment_file);
element_segfiles_map_[field_element_id][segment_id] = segment_file_id;
seg_segfiles_map_[segment_id].insert(segment_file_id);
}
}
}
......
......@@ -20,6 +20,7 @@
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <string>
#include <thread>
......@@ -107,6 +108,15 @@ class Snapshot : public ReferenceProxy {
return GetResources<CollectionCommit>().cbegin()->second.Get();
}
// Ids of the segment files recorded for `segment_id` in seg_segfiles_map_.
// Returns a reference to the shared empty_set_ member when the segment has no
// entry, so the result is always a valid reference.
const std::set<ID_TYPE>&
GetSegmentFileIds(ID_TYPE segment_id) const {
    const auto found = seg_segfiles_map_.find(segment_id);
    return found != seg_segfiles_map_.end() ? found->second : empty_set_;
}
ID_TYPE
GetLatestSchemaCommitId() const {
return latest_schema_commit_id_;
......@@ -325,11 +335,13 @@ class Snapshot : public ReferenceProxy {
std::map<std::string, ID_TYPE> partition_names_map_;
std::map<std::string, std::map<std::string, ID_TYPE>> field_element_names_map_;
std::map<ID_TYPE, std::map<ID_TYPE, ID_TYPE>> element_segfiles_map_;
std::map<ID_TYPE, std::set<ID_TYPE>> seg_segfiles_map_;
std::map<ID_TYPE, ID_TYPE> seg_segc_map_;
std::map<ID_TYPE, ID_TYPE> p_pc_map_;
ID_TYPE latest_schema_commit_id_ = 0;
std::map<ID_TYPE, NUM_TYPE> p_max_seg_num_;
LSN_TYPE max_lsn_;
std::set<ID_TYPE> empty_set_;
};
using GCHandler = std::function<void(Snapshot::Ptr)>;
......
......@@ -18,6 +18,9 @@
#include <algorithm>
#include "ssdb/utils.h"
#include "db/SnapshotVisitor.h"
using SegmentVisitor = milvus::engine::SegmentVisitor;
milvus::Status
CreateCollection(std::shared_ptr<SSDBImpl> db, const std::string& collection_name, const LSN_TYPE& lsn) {
......@@ -200,3 +203,61 @@ TEST_F(SSDBTest, IndexTest) {
}
}
}
// Exercises SegmentVisitor::Build over every segment of a freshly populated
// collection: creates a collection, some partitions and some segments, then
// iterates the segments and prints the files each visitor exposes.
TEST_F(SSDBTest, VisitorTest) {
// Monotonically increasing LSN source; every call yields the next value.
LSN_TYPE lsn = 0;
auto next_lsn = [&]() -> decltype(lsn) {
return ++lsn;
};
std::string c1 = "c1";
auto status = CreateCollection(db_, c1, next_lsn());
ASSERT_TRUE(status.ok());
// Create a random number of partitions (count taken from RandomInt(1, 3))
// named "partition_<i>"; the stream is reset before each name is built.
std::stringstream p_name;
auto num = RandomInt(1, 3);
for (auto i = 0; i < num; ++i) {
p_name.str("");
p_name << "partition_" << i;
status = db_->CreatePartition(c1, p_name.str());
ASSERT_TRUE(status.ok());
}
// Take a snapshot of the collection now that the partitions exist.
ScopedSnapshotT ss;
status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());
SegmentFileContext sf_context;
SFContextBuilder(sf_context, ss);
// Add a random number of segments to every partition found in the snapshot.
// NOTE(review): new_total is accumulated but never asserted against below.
auto new_total = 0;
auto& partitions = ss->GetResources<Partition>();
for (auto& kv : partitions) {
num = RandomInt(1, 3);
for (auto i = 0; i < num; ++i) {
ASSERT_TRUE(CreateSegment(ss, kv.first, next_lsn(), sf_context).ok());
}
new_total += num;
}
// Re-fetch the snapshot before iterating the segments.
status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());
// Per-segment callback: build a visitor and dump its segment files; a visitor
// that cannot be built is reported as an SS_ERROR status.
auto executor = [&] (const Segment::Ptr& segment, IterateSegmentHandler* handler) -> Status {
auto visitor = SegmentVisitor::Build(ss, segment->GetID());
if (!visitor) {
return Status(milvus::SS_ERROR, "Cannot build segment visitor");
}
auto& files_map = visitor->GetSegmentFiles();
for (auto& kv : files_map) {
std::cout << "segment " << segment->GetID() << " segment_file_id " << kv.first << std::endl;
std::cout << "element name is " << kv.second->GetFieldElement()->GetName() << std::endl;
}
return Status::OK();
};
// Run the callback through a SegmentCollector and require an OK final status.
auto segment_handler = std::make_shared<SegmentCollector>(ss, executor);
segment_handler->Iterate();
std::cout << segment_handler->GetStatus().ToString() << std::endl;
ASSERT_TRUE(segment_handler->GetStatus().ok());
}
......@@ -70,6 +70,7 @@ using TQueue = milvus::BlockingQueue<std::tuple<ID_TYPE, ID_TYPE>>;
using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation<Collection>;
using ParamsField = milvus::engine::snapshot::ParamsField;
using IteratePartitionHandler = milvus::engine::snapshot::IterateHandler<Partition>;
using IterateSegmentHandler = milvus::engine::snapshot::IterateHandler<Segment>;
using IterateSegmentFileHandler = milvus::engine::snapshot::IterateHandler<SegmentFile>;
using SSDBImpl = milvus::engine::SSDBImpl;
using Status = milvus::Status;
......@@ -119,6 +120,22 @@ struct PartitionCollector : public IteratePartitionHandler {
std::vector<std::string> partition_names_;
};
// Callback invoked for a segment; it also receives the handler doing the iteration.
using SegmentExecutorT = std::function<Status(const Segment::Ptr&, IterateSegmentHandler*)>;

// Segment iteration handler that forwards every handled segment to a
// user-supplied executor and returns the executor's status.
struct SegmentCollector : public IterateSegmentHandler {
    using ResourceT = Segment;
    using BaseT = IterateSegmentHandler;

    // `executor` is copied into the collector.
    explicit SegmentCollector(ScopedSnapshotT ss, const SegmentExecutorT& executor)
        : IterateSegmentHandler(ss), executor_(executor) {
    }

    // Delegate to the stored executor, passing this handler along.
    Status
    Handle(const Segment::Ptr& segment) override {
        return executor_(segment, this);
    }

    SegmentExecutorT executor_;
};
using FilterT = std::function<bool(SegmentFile::Ptr)>;
struct SegmentFileCollector : public IterateSegmentFileHandler {
using ResourceT = SegmentFile;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册