diff --git a/core/src/db/engine/ExecutionEngineImpl.cpp b/core/src/db/engine/ExecutionEngineImpl.cpp index 5196383b09d752f16beeb9449051d08f4b68b055..29a986cae61c841bf35b1c7278c4937e0a76b019 100644 --- a/core/src/db/engine/ExecutionEngineImpl.cpp +++ b/core/src/db/engine/ExecutionEngineImpl.cpp @@ -529,7 +529,7 @@ ExecutionEngineImpl::BuildIndex() { // add segment files snapshot::OperationContext context; context.prev_partition = snapshot->GetResource(segment->GetPartitionId()); - auto build_op = std::make_shared(context, snapshot); + auto build_op = std::make_shared(context, snapshot); auto add_segment_file = [&](const std::string& element_name, snapshot::SegmentFilePtr& seg_file) -> Status { snapshot::SegmentFileContext sf_context; diff --git a/core/src/db/snapshot/CompoundOperations.cpp b/core/src/db/snapshot/CompoundOperations.cpp index 40ddbabf585fbe13a39cbf6c2ba57f2ee685dedb..bcf1c2f2966dccb4697b6728f647197f6c5a0622 100644 --- a/core/src/db/snapshot/CompoundOperations.cpp +++ b/core/src/db/snapshot/CompoundOperations.cpp @@ -26,15 +26,24 @@ namespace milvus { namespace engine { namespace snapshot { -AddSegmentFileOperation::AddSegmentFileOperation(const OperationContext& context, ScopedSnapshotT prev_ss) +ChangeSegmentFileOperation::ChangeSegmentFileOperation(const OperationContext& context, ScopedSnapshotT prev_ss) : BaseT(context, prev_ss) { } Status -AddSegmentFileOperation::DoExecute(StorePtr store) { - STATUS_CHECK(CheckStale(std::bind(&AddSegmentFileOperation::CheckSegmentStale, this, std::placeholders::_1, +ChangeSegmentFileOperation::DoExecute(StorePtr store) { + STATUS_CHECK(CheckStale(std::bind(&ChangeSegmentFileOperation::CheckSegmentStale, this, std::placeholders::_1, context_.new_segment_files[0]->GetSegmentId()))); + ID_TYPE segment_id = 0; + for (auto& stale_segment_file : context_.stale_segment_files) { + if (segment_id == 0) { + segment_id = stale_segment_file->GetSegmentId(); + } else if (segment_id != stale_segment_file->GetSegmentId()) { + return Status(SS_INVALID_CONTEX_ERROR, "All segment files should be of same segment"); + } + } + auto update_size = [&](SegmentFilePtr& file) { auto update_ctx = ResourceContextBuilder().SetOp(meta::oUpdate).CreatePtr(); update_ctx->AddAttr(SizeField::Name); @@ -42,6 +51,11 @@ AddSegmentFileOperation::DoExecute(StorePtr store) { }; for (auto& new_file : context_.new_segment_files) { + if (segment_id == 0) { + segment_id = new_file->GetSegmentId(); + } else if (segment_id != new_file->GetSegmentId()) { + return Status(SS_INVALID_CONTEX_ERROR, "All segment files should be of same segment"); + } update_size(new_file); } @@ -93,7 +107,7 @@ AddSegmentFileOperation::DoExecute(StorePtr store) { } Status -AddSegmentFileOperation::CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_TYPE segment_id) const { +ChangeSegmentFileOperation::CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_TYPE segment_id) const { auto segment = latest_snapshot->GetResource(segment_id); if (!segment) { std::stringstream emsg; @@ -104,16 +118,16 @@ AddSegmentFileOperation::CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_ } Status -AddSegmentFileOperation::CommitRowCountDelta(SIZE_TYPE delta, bool sub) { +ChangeSegmentFileOperation::CommitRowCountDelta(SIZE_TYPE delta, bool sub) { delta_ = delta; sub_ = sub; return Status::OK(); } Status -AddSegmentFileOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created) { +ChangeSegmentFileOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created) { STATUS_CHECK(CheckStale( - std::bind(&AddSegmentFileOperation::CheckSegmentStale, this, std::placeholders::_1, context.segment_id))); + std::bind(&ChangeSegmentFileOperation::CheckSegmentStale, this, std::placeholders::_1, context.segment_id))); auto segment = GetStartedSS()->GetResource(context.segment_id); if (!segment || (context_.new_segment_files.size() > 0 && @@ -270,7 +284,7 @@ DropAllIndexOperation::DoExecute(StorePtr store) { } auto context = context_; - context.stale_segment_file = kv.second.Get(); + context.stale_segment_files.push_back(kv.second.Get()); SegmentCommitOperation sc_op(context, GetAdjustedSS()); STATUS_CHECK(sc_op(store)); STATUS_CHECK(sc_op.GetResource(context.new_segment_commit)); @@ -306,7 +320,7 @@ DropIndexOperation::DropIndexOperation(const OperationContext& context, ScopedSn Status DropIndexOperation::PreCheck() { - if (context_.stale_segment_file == nullptr) { + if (context_.stale_segment_files.size() == 0) { std::stringstream emsg; emsg << GetRepr() << ". Stale segment is requried"; return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); diff --git a/core/src/db/snapshot/CompoundOperations.h b/core/src/db/snapshot/CompoundOperations.h index bbdff4314b58facb94a3cede1a56659ae4c0e764..b4e563d8dd88346cc2f24e1de7340bc1a96d6d03 100644 --- a/core/src/db/snapshot/CompoundOperations.h +++ b/core/src/db/snapshot/CompoundOperations.h @@ -58,12 +58,12 @@ class CompoundBaseOperation : public Operations { } }; -class AddSegmentFileOperation : public CompoundBaseOperation { +class ChangeSegmentFileOperation : public CompoundBaseOperation { public: - using BaseT = CompoundBaseOperation; + using BaseT = CompoundBaseOperation; static constexpr const char* Name = "B"; - AddSegmentFileOperation(const OperationContext& context, ScopedSnapshotT prev_ss); + ChangeSegmentFileOperation(const OperationContext& context, ScopedSnapshotT prev_ss); Status DoExecute(StorePtr) override; diff --git a/core/src/db/snapshot/Context.cpp b/core/src/db/snapshot/Context.cpp index 495896ce1168ee5e71cb3affe5ced115f197251e..484029dbd4adb663285542b010b4e58158d0b806 100644 --- a/core/src/db/snapshot/Context.cpp +++ b/core/src/db/snapshot/Context.cpp @@ -68,9 +68,20 @@ OperationContext::ToString() const { } ss << "]"; } - if (stale_segment_file) { - ss << ",S_SF=" << stale_segment_file->GetID(); + + if (stale_segment_files.size() > 0) { + ss << ",S_SF=["; + bool first = true; + for (auto& f : new_segment_files) { + if (!first) { + ss << INNER_DELIMITER; + } + ss << f->GetID(); + first = false; + } + ss << "]"; } + if (new_segment_files.size() > 0) { ss << ",N_SF=["; bool first = true; diff --git a/core/src/db/snapshot/Context.h b/core/src/db/snapshot/Context.h index c43f8248921686f2b8f234f166c99c6e00f947b8..2411859460944f6bd89ff6f7af4bbe87ae3a68fb 100644 --- a/core/src/db/snapshot/Context.h +++ b/core/src/db/snapshot/Context.h @@ -63,7 +63,6 @@ struct OperationContext { CollectionCommitPtr new_collection_commit = nullptr; CollectionPtr new_collection = nullptr; - SegmentFilePtr stale_segment_file = nullptr; std::vector stale_segments; FieldPtr prev_field = nullptr; @@ -83,6 +82,7 @@ struct OperationContext { PartitionCommitPtr stale_partition_commit = nullptr; SegmentFile::VecT new_segment_files; + SegmentFile::VecT stale_segment_files; CollectionPtr collection = nullptr; LSN_TYPE lsn = 0; diff --git a/core/src/db/snapshot/ResourceOperations.cpp b/core/src/db/snapshot/ResourceOperations.cpp index 5e86f755a6a2450b02bb9c870e8b940746eef2e4..eedd902d09baf539f7bf560af2056b3580153d55 100644 --- a/core/src/db/snapshot/ResourceOperations.cpp +++ b/core/src/db/snapshot/ResourceOperations.cpp @@ -205,8 +205,8 @@ SegmentCommit::Ptr SegmentCommitOperation::GetPrevResource() const { if (context_.new_segment_files.size() > 0) { return GetStartedSS()->GetSegmentCommitBySegmentId(context_.new_segment_files[0]->GetSegmentId()); - } else if (context_.stale_segment_file != nullptr) { - return GetStartedSS()->GetSegmentCommitBySegmentId(context_.stale_segment_file->GetSegmentId()); + } else if (context_.stale_segment_files.size() > 0) { + return GetStartedSS()->GetSegmentCommitBySegmentId(context_.stale_segment_files[0]->GetSegmentId()); } return nullptr; } @@ -220,9 +220,9 @@ SegmentCommitOperation::DoExecute(StorePtr store) { resource_->SetID(0); resource_->ResetStatus(); size = resource_->GetSize(); - if (context_.stale_segment_file) { - resource_->GetMappings().erase(context_.stale_segment_file->GetID()); - size -= context_.stale_segment_file->GetSize(); + for (auto& stale_segment_file : context_.stale_segment_files) { + resource_->GetMappings().erase(stale_segment_file->GetID()); + size -= stale_segment_file->GetSize(); } } else { resource_ = std::make_shared(GetStartedSS()->GetLatestSchemaCommitId(), @@ -240,14 +240,10 @@ SegmentCommitOperation::DoExecute(StorePtr store) { Status SegmentCommitOperation::PreCheck() { - if (context_.stale_segment_file == nullptr && context_.new_segment_files.size() == 0) { + if (context_.stale_segment_files.size() == 0 && context_.new_segment_files.size() == 0) { std::stringstream emsg; emsg << GetRepr() << ". new_segment_files should not be empty in context"; return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); - } else if (context_.stale_segment_file != nullptr && context_.new_segment_files.size() > 0) { - std::stringstream emsg; - emsg << GetRepr() << ". new_segment_files should be empty in context"; - return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); } return Status::OK(); } diff --git a/core/unittest/db/test_snapshot.cpp b/core/unittest/db/test_snapshot.cpp index 422874a6528290063981eff6d9617e30d2cd1619..0b0bb7c343f3021edac1da5881d3e4467eff45fc 100644 --- a/core/unittest/db/test_snapshot.cpp +++ b/core/unittest/db/test_snapshot.cpp @@ -479,7 +479,7 @@ TEST_F(SnapshotTest, IndexTest) { OperationContext context; context.lsn = next_lsn(); context.prev_partition = ss->GetResource(sf_context.partition_id); - auto build_op = std::make_shared(context, ss); + auto build_op = std::make_shared(context, ss); SegmentFilePtr seg_file; status = build_op->CommitNewSegmentFile(sf_context, seg_file); ASSERT_TRUE(status.ok()); @@ -510,7 +510,7 @@ TEST_F(SnapshotTest, IndexTest) { OperationContext drop_ctx; drop_ctx.lsn = next_lsn(); - drop_ctx.stale_segment_file = seg_file; + drop_ctx.stale_segment_files.push_back(seg_file); auto drop_op = std::make_shared(drop_ctx, ss); status = drop_op->Push(); ASSERT_TRUE(status.ok()); @@ -673,7 +673,7 @@ TEST_F(SnapshotTest, OperationTest) { { OperationContext context; context.lsn = ++lsn; - auto build_op = std::make_shared(context, ss); + auto build_op = std::make_shared(context, ss); SegmentFilePtr seg_file; status = build_op->CommitNewSegmentFile(sf_context, seg_file); std::cout << status.ToString() << std::endl; @@ -841,7 +841,7 @@ TEST_F(SnapshotTest, OperationTest) { { OperationContext context; context.lsn = ++lsn; - auto build_op = std::make_shared(context, new_ss); + auto build_op = std::make_shared(context, new_ss); SegmentFilePtr seg_file; auto new_sf_context = sf_context; new_sf_context.segment_id = new_seg_id; @@ -852,7 +852,7 @@ TEST_F(SnapshotTest, OperationTest) { { OperationContext context; context.lsn = ++lsn; - auto op = std::make_shared(context, ss); + auto op = std::make_shared(context, ss); SegmentFilePtr seg_file; auto new_sf_context = sf_context; new_sf_context.segment_id = merge_seg->GetID(); @@ -918,7 +918,7 @@ TEST_F(SnapshotTest, OperationTest) { { OperationContext context; context.lsn = ++lsn; - auto build_op = std::make_shared(context, ss); + auto build_op = std::make_shared(context, ss); SegmentFilePtr seg_file; auto new_sf_context = sf_context; new_sf_context.segment_id = merge_seg->GetID(); @@ -938,6 +938,77 @@ TEST_F(SnapshotTest, OperationTest) { Snapshots::GetInstance().Reset(); } +TEST_F(SnapshotTest, ChangeSegmentFileOperationTest) { + LSN_TYPE lsn = 0; + std::string collection_name("c1"); + auto ss = CreateCollection(collection_name, ++lsn); + ASSERT_TRUE(ss); + + SegmentFileContext sf_context; + SFContextBuilder(sf_context, ss); + + auto& partitions = ss->GetResources(); + auto total_row_cnt = 0; + for (auto& kv : partitions) { + auto num = RandomInt(2, 5); + for (auto i = 0; i < num; ++i) { + auto row_cnt = RandomInt(100, 200); + ASSERT_TRUE(CreateSegment(ss, kv.first, ++lsn, sf_context, row_cnt).ok()); + total_row_cnt += row_cnt; + } + } + + auto status = Snapshots::GetInstance().GetSnapshot(ss, collection_name); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(total_row_cnt, ss->GetCollectionCommit()->GetRowCount()); + + auto total_size = ss->GetCollectionCommit()->GetSize(); + + auto target_segment = ss->GetResources().begin()->second.Get(); + + auto sf_ids = ss->GetSegmentFileIds(target_segment->GetID()); + ASSERT_GT(sf_ids.size(), 0); + auto stale_sf = ss->GetResource(*(sf_ids.begin())); + ASSERT_TRUE(stale_sf); + + std::cout << stale_sf->GetSize() << std::endl; + + OperationContext context; + context.lsn = ++lsn; + context.stale_segment_files.push_back(stale_sf); + auto op = std::make_shared(context, ss); + SegmentFilePtr seg_file; + sf_context.field_name = "vector"; + sf_context.field_element_name = "_raw"; + sf_context.segment_id = stale_sf->GetSegmentId(); + sf_context.partition_id = stale_sf->GetPartitionId(); + sf_context.collection_id = stale_sf->GetCollectionId(); + status = op->CommitNewSegmentFile(sf_context, seg_file); + /* std::cout << status.ToString() << std::endl; */ + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(seg_file); + + auto prev_segment_commit = ss->GetSegmentCommitBySegmentId(seg_file->GetSegmentId()); + auto prev_segment_commit_mappings = prev_segment_commit->GetMappings(); + ASSERT_FALSE(prev_segment_commit->ToString().empty()); + + auto new_size = RandomInt(1000, 20000); + seg_file->SetSize(new_size); + total_size += new_size; + total_size -= stale_sf->GetSize(); + + auto delta = prev_segment_commit->GetRowCount() / 2; + op->CommitRowCountDelta(delta); + total_row_cnt -= delta; + + status = op->Push(); + ASSERT_TRUE(status.ok()) << status.message(); + status = op->GetSnapshot(ss); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(ss->GetCollectionCommit()->GetRowCount(), total_row_cnt); + ASSERT_EQ(ss->GetCollectionCommit()->GetSize(), total_size); +} + TEST_F(SnapshotTest, CompoundTest1) { Status status; std::atomic lsn = 0; @@ -988,7 +1059,7 @@ TEST_F(SnapshotTest, CompoundTest1) { OperationContext context; context.lsn = next_lsn(); - auto build_op = std::make_shared(context, latest_ss); + auto build_op = std::make_shared(context, latest_ss); SegmentFilePtr seg_file; build_sf_context.segment_id = seg_id; status = build_op->CommitNewSegmentFile(build_sf_context, seg_file); @@ -1332,7 +1403,7 @@ TEST_F(SnapshotTest, CompoundTest2) { OperationContext context; context.lsn = next_lsn(); - auto build_op = std::make_shared(context, latest_ss); + auto build_op = std::make_shared(context, latest_ss); SegmentFilePtr seg_file; build_sf_context.segment_id = seg_id; status = build_op->CommitNewSegmentFile(build_sf_context, seg_file); diff --git a/core/unittest/db/utils.h b/core/unittest/db/utils.h index 712327642b07d3a4d289b992de2138af03b6a160..bb3b0a4df0ee4ffc138160f8156a185279eb0c8b 100644 --- a/core/unittest/db/utils.h +++ b/core/unittest/db/utils.h @@ -46,7 +46,7 @@ using PartitionContext = milvus::engine::snapshot::PartitionContext; using DropIndexOperation = milvus::engine::snapshot::DropIndexOperation; using AddFieldElementOperation = milvus::engine::snapshot::AddFieldElementOperation; using DropAllIndexOperation = milvus::engine::snapshot::DropAllIndexOperation; -using AddSegmentFileOperation = milvus::engine::snapshot::AddSegmentFileOperation; +using ChangeSegmentFileOperation = milvus::engine::snapshot::ChangeSegmentFileOperation; using MergeOperation = milvus::engine::snapshot::MergeOperation; using CreateCollectionOperation = milvus::engine::snapshot::CreateCollectionOperation; using NewSegmentOperation = milvus::engine::snapshot::NewSegmentOperation;