未验证 提交 aa31b0c2 编写于 作者: X XuPeng-SH 提交者: GitHub

(db/snapshot): fix bugs and enhance unit test (#2562)

* (db/snapshot): add ToString for Snapshot
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix bugs and enhance CompoundTest1
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix bug in operations
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): some code refactor
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): small change
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>
上级 d5d24f3d
......@@ -29,14 +29,14 @@ BuildOperation::DoExecute(Store& store) {
if (!status.ok())
return status;
SegmentCommitOperation op(context_, context_.prev_ss);
SegmentCommitOperation op(context_, GetAdjustedSS());
op(store);
status = op.GetResource(context_.new_segment_commit);
if (!status.ok())
return status;
AddStepWithLsn(*context_.new_segment_commit, context_.lsn);
PartitionCommitOperation pc_op(context_, context_.prev_ss);
PartitionCommitOperation pc_op(context_, GetAdjustedSS());
pc_op(store);
OperationContext cc_context;
......@@ -52,7 +52,7 @@ BuildOperation::DoExecute(Store& store) {
return status;
AddStepWithLsn(*context_.new_partition_commit, context_.lsn);
CollectionCommitOperation cc_op(cc_context, context_.prev_ss);
CollectionCommitOperation cc_op(cc_context, GetAdjustedSS());
cc_op(store);
status = cc_op.GetResource(context_.new_collection_commit);
if (!status.ok())
......@@ -77,7 +77,13 @@ BuildOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentF
CheckStale(std::bind(&BuildOperation::CheckSegmentStale, this, std::placeholders::_1, context.segment_id));
if (!status.ok())
return status;
auto new_sf_op = std::make_shared<SegmentFileOperation>(context, prev_ss_);
auto segment = GetStartedSS()->GetResource<Segment>(context.segment_id);
if (!segment) {
return Status(SS_INVALID_CONTEX_ERROR, "Invalid segment_id in context");
}
auto ctx = context;
ctx.partition_id = segment->GetPartitionId();
auto new_sf_op = std::make_shared<SegmentFileOperation>(ctx, GetStartedSS());
status = new_sf_op->Push();
if (!status.ok())
return status;
......@@ -101,7 +107,7 @@ NewSegmentOperation::DoExecute(Store& store) {
/* auto status = PrevSnapshotRequried(); */
/* if (!status.ok()) return status; */
// TODO: Check Context
SegmentCommitOperation op(context_, context_.prev_ss);
SegmentCommitOperation op(context_, GetAdjustedSS());
auto status = op(store);
if (!status.ok())
return status;
......@@ -117,7 +123,7 @@ NewSegmentOperation::DoExecute(Store& store) {
OperationContext cc_context;
PartitionCommitOperation pc_op(context_, context_.prev_ss);
PartitionCommitOperation pc_op(context_, GetAdjustedSS());
status = pc_op(store);
if (!status.ok())
return status;
......@@ -132,7 +138,7 @@ NewSegmentOperation::DoExecute(Store& store) {
/* } */
/* std::cout << ")" << std::endl; */
CollectionCommitOperation cc_op(cc_context, context_.prev_ss);
CollectionCommitOperation cc_op(cc_context, GetAdjustedSS());
status = cc_op(store);
if (!status.ok())
return status;
......@@ -146,7 +152,7 @@ NewSegmentOperation::DoExecute(Store& store) {
Status
NewSegmentOperation::CommitNewSegment(SegmentPtr& created) {
auto op = std::make_shared<SegmentOperation>(context_, prev_ss_);
auto op = std::make_shared<SegmentOperation>(context_, GetStartedSS());
auto status = op->Push();
if (!status.ok())
return status;
......@@ -163,7 +169,7 @@ NewSegmentOperation::CommitNewSegmentFile(const SegmentFileContext& context, Seg
auto c = context;
c.segment_id = context_.new_segment->GetID();
c.partition_id = context_.new_segment->GetPartitionId();
auto new_sf_op = std::make_shared<SegmentFileOperation>(c, prev_ss_);
auto new_sf_op = std::make_shared<SegmentFileOperation>(c, GetStartedSS());
auto status = new_sf_op->Push();
if (!status.ok())
return status;
......@@ -178,6 +184,18 @@ NewSegmentOperation::CommitNewSegmentFile(const SegmentFileContext& context, Seg
MergeOperation::MergeOperation(const OperationContext& context, ScopedSnapshotT prev_ss) : BaseT(context, prev_ss) {
}
Status
MergeOperation::OnSnapshotStale() {
for (auto& stale_seg : context_.stale_segments) {
auto expect_sc = GetStartedSS()->GetSegmentCommitBySegmentId(stale_seg->GetID());
auto latest_sc = GetAdjustedSS()->GetSegmentCommitBySegmentId(stale_seg->GetID());
if (!latest_sc || (latest_sc->GetID() != expect_sc->GetID())) {
return Status(SS_STALE_ERROR, "MergeOperation on stale segments");
}
}
return Status::OK();
}
Status
MergeOperation::CommitNewSegment(SegmentPtr& created) {
Status status;
......@@ -185,7 +203,7 @@ MergeOperation::CommitNewSegment(SegmentPtr& created) {
created = context_.new_segment;
return status;
}
auto op = std::make_shared<SegmentOperation>(context_, prev_ss_);
auto op = std::make_shared<SegmentOperation>(context_, GetStartedSS());
status = op->Push();
if (!status.ok())
return status;
......@@ -207,7 +225,7 @@ MergeOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentF
auto c = context;
c.segment_id = new_segment->GetID();
c.partition_id = new_segment->GetPartitionId();
auto new_sf_op = std::make_shared<SegmentFileOperation>(c, prev_ss_);
auto new_sf_op = std::make_shared<SegmentFileOperation>(c, GetStartedSS());
status = new_sf_op->Push();
if (!status.ok())
return status;
......@@ -224,7 +242,7 @@ MergeOperation::DoExecute(Store& store) {
// PXU TODO:
// 1. Check all requried field elements have related segment files
// 2. Check Stale and others
SegmentCommitOperation op(context_, context_.prev_ss);
SegmentCommitOperation op(context_, GetAdjustedSS());
auto status = op(store);
if (!status.ok())
return status;
......@@ -239,7 +257,7 @@ MergeOperation::DoExecute(Store& store) {
/* } */
/* std::cout << ")" << std::endl; */
PartitionCommitOperation pc_op(context_, context_.prev_ss);
PartitionCommitOperation pc_op(context_, GetAdjustedSS());
status = pc_op(store);
if (!status.ok())
return status;
......@@ -257,7 +275,7 @@ MergeOperation::DoExecute(Store& store) {
/* } */
/* std::cout << ")" << std::endl; */
CollectionCommitOperation cc_op(cc_context, context_.prev_ss);
CollectionCommitOperation cc_op(cc_context, GetAdjustedSS());
status = cc_op(store);
if (!status.ok())
return status;
......@@ -309,8 +327,8 @@ std::string
DropPartitionOperation::GetRepr() const {
std::stringstream ss;
ss << "<" << GetName() << "(";
if (prev_ss_) {
ss << "SS=" << prev_ss_->GetID();
if (GetAdjustedSS()) {
ss << "SS=" << GetAdjustedSS()->GetID();
}
ss << "," << c_context_.ToString();
ss << "," << context_.ToString();
......@@ -325,19 +343,19 @@ DropPartitionOperation::DoExecute(Store& store) {
PartitionPtr p;
auto id = c_context_.id;
if (id == 0) {
status = prev_ss_->GetPartitionId(c_context_.name, id);
status = GetAdjustedSS()->GetPartitionId(c_context_.name, id);
c_context_.id = id;
}
if (!status.ok())
return status;
auto p_c = prev_ss_->GetPartitionCommitByPartitionId(id);
auto p_c = GetAdjustedSS()->GetPartitionCommitByPartitionId(id);
if (!p_c)
return Status(SS_NOT_FOUND_ERROR, "No partition commit found");
context_.stale_partition_commit = p_c;
OperationContext op_ctx;
op_ctx.stale_partition_commit = p_c;
auto op = CollectionCommitOperation(op_ctx, prev_ss_);
auto op = CollectionCommitOperation(op_ctx, GetAdjustedSS());
status = op(store);
if (!status.ok())
return status;
......@@ -368,7 +386,7 @@ CreatePartitionOperation::PreCheck() {
Status
CreatePartitionOperation::CommitNewPartition(const PartitionContext& context, PartitionPtr& partition) {
Status status;
auto op = std::make_shared<PartitionOperation>(context, prev_ss_);
auto op = std::make_shared<PartitionOperation>(context, GetStartedSS());
status = op->Push();
if (!status.ok())
return status;
......@@ -387,13 +405,13 @@ CreatePartitionOperation::DoExecute(Store& store) {
if (!status.ok())
return status;
auto collection = prev_ss_->GetCollection();
auto collection = GetAdjustedSS()->GetCollection();
auto partition = context_.new_partition;
PartitionCommitPtr pc;
OperationContext pc_context;
pc_context.new_partition = partition;
auto pc_op = PartitionCommitOperation(pc_context, prev_ss_);
auto pc_op = PartitionCommitOperation(pc_context, GetAdjustedSS());
status = pc_op(store);
if (!status.ok())
return status;
......@@ -404,7 +422,7 @@ CreatePartitionOperation::DoExecute(Store& store) {
OperationContext cc_context;
cc_context.new_partition_commit = pc;
context_.new_partition_commit = pc;
auto cc_op = CollectionCommitOperation(cc_context, prev_ss_);
auto cc_op = CollectionCommitOperation(cc_context, GetAdjustedSS());
status = cc_op(store);
if (!status.ok())
return status;
......@@ -432,8 +450,8 @@ std::string
CreateCollectionOperation::GetRepr() const {
std::stringstream ss;
ss << "<" << GetName() << "(";
if (prev_ss_) {
ss << "SS=" << prev_ss_->GetID();
if (GetAdjustedSS()) {
ss << "SS=" << GetAdjustedSS()->GetID();
}
ss << c_context_.ToString();
ss << "," << context_.ToString();
......
......@@ -32,8 +32,8 @@ class CompoundBaseOperation : public Operations {
GetRepr() const override {
std::stringstream ss;
ss << "<" << GetName() << "(";
if (context_.prev_ss) {
ss << "SS=" << context_.prev_ss->GetID();
if (GetAdjustedSS()) {
ss << "SS=" << GetAdjustedSS()->GetID();
}
ss << "," << context_.ToString();
ss << ",LSN=" << GetContextLsn();
......@@ -43,7 +43,7 @@ class CompoundBaseOperation : public Operations {
Status
PreCheck() override {
if (GetContextLsn() <= prev_ss_->GetMaxLsn()) {
if (GetContextLsn() <= GetStartedSS()->GetMaxLsn()) {
return Status(SS_INVALID_CONTEX_ERROR, "Invalid LSN found in operation");
}
return Status::OK();
......@@ -104,6 +104,9 @@ class MergeOperation : public CompoundBaseOperation<MergeOperation> {
CommitNewSegment(SegmentPtr&);
Status
CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr&);
Status
OnSnapshotStale() override;
};
class CreateCollectionOperation : public CompoundBaseOperation<CreateCollectionOperation> {
......
......@@ -225,8 +225,8 @@ Operations::OnExecute(Store& store) {
Status
Operations::PreExecute(Store& store) {
Status status;
if (prev_ss_ && type_ == OperationsType::W_Compound) {
Snapshots::GetInstance().GetSnapshot(context_.prev_ss, prev_ss_->GetCollectionId());
if (GetStartedSS() && type_ == OperationsType::W_Compound) {
Snapshots::GetInstance().GetSnapshot(context_.prev_ss, GetStartedSS()->GetCollectionId());
if (!context_.prev_ss) {
status = OnSnapshotDropped();
} else if (prev_ss_->GetID() != context_.prev_ss->GetID()) {
......
......@@ -41,10 +41,15 @@ class Operations : public std::enable_shared_from_this<Operations> {
const OperationsType& type = OperationsType::Invalid);
const ScopedSnapshotT&
GetPrevSnapshot() const {
GetStartedSS() const {
return prev_ss_;
}
const ScopedSnapshotT&
GetAdjustedSS() const {
return context_.prev_ss;
}
virtual const LSN_TYPE&
GetContextLsn() const {
return context_.lsn;
......@@ -201,6 +206,8 @@ class CommitOperation : public Operations {
Status
GetResource(typename ResourceT::Ptr& res, bool wait = false) {
if (!status_.ok())
return status_;
if (wait) {
WaitToFinish();
}
......@@ -248,6 +255,8 @@ class LoadOperation : public Operations {
Status
GetResource(typename ResourceT::Ptr& res, bool wait = false) {
if (!status_.ok())
return status_;
if (wait) {
WaitToFinish();
}
......
......@@ -27,7 +27,7 @@ CollectionCommitOperation::DoExecute(Store& store) {
resource_->GetMappings().erase(context_.stale_partition_commit->GetID());
} else if (context_.new_partition_commit) {
auto prev_partition_commit =
prev_ss_->GetPartitionCommitByPartitionId(context_.new_partition_commit->GetPartitionId());
GetStartedSS()->GetPartitionCommitByPartitionId(context_.new_partition_commit->GetPartitionId());
if (prev_partition_commit)
resource_->GetMappings().erase(prev_partition_commit->GetID());
resource_->GetMappings().insert(context_.new_partition_commit->GetID());
......@@ -53,7 +53,7 @@ PartitionOperation::DoExecute(Store& store) {
auto status = CheckStale();
if (!status.ok())
return status;
resource_ = std::make_shared<Partition>(context_.name, prev_ss_->GetCollection()->GetID());
resource_ = std::make_shared<Partition>(context_.name, GetStartedSS()->GetCollection()->GetID());
AddStep(*resource_, false);
return status;
}
......@@ -72,7 +72,7 @@ PartitionCommitOperation::GetPrevResource() const {
auto& segment_commit = context_.new_segment_commit;
if (!segment_commit)
return nullptr;
return prev_ss_->GetPartitionCommitByPartitionId(segment_commit->GetPartitionId());
return GetStartedSS()->GetPartitionCommitByPartitionId(segment_commit->GetPartitionId());
}
Status
......@@ -82,12 +82,16 @@ PartitionCommitOperation::DoExecute(Store& store) {
resource_ = std::make_shared<PartitionCommit>(*prev_resource);
resource_->SetID(0);
resource_->ResetStatus();
auto prev_segment_commit = prev_ss_->GetSegmentCommit(context_.new_segment_commit->GetSegmentId());
auto prev_segment_commit =
GetStartedSS()->GetSegmentCommitBySegmentId(context_.new_segment_commit->GetSegmentId());
if (prev_segment_commit)
resource_->GetMappings().erase(prev_segment_commit->GetID());
if (context_.stale_segments.size() > 0) {
for (auto& stale_segment : context_.stale_segments) {
auto stale_segment_commit = prev_ss_->GetSegmentCommit(stale_segment->GetID());
if (stale_segment->GetPartitionId() != prev_resource->GetPartitionId()) {
return Status(SS_INVALID_CONTEX_ERROR, "All stale segments should from specified partition");
}
auto stale_segment_commit = GetStartedSS()->GetSegmentCommitBySegmentId(stale_segment->GetID());
resource_->GetMappings().erase(stale_segment_commit->GetID());
}
}
......@@ -95,7 +99,8 @@ PartitionCommitOperation::DoExecute(Store& store) {
if (!context_.new_partition) {
return Status(SS_INVALID_CONTEX_ERROR, "Partition is required");
}
resource_ = std::make_shared<PartitionCommit>(prev_ss_->GetCollectionId(), context_.new_partition->GetID());
resource_ =
std::make_shared<PartitionCommit>(GetStartedSS()->GetCollectionId(), context_.new_partition->GetID());
}
if (context_.new_segment_commit) {
......@@ -112,7 +117,7 @@ SegmentCommitOperation::SegmentCommitOperation(const OperationContext& context,
SegmentCommit::Ptr
SegmentCommitOperation::GetPrevResource() const {
if (context_.new_segment_files.size() > 0) {
return prev_ss_->GetSegmentCommit(context_.new_segment_files[0]->GetSegmentId());
return GetStartedSS()->GetSegmentCommitBySegmentId(context_.new_segment_files[0]->GetSegmentId());
}
return nullptr;
}
......@@ -132,7 +137,7 @@ SegmentOperation::DoExecute(Store& store) {
if (!context_.prev_partition) {
return Status(SS_INVALID_CONTEX_ERROR, "Invalid SegmentOperation Context");
}
auto prev_num = prev_ss_->GetMaxSegmentNumByPartition(context_.prev_partition->GetID());
auto prev_num = GetStartedSS()->GetMaxSegmentNumByPartition(context_.prev_partition->GetID());
resource_ = std::make_shared<Segment>(context_.prev_partition->GetID(), prev_num + 1);
AddStep(*resource_, false);
return Status::OK();
......@@ -150,7 +155,7 @@ SegmentCommitOperation::DoExecute(Store& store) {
resource_->GetMappings().erase(context_.stale_segment_file->GetID());
}
} else {
resource_ = std::make_shared<SegmentCommit>(prev_ss_->GetLatestSchemaCommitId(),
resource_ = std::make_shared<SegmentCommit>(GetStartedSS()->GetLatestSchemaCommitId(),
context_.new_segment_files[0]->GetPartitionId(),
context_.new_segment_files[0]->GetSegmentId());
}
......@@ -175,7 +180,7 @@ SegmentFileOperation::SegmentFileOperation(const SegmentFileContext& sc, ScopedS
Status
SegmentFileOperation::DoExecute(Store& store) {
auto field_element_id = prev_ss_->GetFieldElementId(context_.field_name, context_.field_element_name);
auto field_element_id = GetStartedSS()->GetFieldElementId(context_.field_name, context_.field_element_name);
resource_ = std::make_shared<SegmentFile>(context_.partition_id, context_.segment_id, field_element_id);
AddStep(*resource_, false);
return Status::OK();
......
......@@ -118,18 +118,77 @@ Snapshot::Snapshot(ID_TYPE id) {
}
}
/* for(auto kv : partition_commits_) { */
/* std::cout << this << " Snapshot " << collection_commit_->GetID() << " PartitionCommit " << */
/* kv.first << " Partition " << kv.second->GetPartitionId() << std::endl; */
/* } */
/* for(auto kv : p_pc_map_) { */
/* std::cout << this << " Snapshot " << collection_commit_->GetID() << " P " << */
/* kv.first << " PC " << kv.second << std::endl; */
/* } */
RefAll();
}
const std::string
Snapshot::ToString() {
auto to_matrix_string = [](const MappingT& mappings, int line_length, size_t ident = 0) -> std::string {
std::stringstream ss;
std::string l1_spaces;
for (auto i = 0; i < ident; ++i) {
l1_spaces += " ";
}
auto l2_spaces = l1_spaces + l1_spaces;
std::string prefix = "";
if (mappings.size() > line_length) {
prefix = "\n" + l1_spaces;
}
ss << prefix << "[";
auto pos = 0;
for (auto id : mappings) {
if (pos > line_length) {
pos = 0;
ss << "\n" << l2_spaces;
} else if (pos == 0) {
if (prefix != "") {
ss << "\n" << l2_spaces;
}
} else {
ss << ", ";
}
ss << id;
pos++;
}
ss << prefix << "]";
return ss.str();
};
int row_element_size = 8;
std::stringstream ss;
ss << "****************************** Snapshot " << GetID() << " ******************************";
ss << "\nCollection: id=" << GetCollectionId() << ",name=\"" << GetName() << "\"";
ss << ", CollectionCommit: id=" << GetCollectionCommit()->GetID();
ss << ",mappings=";
auto& cc_m = GetCollectionCommit()->GetMappings();
ss << to_matrix_string(cc_m, row_element_size, 2);
for (auto& p_c_id : cc_m) {
auto p_c = GetResource<PartitionCommit>(p_c_id);
auto p = GetResource<Partition>(p_c->GetPartitionId());
ss << "\nPartition: id=" << p->GetID() << ",name=\"" << p->GetName() << "\"";
ss << ", PartitionCommit: id=" << p_c->GetID();
ss << ",mappings=";
auto& pc_m = p_c->GetMappings();
ss << to_matrix_string(pc_m, row_element_size, 2);
for (auto& sc_id : pc_m) {
auto sc = GetResource<SegmentCommit>(sc_id);
auto se = GetResource<Segment>(sc->GetSegmentId());
ss << "\n Segment: id=" << se->GetID();
ss << ", SegmentCommit: id=" << sc->GetID();
ss << ",mappings=";
auto& sc_m = sc->GetMappings();
ss << to_matrix_string(sc_m, row_element_size, 2);
for (auto& sf_id : sc_m) {
auto sf = GetResource<SegmentFile>(sf_id);
ss << "\n\tSegmentFile: id=" << sf_id << ",field_element_id=" << sf->GetFieldElementId();
}
}
}
ss << "\n----------------------------------------------------------------------------------------";
return ss.str();
}
} // namespace snapshot
} // namespace engine
} // namespace milvus
......@@ -82,6 +82,16 @@ class Snapshot : public ReferenceProxy {
return max_lsn_;
}
PartitionPtr
GetPartition(const std::string& name) {
ID_TYPE id;
auto status = GetPartitionId(name, id);
if (!status.ok()) {
return nullptr;
}
return GetResource<Partition>(id);
}
Status
GetPartitionId(const std::string& name, ID_TYPE& id) const {
auto it = partition_names_map_.find(name);
......@@ -104,7 +114,7 @@ class Snapshot : public ReferenceProxy {
// PXU TODO: add const. Need to change Scopedxxxx::Get
SegmentCommitPtr
GetSegmentCommit(ID_TYPE segment_id) {
GetSegmentCommitBySegmentId(ID_TYPE segment_id) {
auto it = seg_segc_map_.find(segment_id);
if (it == seg_segc_map_.end())
return nullptr;
......@@ -246,6 +256,9 @@ class Snapshot : public ReferenceProxy {
resources[resource->GetID()] = resource;
}
const std::string
ToString();
private:
Snapshot(const Snapshot&) = delete;
Snapshot&
......
......@@ -16,6 +16,7 @@
#include <random>
#include <string>
#include <set>
#include <algorithm>
#include "db/utils.h"
#include "db/snapshot/ReferenceProxy.h"
......@@ -186,7 +187,6 @@ CreateCollection(const std::string& collection_name, const LSN_TYPE& lsn) {
op->Push();
ScopedSnapshotT ss;
auto status = op->GetSnapshot(ss);
std::cout << status.ToString() << std::endl;
return ss;
}
......@@ -234,7 +234,6 @@ TEST_F(SnapshotTest, DropCollectionTest) {
ASSERT_TRUE(ss);
ScopedSnapshotT lss;
auto status = Snapshots::GetInstance().GetSnapshot(lss, collection_name);
std::cout << status.ToString() << std::endl;
ASSERT_TRUE(status.ok());
ASSERT_TRUE(lss);
ASSERT_EQ(ss->GetID(), lss->GetID());
......@@ -316,6 +315,7 @@ CreatePartition(const std::string& collection_name, const PartitionContext& p_co
ScopedSnapshotT ss;
auto status = Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
......@@ -326,15 +326,21 @@ CreatePartition(const std::string& collection_name, const PartitionContext& p_co
PartitionPtr partition;
status = op->CommitNewPartition(p_context, partition);
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
status = op->Push();
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
status = op->GetSnapshot(curr_ss);
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
return curr_ss;
}
......@@ -387,8 +393,8 @@ TEST_F(SnapshotTest, PartitionTest) {
p_ctx.lsn = ++lsn;
drop_op = std::make_shared<DropPartitionOperation>(p_ctx, latest_ss);
status = drop_op->Push();
ASSERT_TRUE(!status.ok());
std::cout << status.ToString() << std::endl;
ASSERT_TRUE(!status.ok());
PartitionContext pp_ctx;
pp_ctx.name = "p2";
......@@ -495,7 +501,7 @@ TEST_F(SnapshotTest, OperationTest) {
status = build_op->CommitNewSegmentFile(sf_context, seg_file);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(seg_file);
auto prev_segment_commit = ss->GetSegmentCommit(seg_file->GetSegmentId());
auto prev_segment_commit = ss->GetSegmentCommitBySegmentId(seg_file->GetSegmentId());
auto prev_segment_commit_mappings = prev_segment_commit->GetMappings();
ASSERT_NE(prev_segment_commit->ToString(), "");
......@@ -503,7 +509,7 @@ TEST_F(SnapshotTest, OperationTest) {
status = build_op->GetSnapshot(ss);
ASSERT_TRUE(ss->GetID() > ss_id);
auto segment_commit = ss->GetSegmentCommit(seg_file->GetSegmentId());
auto segment_commit = ss->GetSegmentCommitBySegmentId(seg_file->GetSegmentId());
auto segment_commit_mappings = segment_commit->GetMappings();
MappingT expected_mappings = prev_segment_commit_mappings;
expected_mappings.insert(seg_file->GetID());
......@@ -515,6 +521,7 @@ TEST_F(SnapshotTest, OperationTest) {
stale_segment_commit_ids.insert(segment_commit->GetID());
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
// Check stale snapshot has been deleted from store
{
auto collection_commit = CollectionCommitsHolder::GetInstance()
......@@ -543,7 +550,7 @@ TEST_F(SnapshotTest, OperationTest) {
ASSERT_TRUE(ss->GetID() > ss_id);
ASSERT_TRUE(status.ok());
auto segment_commit = ss->GetSegmentCommit(seg_file->GetSegmentId());
auto segment_commit = ss->GetSegmentCommitBySegmentId(seg_file->GetSegmentId());
auto segment_commit_mappings = segment_commit->GetMappings();
MappingT expected_segment_mappings;
expected_segment_mappings.insert(seg_file->GetID());
......@@ -579,7 +586,7 @@ TEST_F(SnapshotTest, OperationTest) {
ASSERT_TRUE(ss->GetID() > ss_id);
ASSERT_TRUE(status.ok());
auto segment_commit = ss->GetSegmentCommit(new_seg->GetID());
auto segment_commit = ss->GetSegmentCommitBySegmentId(new_seg->GetID());
auto new_partition_commit = ss->GetPartitionCommitByPartitionId(partition_id);
auto new_mappings = new_partition_commit->GetMappings();
auto prev_mappings = prev_partition_commit->GetMappings();
......@@ -628,6 +635,7 @@ TEST_F(SnapshotTest, OperationTest) {
++lsn);
ASSERT_TRUE(status.ok());
status = build_op->Push();
std::cout << status.ToString() << std::endl;
ASSERT_TRUE(!status.ok());
ASSERT_TRUE(!(build_op->GetStatus()).ok());
std::cout << build_op->ToString() << std::endl;
......@@ -665,6 +673,10 @@ TEST_F(SnapshotTest, CompoundTest1) {
auto next_lsn = [&]() -> decltype(lsn) {
return ++lsn;
};
LSN_TYPE pid = 0;
auto next_pid = [&]() -> decltype(pid) {
return ++pid;
};
std::string collection_name("c1");
auto ss = CreateCollection(collection_name, next_lsn());
ASSERT_TRUE(ss);
......@@ -675,12 +687,15 @@ TEST_F(SnapshotTest, CompoundTest1) {
std::set<ID_TYPE> all_segments;
std::set<ID_TYPE> segment_in_building;
std::set<ID_TYPE> merge_segs;
std::map<ID_TYPE, std::set<ID_TYPE>> merged_segs;
std::map<ID_TYPE, std::set<ID_TYPE>> merged_segs_history;
std::set<ID_TYPE> merged_segs;
std::set<ID_TYPE> built_segs;
std::set<ID_TYPE> build_stale_segs;
std::mutex all_mtx;
std::mutex building_mtx;
std::mutex merge_mtx;
std::mutex built_mtx;
std::mutex partition_mtx;
WaitableObj merge_waiter;
WaitableObj build_waiter;
......@@ -691,6 +706,8 @@ TEST_F(SnapshotTest, CompoundTest1) {
sf_context.segment_id = 1;
sf_context.partition_id = 1;
IDS_TYPE partitions = {ss->GetResources<Partition>().begin()->second->GetID()};
auto do_build = [&] (const ID_TYPE& seg_id) {
decltype(ss) latest_ss;
auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
......@@ -704,11 +721,26 @@ TEST_F(SnapshotTest, CompoundTest1) {
SegmentFilePtr seg_file;
build_sf_context.segment_id = seg_id;
status = build_op->CommitNewSegmentFile(build_sf_context, seg_file);
ASSERT_TRUE(status.ok());
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
std::unique_lock<std::mutex> lock(merge_mtx);
auto it = merged_segs.find(seg_id);
ASSERT_NE(it, merged_segs.end());
return;
}
std::unique_lock<std::mutex> lock(built_mtx);
status = build_op->Push();
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
std::unique_lock<std::mutex> lock(merge_mtx);
auto it = merged_segs.find(seg_id);
ASSERT_NE(it, merged_segs.end());
return;
}
ASSERT_TRUE(status.ok());
status = build_op->GetSnapshot(latest_ss);
ASSERT_TRUE(status.ok());
built_segs.insert(seg_id);
};
......@@ -720,6 +752,7 @@ TEST_F(SnapshotTest, CompoundTest1) {
auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
ASSERT_TRUE(status.ok());
PartitionPtr partition;
OperationContext context;
for (auto& id : seg_ids) {
auto seg = latest_ss->GetResource<Segment>(id);
......@@ -727,6 +760,12 @@ TEST_F(SnapshotTest, CompoundTest1) {
std::cout << "Error seg=" << id << std::endl;
ASSERT_TRUE(seg);
}
if (!partition) {
partition = latest_ss->GetResource<Partition>(seg->GetPartitionId());
ASSERT_TRUE(partition);
} else {
ASSERT_EQ(seg->GetPartitionId(), partition->GetID());
}
context.stale_segments.push_back(seg);
if (!context.prev_partition) {
context.prev_partition = latest_ss->GetResource<Partition>(
......@@ -743,29 +782,64 @@ TEST_F(SnapshotTest, CompoundTest1) {
SegmentFilePtr seg_file;
status = op->CommitNewSegmentFile(sf_context, seg_file);
ASSERT_TRUE(status.ok());
std::unique_lock<std::mutex> lock(merge_mtx);
status = op->Push();
ASSERT_TRUE(status.ok());
if (!status.ok()) {
lock.unlock();
std::unique_lock<std::mutex> blk(built_mtx);
std::cout << status.ToString() << std::endl;
/* for (auto id : built_segs) { */
/* std::cout << "builted " << id << std::endl; */
/* } */
/* for (auto id : seg_ids) { */
/* std::cout << "to_merge " << id << std::endl; */
/* } */
bool stale_found = false;
for (auto& seg_id : seg_ids) {
auto it = built_segs.find(seg_id);
if (it != built_segs.end()) {
stale_found = true;
break;
}
}
ASSERT_TRUE(stale_found);
return;
}
ID_TYPE ss_id = latest_ss->GetID();
status = op->GetSnapshot(latest_ss);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(latest_ss->GetID() > ss_id);
merged_segs[new_seg->GetID()] = seg_ids;
merged_segs_history[new_seg->GetID()] = seg_ids;
for (auto& seg_id : seg_ids) {
merged_segs.insert(seg_id);
}
new_seg_id = new_seg->GetID();
ASSERT_EQ(new_seg->GetPartitionId(), partition->GetID());
};
// TODO: If any Compound Operation find larger Snapshot. This Operation should be rollback to latest
auto handler_worker = [&] {
auto to_build_segments = RandomInt(50, 60);
auto loop_cnt = RandomInt(10, 20);
decltype(ss) latest_ss;
for (auto i = 0; i < to_build_segments; ++i) {
auto create_new_segment = [&]() {
ID_TYPE partition_id;
{
std::unique_lock<std::mutex> lock(partition_mtx);
auto idx = RandomInt(0, partitions.size() - 1);
partition_id = partitions[idx];
}
Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
OperationContext context;
context.prev_partition = latest_ss->GetResource<Partition>(partition_id);
context.lsn = next_lsn();
context.prev_partition = latest_ss->GetResource<Partition>(8);
auto op = std::make_shared<NewSegmentOperation>(context, latest_ss);
SegmentPtr new_seg;
status = op->CommitNewSegment(new_seg);
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
}
ASSERT_TRUE(status.ok());
SegmentFilePtr seg_file;
sf_context.segment_id = new_seg->GetID();
......@@ -778,10 +852,39 @@ TEST_F(SnapshotTest, CompoundTest1) {
std::unique_lock<std::mutex> lock(all_mtx);
all_segments.insert(new_seg->GetID());
}
if (RandomInt(0, 10) >= 7) {
build_queue.Put(new_seg->GetID());
}
merge_queue.Put(new_seg->GetID());
};
auto create_partition = [&]() {
std::stringstream ss;
ss << "fake_partition_" << next_pid();
PartitionContext context;
context.name = ss.str();
std::unique_lock<std::mutex> lock(partition_mtx);
auto latest_ss = CreatePartition(collection_name, context, next_lsn());
ASSERT_TRUE(latest_ss);
auto partition = latest_ss->GetPartition(ss.str());
partitions.push_back(partition->GetID());
if (latest_ss->NumberOfPartitions() != partitions.size()) {
for (auto& pid : partitions) {
std::cout << "PartitionId=" << pid << std::endl;
}
}
ASSERT_EQ(latest_ss->NumberOfPartitions(), partitions.size());
};
for (auto i = 0; i < loop_cnt; ++i) {
if (RandomInt(0, 10) > 7) {
create_partition();
}
create_new_segment();
}
};
std::map<ID_TYPE, std::set<ID_TYPE>> merge_segs;
auto merge_worker = [&] {
while (true) {
auto seg_id = merge_queue.Take();
......@@ -789,18 +892,36 @@ TEST_F(SnapshotTest, CompoundTest1) {
std::cout << "Exiting Merge Worker" << std::endl;
break;
}
merge_segs.insert(seg_id);
if ((merge_segs.size() >= 2) && (RandomInt(0, 10) >= 5)) {
std::cout << "Merging (";
for (auto seg : merge_segs) {
decltype(ss) latest_ss;
auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
ASSERT_TRUE(status.ok());
auto seg = latest_ss->GetResource<Segment>(seg_id);
if (!seg) {
std::cout << "SegID=" << seg_id << std::endl;
std::cout << latest_ss->ToString() << std::endl;
ASSERT_TRUE(seg);
}
auto it_segs = merge_segs.find(seg->GetPartitionId());
if (it_segs == merge_segs.end()) {
merge_segs[seg->GetPartitionId()] = {seg->GetID()};
} else {
merge_segs[seg->GetPartitionId()].insert(seg->GetID());
}
auto& segs = merge_segs[seg->GetPartitionId()];
if ((segs.size() >= 2) && (RandomInt(0, 10) >= 2)) {
std::cout << "Merging partition " << seg->GetPartitionId() << " segs (";
for (auto seg : segs) {
std::cout << seg << ",";
}
std::cout << ")" << std::endl;
ID_TYPE new_seg_id = 0;
do_merge(merge_segs, new_seg_id);
merge_segs.clear();
ASSERT_NE(new_seg_id, 0);
if (RandomInt(0, 10) >= 5) {
do_merge(segs, new_seg_id);
segs.clear();
if (new_seg_id == 0) {
continue;
}
if (RandomInt(0, 10) >= 6) {
build_queue.Put(new_seg_id);
}
......@@ -842,7 +963,7 @@ TEST_F(SnapshotTest, CompoundTest1) {
t3.join();
t4.join();
/* for (auto& kv : merged_segs) { */
/* for (auto& kv : merged_segs_history) { */
/* std::cout << "merged: ("; */
/* for (auto i : kv.second) { */
/* std::cout << i << ","; */
......@@ -860,7 +981,7 @@ TEST_F(SnapshotTest, CompoundTest1) {
status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
ASSERT_TRUE(status.ok());
auto expect_segments = all_segments;
for (auto& kv : merged_segs) {
for (auto& kv : merged_segs_history) {
expect_segments.insert(kv.first);
for (auto& id : kv.second) {
expect_segments.erase(id);
......@@ -878,5 +999,10 @@ TEST_F(SnapshotTest, CompoundTest1) {
decltype(final_segment_file_cnt) expect_segment_file_cnt;
expect_segment_file_cnt = expect_segments.size();
expect_segment_file_cnt += built_segs.size();
std::cout << latest_ss->ToString() << std::endl;
std::vector<int> common_ids;
std::set_intersection(merged_segs.begin(), merged_segs.end(), built_segs.begin(), built_segs.end(),
std::back_inserter(common_ids));
expect_segment_file_cnt -= common_ids.size();
ASSERT_EQ(expect_segment_file_cnt, final_segment_file_cnt);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册