From 442e6aee9afcb8cc82b7f0239ef3f4e5920cfcd2 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Tue, 28 Jul 2020 19:34:23 +0800 Subject: [PATCH] (snapshot/db): add ChangeSegmentFileOperation and related test (#3050) * (db/snapshot): update for row count Signed-off-by: peng.xu * (db/snapshot): fix bug in NewSegmentOperation Signed-off-by: peng.xu * (db/snapshot): remove dummy print Signed-off-by: peng.xu * (db/snapshot): Add some test for row count Signed-off-by: peng.xu * (db/snapshot): update size logic Signed-off-by: peng.xu * (db/snapshot): update size logic related ut Signed-off-by: peng.xu * (db/snapshot): rollback if operation is not done Signed-off-by: peng.xu * (db/snapshot): clean store Signed-off-by: peng.xu * (db/snapshot): remove some dependency Signed-off-by: peng.xu * (db/snapshot): update for store Signed-off-by: peng.xu * (db/snapshot): update Store.h Signed-off-by: peng.xu * (db/snapshot): update store related code Signed-off-by: peng.xu * (db/snapshot): add field element modification operation Signed-off-by: peng.xu * (db/snapshot): change new operation name Signed-off-by: peng.xu * (db/snapshot): fix lint error Signed-off-by: peng.xu * (db/snapshot): Add Segment File Operation Signed-off-by: peng.xu * (db/snapshot): crtp for BaseResource Signed-off-by: peng.xu * (db/snapshot): add InActiveResourcesGCEvent Signed-off-by: peng.xu * (db/snapshot): fix ut error Signed-off-by: peng.xu * (db/snapshot): small change Signed-off-by: peng.xu * (db/snapshot): update snapshot segmentcommit operation Signed-off-by: peng.xu * (db/snapshot): update drop all index operation Signed-off-by: peng.xu * (db/snapshot): update ut Signed-off-by: peng.xu * (db/snapshot): fix lint error Signed-off-by: peng.xu * (db/snapshot): fix gc Signed-off-by: peng.xu * (db/snapshot): fix gc segment files Signed-off-by: peng.xu * (db/snapshot): refactor Event related Signed-off-by: peng.xu * (db/snapshot): change 1 Signed-off-by: peng.xu * (db/snapshot): change for GC event Signed-off-by: peng.xu * (db/snapshot): fix build error for high version of boost filesystem Signed-off-by: peng.xu * (db/snapshot): small change Signed-off-by: peng.xu * (db/snapshot): update compound operations Signed-off-by: peng.xu * (db/snapshot): add operation test Signed-off-by: peng.xu --- core/src/db/engine/ExecutionEngineImpl.cpp | 2 +- core/src/db/snapshot/CompoundOperations.cpp | 32 +++++--- core/src/db/snapshot/CompoundOperations.h | 6 +- core/src/db/snapshot/Context.cpp | 15 +++- core/src/db/snapshot/Context.h | 2 +- core/src/db/snapshot/ResourceOperations.cpp | 16 ++-- core/unittest/db/test_snapshot.cpp | 87 +++++++++++++++++++-- core/unittest/db/utils.h | 2 +- 8 files changed, 127 insertions(+), 35 deletions(-) diff --git a/core/src/db/engine/ExecutionEngineImpl.cpp b/core/src/db/engine/ExecutionEngineImpl.cpp index 5196383b..29a986ca 100644 --- a/core/src/db/engine/ExecutionEngineImpl.cpp +++ b/core/src/db/engine/ExecutionEngineImpl.cpp @@ -529,7 +529,7 @@ ExecutionEngineImpl::BuildIndex() { // add segment files snapshot::OperationContext context; context.prev_partition = snapshot->GetResource(segment->GetPartitionId()); - auto build_op = std::make_shared(context, snapshot); + auto build_op = std::make_shared(context, snapshot); auto add_segment_file = [&](const std::string& element_name, snapshot::SegmentFilePtr& seg_file) -> Status { snapshot::SegmentFileContext sf_context; diff --git a/core/src/db/snapshot/CompoundOperations.cpp b/core/src/db/snapshot/CompoundOperations.cpp index 40ddbabf..bcf1c2f2 100644 --- a/core/src/db/snapshot/CompoundOperations.cpp +++ b/core/src/db/snapshot/CompoundOperations.cpp @@ -26,15 +26,24 @@ namespace milvus { namespace engine { namespace snapshot { -AddSegmentFileOperation::AddSegmentFileOperation(const OperationContext& context, ScopedSnapshotT prev_ss) +ChangeSegmentFileOperation::ChangeSegmentFileOperation(const OperationContext& context, ScopedSnapshotT prev_ss) : BaseT(context, prev_ss) { } Status -AddSegmentFileOperation::DoExecute(StorePtr store) { - STATUS_CHECK(CheckStale(std::bind(&AddSegmentFileOperation::CheckSegmentStale, this, std::placeholders::_1, +ChangeSegmentFileOperation::DoExecute(StorePtr store) { + STATUS_CHECK(CheckStale(std::bind(&ChangeSegmentFileOperation::CheckSegmentStale, this, std::placeholders::_1, context_.new_segment_files[0]->GetSegmentId()))); + ID_TYPE segment_id = 0; + for (auto& stale_segment_file : context_.stale_segment_files) { + if (segment_id == 0) { + segment_id = stale_segment_file->GetSegmentId(); + } else if (segment_id != stale_segment_file->GetSegmentId()) { + return Status(SS_INVALID_CONTEX_ERROR, "All segment files should be of same segment"); + } + } + auto update_size = [&](SegmentFilePtr& file) { auto update_ctx = ResourceContextBuilder().SetOp(meta::oUpdate).CreatePtr(); update_ctx->AddAttr(SizeField::Name); @@ -42,6 +51,11 @@ AddSegmentFileOperation::DoExecute(StorePtr store) { }; for (auto& new_file : context_.new_segment_files) { + if (segment_id == 0) { + segment_id = new_file->GetSegmentId(); + } else if (segment_id != new_file->GetSegmentId()) { + return Status(SS_INVALID_CONTEX_ERROR, "All segment files should be of same segment"); + } update_size(new_file); } @@ -93,7 +107,7 @@ AddSegmentFileOperation::DoExecute(StorePtr store) { } Status -AddSegmentFileOperation::CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_TYPE segment_id) const { +ChangeSegmentFileOperation::CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_TYPE segment_id) const { auto segment = latest_snapshot->GetResource(segment_id); if (!segment) { std::stringstream emsg; @@ -104,16 +118,16 @@ AddSegmentFileOperation::CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_ } Status -AddSegmentFileOperation::CommitRowCountDelta(SIZE_TYPE delta, bool sub) { +ChangeSegmentFileOperation::CommitRowCountDelta(SIZE_TYPE delta, bool sub) { delta_ = delta; sub_ = sub; return Status::OK(); } Status -AddSegmentFileOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created) { +ChangeSegmentFileOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created) { STATUS_CHECK(CheckStale( - std::bind(&AddSegmentFileOperation::CheckSegmentStale, this, std::placeholders::_1, context.segment_id))); + std::bind(&ChangeSegmentFileOperation::CheckSegmentStale, this, std::placeholders::_1, context.segment_id))); auto segment = GetStartedSS()->GetResource(context.segment_id); if (!segment || (context_.new_segment_files.size() > 0 && @@ -270,7 +284,7 @@ DropAllIndexOperation::DoExecute(StorePtr store) { } auto context = context_; - context.stale_segment_file = kv.second.Get(); + context.stale_segment_files.push_back(kv.second.Get()); SegmentCommitOperation sc_op(context, GetAdjustedSS()); STATUS_CHECK(sc_op(store)); STATUS_CHECK(sc_op.GetResource(context.new_segment_commit)); @@ -306,7 +320,7 @@ DropIndexOperation::DropIndexOperation(const OperationContext& context, ScopedSn Status DropIndexOperation::PreCheck() { - if (context_.stale_segment_file == nullptr) { + if (context_.stale_segment_files.size() == 0) { std::stringstream emsg; emsg << GetRepr() << ". Stale segment is requried"; return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); diff --git a/core/src/db/snapshot/CompoundOperations.h b/core/src/db/snapshot/CompoundOperations.h index bbdff431..b4e563d8 100644 --- a/core/src/db/snapshot/CompoundOperations.h +++ b/core/src/db/snapshot/CompoundOperations.h @@ -58,12 +58,12 @@ class CompoundBaseOperation : public Operations { } }; -class AddSegmentFileOperation : public CompoundBaseOperation { +class ChangeSegmentFileOperation : public CompoundBaseOperation { public: - using BaseT = CompoundBaseOperation; + using BaseT = CompoundBaseOperation; static constexpr const char* Name = "B"; - AddSegmentFileOperation(const OperationContext& context, ScopedSnapshotT prev_ss); + ChangeSegmentFileOperation(const OperationContext& context, ScopedSnapshotT prev_ss); Status DoExecute(StorePtr) override; diff --git a/core/src/db/snapshot/Context.cpp b/core/src/db/snapshot/Context.cpp index 495896ce..484029db 100644 --- a/core/src/db/snapshot/Context.cpp +++ b/core/src/db/snapshot/Context.cpp @@ -68,9 +68,20 @@ OperationContext::ToString() const { } ss << "]"; } - if (stale_segment_file) { - ss << ",S_SF=" << stale_segment_file->GetID(); + + if (stale_segment_files.size() > 0) { + ss << ",S_SF=["; + bool first = true; + for (auto& f : new_segment_files) { + if (!first) { + ss << INNER_DELIMITER; + } + ss << f->GetID(); + first = false; + } + ss << "]"; } + if (new_segment_files.size() > 0) { ss << ",N_SF=["; bool first = true; diff --git a/core/src/db/snapshot/Context.h b/core/src/db/snapshot/Context.h index c43f8248..24118594 100644 --- a/core/src/db/snapshot/Context.h +++ b/core/src/db/snapshot/Context.h @@ -63,7 +63,6 @@ struct OperationContext { CollectionCommitPtr new_collection_commit = nullptr; CollectionPtr new_collection = nullptr; - SegmentFilePtr stale_segment_file = nullptr; std::vector stale_segments; FieldPtr prev_field = nullptr; @@ -83,6 +82,7 @@ struct OperationContext { PartitionCommitPtr stale_partition_commit = nullptr; SegmentFile::VecT new_segment_files; + SegmentFile::VecT stale_segment_files; CollectionPtr collection = nullptr; LSN_TYPE lsn = 0; diff --git a/core/src/db/snapshot/ResourceOperations.cpp b/core/src/db/snapshot/ResourceOperations.cpp index 5e86f755..eedd902d 100644 --- a/core/src/db/snapshot/ResourceOperations.cpp +++ b/core/src/db/snapshot/ResourceOperations.cpp @@ -205,8 +205,8 @@ SegmentCommit::Ptr SegmentCommitOperation::GetPrevResource() const { if (context_.new_segment_files.size() > 0) { return GetStartedSS()->GetSegmentCommitBySegmentId(context_.new_segment_files[0]->GetSegmentId()); - } else if (context_.stale_segment_file != nullptr) { - return GetStartedSS()->GetSegmentCommitBySegmentId(context_.stale_segment_file->GetSegmentId()); + } else if (context_.stale_segment_files.size() > 0) { + return GetStartedSS()->GetSegmentCommitBySegmentId(context_.stale_segment_files[0]->GetSegmentId()); } return nullptr; } @@ -220,9 +220,9 @@ SegmentCommitOperation::DoExecute(StorePtr store) { resource_->SetID(0); resource_->ResetStatus(); size = resource_->GetSize(); - if (context_.stale_segment_file) { - resource_->GetMappings().erase(context_.stale_segment_file->GetID()); - size -= context_.stale_segment_file->GetSize(); + for (auto& stale_segment_file : context_.stale_segment_files) { + resource_->GetMappings().erase(stale_segment_file->GetID()); + size -= stale_segment_file->GetSize(); } } else { resource_ = std::make_shared(GetStartedSS()->GetLatestSchemaCommitId(), @@ -240,14 +240,10 @@ SegmentCommitOperation::DoExecute(StorePtr store) { Status SegmentCommitOperation::PreCheck() { - if (context_.stale_segment_file == nullptr && context_.new_segment_files.size() == 0) { + if (context_.stale_segment_files.size() == 0 && context_.new_segment_files.size() == 0) { std::stringstream emsg; emsg << GetRepr() << ". new_segment_files should not be empty in context"; return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); - } else if (context_.stale_segment_file != nullptr && context_.new_segment_files.size() > 0) { - std::stringstream emsg; - emsg << GetRepr() << ". new_segment_files should be empty in context"; - return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); } return Status::OK(); } diff --git a/core/unittest/db/test_snapshot.cpp b/core/unittest/db/test_snapshot.cpp index 422874a6..0b0bb7c3 100644 --- a/core/unittest/db/test_snapshot.cpp +++ b/core/unittest/db/test_snapshot.cpp @@ -479,7 +479,7 @@ TEST_F(SnapshotTest, IndexTest) { OperationContext context; context.lsn = next_lsn(); context.prev_partition = ss->GetResource(sf_context.partition_id); - auto build_op = std::make_shared(context, ss); + auto build_op = std::make_shared(context, ss); SegmentFilePtr seg_file; status = build_op->CommitNewSegmentFile(sf_context, seg_file); ASSERT_TRUE(status.ok()); @@ -510,7 +510,7 @@ TEST_F(SnapshotTest, IndexTest) { OperationContext drop_ctx; drop_ctx.lsn = next_lsn(); - drop_ctx.stale_segment_file = seg_file; + drop_ctx.stale_segment_files.push_back(seg_file); auto drop_op = std::make_shared(drop_ctx, ss); status = drop_op->Push(); ASSERT_TRUE(status.ok()); @@ -673,7 +673,7 @@ TEST_F(SnapshotTest, OperationTest) { { OperationContext context; context.lsn = ++lsn; - auto build_op = std::make_shared(context, ss); + auto build_op = std::make_shared(context, ss); SegmentFilePtr seg_file; status = build_op->CommitNewSegmentFile(sf_context, seg_file); std::cout << status.ToString() << std::endl; @@ -841,7 +841,7 @@ TEST_F(SnapshotTest, OperationTest) { { OperationContext context; context.lsn = ++lsn; - auto build_op = std::make_shared(context, new_ss); + auto build_op = std::make_shared(context, new_ss); SegmentFilePtr seg_file; auto new_sf_context = sf_context; new_sf_context.segment_id = new_seg_id; @@ -852,7 +852,7 @@ TEST_F(SnapshotTest, OperationTest) { { OperationContext context; context.lsn = ++lsn; - auto op = std::make_shared(context, ss); + auto op = std::make_shared(context, ss); SegmentFilePtr seg_file; auto new_sf_context = sf_context; new_sf_context.segment_id = merge_seg->GetID(); @@ -918,7 +918,7 @@ TEST_F(SnapshotTest, OperationTest) { { OperationContext context; context.lsn = ++lsn; - auto build_op = std::make_shared(context, ss); + auto build_op = std::make_shared(context, ss); SegmentFilePtr seg_file; auto new_sf_context = sf_context; new_sf_context.segment_id = merge_seg->GetID(); @@ -938,6 +938,77 @@ TEST_F(SnapshotTest, OperationTest) { Snapshots::GetInstance().Reset(); } +TEST_F(SnapshotTest, ChangeSegmentFileOperationTest) { + LSN_TYPE lsn = 0; + std::string collection_name("c1"); + auto ss = CreateCollection(collection_name, ++lsn); + ASSERT_TRUE(ss); + + SegmentFileContext sf_context; + SFContextBuilder(sf_context, ss); + + auto& partitions = ss->GetResources(); + auto total_row_cnt = 0; + for (auto& kv : partitions) { + auto num = RandomInt(2, 5); + for (auto i = 0; i < num; ++i) { + auto row_cnt = RandomInt(100, 200); + ASSERT_TRUE(CreateSegment(ss, kv.first, ++lsn, sf_context, row_cnt).ok()); + total_row_cnt += row_cnt; + } + } + + auto status = Snapshots::GetInstance().GetSnapshot(ss, collection_name); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(total_row_cnt, ss->GetCollectionCommit()->GetRowCount()); + + auto total_size = ss->GetCollectionCommit()->GetSize(); + + auto target_segment = ss->GetResources().begin()->second.Get(); + + auto sf_ids = ss->GetSegmentFileIds(target_segment->GetID()); + ASSERT_GT(sf_ids.size(), 0); + auto stale_sf = ss->GetResource(*(sf_ids.begin())); + ASSERT_TRUE(stale_sf); + + std::cout << stale_sf->GetSize() << std::endl; + + OperationContext context; + context.lsn = ++lsn; + context.stale_segment_files.push_back(stale_sf); + auto op = std::make_shared(context, ss); + SegmentFilePtr seg_file; + sf_context.field_name = "vector"; + sf_context.field_element_name = "_raw"; + sf_context.segment_id = stale_sf->GetSegmentId(); + sf_context.partition_id = stale_sf->GetPartitionId(); + sf_context.collection_id = stale_sf->GetCollectionId(); + status = op->CommitNewSegmentFile(sf_context, seg_file); + /* std::cout << status.ToString() << std::endl; */ + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(seg_file); + + auto prev_segment_commit = ss->GetSegmentCommitBySegmentId(seg_file->GetSegmentId()); + auto prev_segment_commit_mappings = prev_segment_commit->GetMappings(); + ASSERT_FALSE(prev_segment_commit->ToString().empty()); + + auto new_size = RandomInt(1000, 20000); + seg_file->SetSize(new_size); + total_size += new_size; + total_size -= stale_sf->GetSize(); + + auto delta = prev_segment_commit->GetRowCount() / 2; + op->CommitRowCountDelta(delta); + total_row_cnt -= delta; + + status = op->Push(); + ASSERT_TRUE(status.ok()) << status.message(); + status = op->GetSnapshot(ss); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(ss->GetCollectionCommit()->GetRowCount(), total_row_cnt); + ASSERT_EQ(ss->GetCollectionCommit()->GetSize(), total_size); +} + TEST_F(SnapshotTest, CompoundTest1) { Status status; std::atomic lsn = 0; @@ -988,7 +1059,7 @@ TEST_F(SnapshotTest, CompoundTest1) { OperationContext context; context.lsn = next_lsn(); - auto build_op = std::make_shared(context, latest_ss); + auto build_op = std::make_shared(context, latest_ss); SegmentFilePtr seg_file; build_sf_context.segment_id = seg_id; status = build_op->CommitNewSegmentFile(build_sf_context, seg_file); @@ -1332,7 +1403,7 @@ TEST_F(SnapshotTest, CompoundTest2) { OperationContext context; context.lsn = next_lsn(); - auto build_op = std::make_shared(context, latest_ss); + auto build_op = std::make_shared(context, latest_ss); SegmentFilePtr seg_file; build_sf_context.segment_id = seg_id; status = build_op->CommitNewSegmentFile(build_sf_context, seg_file); diff --git a/core/unittest/db/utils.h b/core/unittest/db/utils.h index 71232764..bb3b0a4d 100644 --- a/core/unittest/db/utils.h +++ b/core/unittest/db/utils.h @@ -46,7 +46,7 @@ using PartitionContext = milvus::engine::snapshot::PartitionContext; using DropIndexOperation = milvus::engine::snapshot::DropIndexOperation; using AddFieldElementOperation = milvus::engine::snapshot::AddFieldElementOperation; using DropAllIndexOperation = milvus::engine::snapshot::DropAllIndexOperation; -using AddSegmentFileOperation = milvus::engine::snapshot::AddSegmentFileOperation; +using ChangeSegmentFileOperation = milvus::engine::snapshot::ChangeSegmentFileOperation; using MergeOperation = milvus::engine::snapshot::MergeOperation; using CreateCollectionOperation = milvus::engine::snapshot::CreateCollectionOperation; using NewSegmentOperation = milvus::engine::snapshot::NewSegmentOperation; -- GitLab