From ba26b677da1e9a8e3a71c63b424669fdd1ac9b96 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Wed, 24 Jun 2020 20:39:57 +0800 Subject: [PATCH] snapshot gc (#2668) * rename StatusField to StateField Signed-off-by: yudong.cai * fix typo Signed-off-by: yudong.cai * enable GC in OnNoRefCallBack Signed-off-by: yudong.cai * code opt Signed-off-by: yudong.cai --- core/src/db/snapshot/CompoundOperations.cpp | 251 +++++++------------- core/src/db/snapshot/Context.h | 2 +- core/src/db/snapshot/Event.h | 6 +- core/src/db/snapshot/Operations.cpp | 79 +++--- core/src/db/snapshot/Operations.h | 40 ++-- core/src/db/snapshot/ResourceHolder.h | 5 +- core/src/db/snapshot/ResourceOperations.h | 6 +- core/src/db/snapshot/Resources.cpp | 50 ++-- core/src/db/snapshot/Resources.h | 42 ++-- 9 files changed, 180 insertions(+), 301 deletions(-) diff --git a/core/src/db/snapshot/CompoundOperations.cpp b/core/src/db/snapshot/CompoundOperations.cpp index c584b3681..07e5220b4 100644 --- a/core/src/db/snapshot/CompoundOperations.cpp +++ b/core/src/db/snapshot/CompoundOperations.cpp @@ -14,6 +14,7 @@ #include #include "db/snapshot/OperationExecutor.h" #include "db/snapshot/Snapshots.h" +#include "utils/Status.h" namespace milvus { namespace engine { @@ -24,42 +25,30 @@ BuildOperation::BuildOperation(const OperationContext& context, ScopedSnapshotT Status BuildOperation::DoExecute(Store& store) { - auto status = CheckStale(std::bind(&BuildOperation::CheckSegmentStale, this, std::placeholders::_1, - context_.new_segment_files[0]->GetSegmentId())); - if (!status.ok()) - return status; + STATUS_CHECK(CheckStale(std::bind(&BuildOperation::CheckSegmentStale, this, std::placeholders::_1, + context_.new_segment_files[0]->GetSegmentId()))); - SegmentCommitOperation op(context_, GetAdjustedSS()); - op(store); - status = op.GetResource(context_.new_segment_commit); - if (!status.ok()) - return status; + SegmentCommitOperation sc_op(context_, GetAdjustedSS()); + STATUS_CHECK(sc_op(store)); + STATUS_CHECK(sc_op.GetResource(context_.new_segment_commit)); AddStepWithLsn(*context_.new_segment_commit, context_.lsn); PartitionCommitOperation pc_op(context_, GetAdjustedSS()); - pc_op(store); - + STATUS_CHECK(pc_op(store)); OperationContext cc_context; - status = pc_op.GetResource(cc_context.new_partition_commit); - - if (!status.ok()) - return status; + STATUS_CHECK(pc_op.GetResource(cc_context.new_partition_commit)); AddStepWithLsn(*cc_context.new_partition_commit, context_.lsn); - context_.new_partition_commit = cc_context.new_partition_commit; - status = pc_op.GetResource(context_.new_partition_commit); - if (!status.ok()) - return status; + context_.new_partition_commit = cc_context.new_partition_commit; + STATUS_CHECK(pc_op.GetResource(context_.new_partition_commit)); AddStepWithLsn(*context_.new_partition_commit, context_.lsn); CollectionCommitOperation cc_op(cc_context, GetAdjustedSS()); - cc_op(store); - status = cc_op.GetResource(context_.new_collection_commit); - if (!status.ok()) - return status; + STATUS_CHECK(cc_op(store)); + STATUS_CHECK(cc_op.GetResource(context_.new_collection_commit)); AddStepWithLsn(*context_.new_collection_commit, context_.lsn); - return status; + return Status::OK(); } Status @@ -75,10 +64,9 @@ BuildOperation::CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_TYPE segm Status BuildOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created) { - auto status = - CheckStale(std::bind(&BuildOperation::CheckSegmentStale, this, std::placeholders::_1, context.segment_id)); - if (!status.ok()) - return status; + STATUS_CHECK( + CheckStale(std::bind(&BuildOperation::CheckSegmentStale, this, std::placeholders::_1, context.segment_id))); + auto segment = GetStartedSS()->GetResource(context.segment_id); if (!segment) { std::stringstream emsg; @@ -88,15 +76,12 @@ BuildOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentF auto ctx = context; ctx.partition_id = segment->GetPartitionId(); auto new_sf_op = std::make_shared(ctx, GetStartedSS()); - status = new_sf_op->Push(); - if (!status.ok()) - return status; - status = new_sf_op->GetResource(created); - if (!status.ok()) - return status; + STATUS_CHECK(new_sf_op->Push()); + STATUS_CHECK(new_sf_op->GetResource(created)); context_.new_segment_files.push_back(created); AddStepWithLsn(*created, context_.lsn); - return status; + + return Status::OK(); } NewSegmentOperation::NewSegmentOperation(const OperationContext& context, ScopedSnapshotT prev_ss) @@ -111,13 +96,9 @@ NewSegmentOperation::DoExecute(Store& store) { /* auto status = PrevSnapshotRequried(); */ /* if (!status.ok()) return status; */ // TODO: Check Context - SegmentCommitOperation op(context_, GetAdjustedSS()); - auto status = op(store); - if (!status.ok()) - return status; - status = op.GetResource(context_.new_segment_commit); - if (!status.ok()) - return status; + SegmentCommitOperation sc_op(context_, GetAdjustedSS()); + STATUS_CHECK(sc_op(store)); + STATUS_CHECK(sc_op.GetResource(context_.new_segment_commit)); AddStepWithLsn(*context_.new_segment_commit, context_.lsn); /* std::cout << GetRepr() << " POST_SC_MAP=("; */ /* for (auto id : context_.new_segment_commit->GetMappings()) { */ @@ -126,14 +107,9 @@ NewSegmentOperation::DoExecute(Store& store) { /* std::cout << ")" << std::endl; */ OperationContext cc_context; - PartitionCommitOperation pc_op(context_, GetAdjustedSS()); - status = pc_op(store); - if (!status.ok()) - return status; - status = pc_op.GetResource(cc_context.new_partition_commit); - if (!status.ok()) - return status; + STATUS_CHECK(pc_op(store)); + STATUS_CHECK(pc_op.GetResource(cc_context.new_partition_commit)); AddStepWithLsn(*cc_context.new_partition_commit, context_.lsn); context_.new_partition_commit = cc_context.new_partition_commit; /* std::cout << GetRepr() << " POST_PC_MAP=("; */ @@ -143,46 +119,34 @@ NewSegmentOperation::DoExecute(Store& store) { /* std::cout << ")" << std::endl; */ CollectionCommitOperation cc_op(cc_context, GetAdjustedSS()); - status = cc_op(store); - if (!status.ok()) - return status; - status = cc_op.GetResource(context_.new_collection_commit); - if (!status.ok()) - return status; + STATUS_CHECK(cc_op(store)); + STATUS_CHECK(cc_op.GetResource(context_.new_collection_commit)); AddStepWithLsn(*context_.new_collection_commit, context_.lsn); - return status; + return Status::OK(); } Status NewSegmentOperation::CommitNewSegment(SegmentPtr& created) { auto op = std::make_shared(context_, GetStartedSS()); - auto status = op->Push(); - if (!status.ok()) - return status; - status = op->GetResource(context_.new_segment); - if (!status.ok()) - return status; + STATUS_CHECK(op->Push()); + STATUS_CHECK(op->GetResource(context_.new_segment)); created = context_.new_segment; AddStepWithLsn(*created, context_.lsn); - return status; + return Status::OK(); } Status NewSegmentOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created) { - auto c = context; - c.segment_id = context_.new_segment->GetID(); - c.partition_id = context_.new_segment->GetPartitionId(); - auto new_sf_op = std::make_shared(c, GetStartedSS()); - auto status = new_sf_op->Push(); - if (!status.ok()) - return status; - status = new_sf_op->GetResource(created); - if (!status.ok()) - return status; + auto ctx = context; + ctx.segment_id = context_.new_segment->GetID(); + ctx.partition_id = context_.new_segment->GetPartitionId(); + auto new_sf_op = std::make_shared(ctx, GetStartedSS()); + STATUS_CHECK(new_sf_op->Push()); + STATUS_CHECK(new_sf_op->GetResource(created)); AddStepWithLsn(*created, context_.lsn); context_.new_segment_files.push_back(created); - return status; + return Status::OK(); } MergeOperation::MergeOperation(const OperationContext& context, ScopedSnapshotT prev_ss) : BaseT(context, prev_ss) { @@ -204,58 +168,42 @@ MergeOperation::OnSnapshotStale() { Status MergeOperation::CommitNewSegment(SegmentPtr& created) { - Status status; if (context_.new_segment) { created = context_.new_segment; - return status; + return Status::OK(); } auto op = std::make_shared(context_, GetStartedSS()); - status = op->Push(); - if (!status.ok()) - return status; - status = op->GetResource(context_.new_segment); - if (!status.ok()) - return status; + STATUS_CHECK(op->Push()); + STATUS_CHECK(op->GetResource(context_.new_segment)); created = context_.new_segment; AddStepWithLsn(*created, context_.lsn); - return status; + return Status::OK(); } Status MergeOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created) { // PXU TODO: Check element type and segment file mapping rules SegmentPtr new_segment; - auto status = CommitNewSegment(new_segment); - if (!status.ok()) - return status; - auto c = context; - c.segment_id = new_segment->GetID(); - c.partition_id = new_segment->GetPartitionId(); - auto new_sf_op = std::make_shared(c, GetStartedSS()); - status = new_sf_op->Push(); - if (!status.ok()) - return status; - status = new_sf_op->GetResource(created); - if (!status.ok()) - return status; + STATUS_CHECK(CommitNewSegment(new_segment)); + auto ctx = context; + ctx.segment_id = new_segment->GetID(); + ctx.partition_id = new_segment->GetPartitionId(); + auto new_sf_op = std::make_shared(ctx, GetStartedSS()); + STATUS_CHECK(new_sf_op->Push()); + STATUS_CHECK(new_sf_op->GetResource(created)); context_.new_segment_files.push_back(created); AddStepWithLsn(*created, context_.lsn); - return status; + return Status::OK(); } Status MergeOperation::DoExecute(Store& store) { // PXU TODO: - // 1. Check all requried field elements have related segment files + // 1. Check all required field elements have related segment files // 2. Check Stale and others - SegmentCommitOperation op(context_, GetAdjustedSS()); - auto status = op(store); - if (!status.ok()) - return status; - - status = op.GetResource(context_.new_segment_commit); - if (!status.ok()) - return status; + SegmentCommitOperation sc_op(context_, GetAdjustedSS()); + STATUS_CHECK(sc_op(store)); + STATUS_CHECK(sc_op.GetResource(context_.new_segment_commit)); AddStepWithLsn(*context_.new_segment_commit, context_.lsn); /* std::cout << GetRepr() << " POST_SC_MAP=("; */ /* for (auto id : context_.new_segment_commit->GetMappings()) { */ @@ -264,14 +212,9 @@ MergeOperation::DoExecute(Store& store) { /* std::cout << ")" << std::endl; */ PartitionCommitOperation pc_op(context_, GetAdjustedSS()); - status = pc_op(store); - if (!status.ok()) - return status; - + STATUS_CHECK(pc_op(store)); OperationContext cc_context; - status = pc_op.GetResource(cc_context.new_partition_commit); - if (!status.ok()) - return status; + STATUS_CHECK(pc_op.GetResource(cc_context.new_partition_commit)); AddStepWithLsn(*cc_context.new_partition_commit, context_.lsn); context_.new_partition_commit = cc_context.new_partition_commit; @@ -282,15 +225,11 @@ MergeOperation::DoExecute(Store& store) { /* std::cout << ")" << std::endl; */ CollectionCommitOperation cc_op(cc_context, GetAdjustedSS()); - status = cc_op(store); - if (!status.ok()) - return status; - status = cc_op.GetResource(context_.new_collection_commit); - if (!status.ok()) - return status; + STATUS_CHECK(cc_op(store)); + STATUS_CHECK(cc_op.GetResource(context_.new_collection_commit)); AddStepWithLsn(*context_.new_collection_commit, context_.lsn); - return status; + return Status::OK(); } GetSnapshotIDsOperation::GetSnapshotIDsOperation(ID_TYPE collection_id, bool reversed) @@ -345,15 +284,12 @@ DropPartitionOperation::GetRepr() const { Status DropPartitionOperation::DoExecute(Store& store) { - Status status; PartitionPtr p; auto id = c_context_.id; if (id == 0) { - status = GetAdjustedSS()->GetPartitionId(c_context_.name, id); + STATUS_CHECK(GetAdjustedSS()->GetPartitionId(c_context_.name, id)); c_context_.id = id; } - if (!status.ok()) - return status; auto p_c = GetAdjustedSS()->GetPartitionCommitByPartitionId(id); if (!p_c) { std::stringstream emsg; @@ -364,16 +300,11 @@ DropPartitionOperation::DoExecute(Store& store) { OperationContext op_ctx; op_ctx.stale_partition_commit = p_c; - auto op = CollectionCommitOperation(op_ctx, GetAdjustedSS()); - status = op(store); - if (!status.ok()) - return status; - status = op.GetResource(context_.new_collection_commit); - if (!status.ok()) - return status; - + auto cc_op = CollectionCommitOperation(op_ctx, GetAdjustedSS()); + STATUS_CHECK(cc_op(store)); + STATUS_CHECK(cc_op.GetResource(context_.new_collection_commit)); AddStepWithLsn(*context_.new_collection_commit, c_context_.lsn); - return status; + return Status::OK(); } CreatePartitionOperation::CreatePartitionOperation(const OperationContext& context, ScopedSnapshotT prev_ss) @@ -382,39 +313,28 @@ CreatePartitionOperation::CreatePartitionOperation(const OperationContext& conte Status CreatePartitionOperation::PreCheck() { - Status status = BaseT::PreCheck(); - if (!status.ok()) { - return status; - } + STATUS_CHECK(BaseT::PreCheck()); if (!context_.new_partition) { std::stringstream emsg; emsg << GetRepr() << ". Partition is missing"; - status = Status(SS_INVALID_CONTEX_ERROR, emsg.str()); + return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); } - return status; + return Status::OK(); } Status CreatePartitionOperation::CommitNewPartition(const PartitionContext& context, PartitionPtr& partition) { - Status status; auto op = std::make_shared(context, GetStartedSS()); - status = op->Push(); - if (!status.ok()) - return status; - status = op->GetResource(partition); - if (!status.ok()) - return status; + STATUS_CHECK(op->Push()); + STATUS_CHECK(op->GetResource(partition)); context_.new_partition = partition; AddStepWithLsn(*partition, context_.lsn); - return status; + return Status::OK(); } Status CreatePartitionOperation::DoExecute(Store& store) { - Status status; - status = CheckStale(); - if (!status.ok()) - return status; + STATUS_CHECK(CheckStale()); auto collection = GetAdjustedSS()->GetCollection(); auto partition = context_.new_partition; @@ -423,28 +343,21 @@ CreatePartitionOperation::DoExecute(Store& store) { OperationContext pc_context; pc_context.new_partition = partition; auto pc_op = PartitionCommitOperation(pc_context, GetAdjustedSS()); - status = pc_op(store); - if (!status.ok()) - return status; - status = pc_op.GetResource(pc); - if (!status.ok()) - return status; + STATUS_CHECK(pc_op(store)); + STATUS_CHECK(pc_op.GetResource(pc)); AddStepWithLsn(*pc, context_.lsn); + OperationContext cc_context; cc_context.new_partition_commit = pc; context_.new_partition_commit = pc; auto cc_op = CollectionCommitOperation(cc_context, GetAdjustedSS()); - status = cc_op(store); - if (!status.ok()) - return status; + STATUS_CHECK(cc_op(store)); CollectionCommitPtr cc; - status = cc_op.GetResource(cc); - if (!status.ok()) - return status; + STATUS_CHECK(cc_op.GetResource(cc)); AddStepWithLsn(*cc, context_.lsn); context_.new_collection_commit = cc; - return status; + return Status::OK(); } CreateCollectionOperation::CreateCollectionOperation(const CreateCollectionContext& context) @@ -536,12 +449,8 @@ CreateCollectionOperation::DoExecute(Store& store) { Status CreateCollectionOperation::GetSnapshot(ScopedSnapshotT& ss) const { - auto status = DoneRequired(); - if (!status.ok()) - return status; - status = IDSNotEmptyRequried(); - if (!status.ok()) - return status; + STATUS_CHECK(CheckDone()); + STATUS_CHECK(CheckIDSNotEmpty()); if (!c_context_.collection_commit) { std::stringstream emsg; emsg << GetRepr() << ". No snapshot is available"; @@ -549,7 +458,7 @@ CreateCollectionOperation::GetSnapshot(ScopedSnapshotT& ss) const { } /* status = Snapshots::GetInstance().GetSnapshot(ss, c_context_.collection_commit->GetCollectionId()); */ ss = context_.latest_ss; - return status; + return Status::OK(); } Status diff --git a/core/src/db/snapshot/Context.h b/core/src/db/snapshot/Context.h index 26428971f..e044e3a90 100644 --- a/core/src/db/snapshot/Context.h +++ b/core/src/db/snapshot/Context.h @@ -42,7 +42,7 @@ struct SegmentFileContext { struct LoadOperationContext { ID_TYPE id = 0; - State status = INVALID; + State state = INVALID; std::string name; }; diff --git a/core/src/db/snapshot/Event.h b/core/src/db/snapshot/Event.h index 73fb92242..b7396f75d 100644 --- a/core/src/db/snapshot/Event.h +++ b/core/src/db/snapshot/Event.h @@ -42,9 +42,11 @@ class ResourceGCEvent : public Event { Status Process() override { + auto& store = Store::GetInstance(); + /* mark resource as 'deleted' in meta */ auto sd_op = std::make_shared>(res_->GetID()); - STATUS_CHECK(sd_op->Push()); + STATUS_CHECK((*sd_op)(store)); /* TODO: physically clean resource */ std::vector res_file_list; @@ -62,7 +64,7 @@ class ResourceGCEvent : public Event { /* remove resource from meta */ auto hd_op = std::make_shared>(res_->GetID()); - STATUS_CHECK(hd_op->Push()); + STATUS_CHECK((*hd_op)(store)); return Status::OK(); } diff --git a/core/src/db/snapshot/Operations.cpp b/core/src/db/snapshot/Operations.cpp index 8a73b39bd..e47b3c076 100644 --- a/core/src/db/snapshot/Operations.cpp +++ b/core/src/db/snapshot/Operations.cpp @@ -74,9 +74,7 @@ Operations::GetID() const { Status Operations::operator()(Store& store) { - auto status = PreCheck(); - if (!status.ok()) - return status; + STATUS_CHECK(PreCheck()); return ApplyToStore(store); } @@ -114,9 +112,7 @@ Operations::PreCheck() { Status Operations::Push(bool sync) { - auto status = PreCheck(); - if (!status.ok()) - return status; + STATUS_CHECK(PreCheck()); return OperationExecutor::GetInstance().Submit(shared_from_this(), sync); } @@ -128,66 +124,55 @@ Operations::DoCheckStale(ScopedSnapshotT& latest_snapshot) const { Status Operations::CheckStale(const CheckStaleFunc& checker) const { decltype(prev_ss_) latest_ss; - auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, prev_ss_->GetCollection()->GetID()); - if (!status.ok()) - return status; + STATUS_CHECK(Snapshots::GetInstance().GetSnapshot(latest_ss, prev_ss_->GetCollection()->GetID())); if (prev_ss_->GetID() != latest_ss->GetID()) { if (checker) { - status = checker(latest_ss); + STATUS_CHECK(checker(latest_ss)); } else { - status = DoCheckStale(latest_ss); + STATUS_CHECK(DoCheckStale(latest_ss)); } } - return status; + return Status::OK(); } Status -Operations::DoneRequired() const { - Status status; +Operations::CheckDone() const { if (!done_) { std::stringstream emsg; emsg << GetRepr() << ". Should be done"; - status = Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str()); + return Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str()); } - return status; + return Status::OK(); } Status -Operations::IDSNotEmptyRequried() const { - Status status; +Operations::CheckIDSNotEmpty() const { if (ids_.size() == 0) { std::stringstream emsg; - emsg << GetRepr() << ". No rsource is available"; - status = Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str()); + emsg << GetRepr() << ". No resource available"; + return Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str()); } - return status; + return Status::OK(); } Status -Operations::PrevSnapshotRequried() const { - Status status; +Operations::CheckPrevSnapshot() const { if (!prev_ss_) { std::stringstream emsg; - emsg << GetRepr() << ". Prev snapshot is requried"; - status = Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str()); + emsg << GetRepr() << ". Previous snapshot required"; + return Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str()); } - return status; + return Status::OK(); } Status Operations::GetSnapshot(ScopedSnapshotT& ss) const { - auto status = PrevSnapshotRequried(); - if (!status.ok()) - return status; - status = DoneRequired(); - if (!status.ok()) - return status; - status = IDSNotEmptyRequried(); - if (!status.ok()) - return status; + STATUS_CHECK(CheckPrevSnapshot()); + STATUS_CHECK(CheckDone()); + STATUS_CHECK(CheckIDSNotEmpty()); /* status = Snapshots::GetInstance().GetSnapshot(ss, prev_ss_->GetCollectionId(), ids_.back()); */ ss = context_.latest_ss; - return status; + return Status::OK(); } const Status& @@ -219,29 +204,23 @@ Operations::OnSnapshotStale() { Status Operations::OnExecute(Store& store) { - auto status = PreExecute(store); - if (!status.ok()) { - return status; - } - status = DoExecute(store); - if (!status.ok()) { - return status; - } - return PostExecute(store); + STATUS_CHECK(PreExecute(store)); + STATUS_CHECK(DoExecute(store)); + STATUS_CHECK(PostExecute(store)); + return Status::OK(); } Status Operations::PreExecute(Store& store) { - Status status; if (GetStartedSS() && type_ == OperationsType::W_Compound) { - Snapshots::GetInstance().GetSnapshot(context_.prev_ss, GetStartedSS()->GetCollectionId()); + STATUS_CHECK(Snapshots::GetInstance().GetSnapshot(context_.prev_ss, GetStartedSS()->GetCollectionId())); if (!context_.prev_ss) { - status = OnSnapshotDropped(); + STATUS_CHECK(OnSnapshotDropped()); } else if (prev_ss_->GetID() != context_.prev_ss->GetID()) { - status = OnSnapshotStale(); + STATUS_CHECK(OnSnapshotStale()); } } - return status; + return Status::OK(); } Status diff --git a/core/src/db/snapshot/Operations.h b/core/src/db/snapshot/Operations.h index fac6e36d3..18f65a1f7 100644 --- a/core/src/db/snapshot/Operations.h +++ b/core/src/db/snapshot/Operations.h @@ -152,11 +152,11 @@ class Operations : public std::enable_shared_from_this { FailureString() const; Status - DoneRequired() const; + CheckDone() const; Status - IDSNotEmptyRequried() const; + CheckIDSNotEmpty() const; Status - PrevSnapshotRequried() const; + CheckPrevSnapshot() const; Status ApplyRollBack(Store&); @@ -212,20 +212,16 @@ class CommitOperation : public Operations { if (wait) { WaitToFinish(); } - auto status = DoneRequired(); - if (!status.ok()) - return status; - status = IDSNotEmptyRequried(); - if (!status.ok()) - return status; + STATUS_CHECK(CheckDone()); + STATUS_CHECK(CheckIDSNotEmpty()); resource_->SetID(ids_[0]); res = resource_; - return status; + return Status::OK(); } protected: Status - ResourceNotNullRequired() const { + CheckResource() const { Status status; if (!resource_) return Status(SS_CONSTRAINT_CHECK_ERROR, "No specified resource"); @@ -261,19 +257,15 @@ class LoadOperation : public Operations { if (wait) { WaitToFinish(); } - auto status = DoneRequired(); - if (!status.ok()) - return status; - status = ResourceNotNullRequired(); - if (!status.ok()) - return status; + STATUS_CHECK(CheckDone()); + STATUS_CHECK(CheckResource()); res = resource_; - return status; + return Status::OK(); } protected: Status - ResourceNotNullRequired() const { + CheckResource() const { Status status; if (!resource_) return Status(SS_CONSTRAINT_CHECK_ERROR, "No specified resource"); @@ -298,14 +290,10 @@ class SoftDeleteOperation : public Operations { if (wait) { WaitToFinish(); } - auto status = DoneRequired(); - if (!status.ok()) - return status; - status = IDSNotEmptyRequried(); - if (!status.ok()) - return status; + STATUS_CHECK(CheckDone()); + STATUS_CHECK(CheckIDSNotEmpty()); res = resource_; - return status; + return Status::OK(); } Status diff --git a/core/src/db/snapshot/ResourceHolder.h b/core/src/db/snapshot/ResourceHolder.h index 78406ca70..39660ff6e 100644 --- a/core/src/db/snapshot/ResourceHolder.h +++ b/core/src/db/snapshot/ResourceHolder.h @@ -17,6 +17,8 @@ #include #include #include +#include "db/snapshot/Event.h" +#include "db/snapshot/EventExecutor.h" #include "db/snapshot/Operations.h" #include "db/snapshot/ResourceTypes.h" #include "db/snapshot/ScopedResource.h" @@ -140,7 +142,8 @@ class ResourceHolder { virtual void OnNoRefCallBack(ResourcePtr resource) { - HardDelete(resource->GetID()); + auto evt_ptr = std::make_shared>(resource); + EventExecutor::GetInstance().Submit(evt_ptr); Release(resource->GetID()); } diff --git a/core/src/db/snapshot/ResourceOperations.h b/core/src/db/snapshot/ResourceOperations.h index eeefdaefe..924ab66d0 100644 --- a/core/src/db/snapshot/ResourceOperations.h +++ b/core/src/db/snapshot/ResourceOperations.h @@ -130,14 +130,12 @@ class LoadOperation : public Operations { if (wait) { WaitToFinish(); } - auto status = DoneRequired(); - if (!status.ok()) - return status; + STATUS_CHECK(CheckDone()); if (!resource_) { return Status(SS_NOT_FOUND_ERROR, "No specified resource"); } res = resource_; - return status; + return Status::OK(); } protected: diff --git a/core/src/db/snapshot/Resources.cpp b/core/src/db/snapshot/Resources.cpp index 7475dbc30..479fd6850 100644 --- a/core/src/db/snapshot/Resources.cpp +++ b/core/src/db/snapshot/Resources.cpp @@ -16,47 +16,47 @@ namespace milvus::engine::snapshot { -Collection::Collection(const std::string& name, ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, +Collection::Collection(const std::string& name, ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on) : NameField(name), IdField(id), LsnField(lsn), - StatusField(status), + StateField(state), CreatedOnField(created_on), UpdatedOnField(updated_on) { } CollectionCommit::CollectionCommit(ID_TYPE collection_id, ID_TYPE schema_id, const MappingT& mappings, ID_TYPE id, - LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on) + LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on) : CollectionIdField(collection_id), SchemaIdField(schema_id), MappingsField(mappings), IdField(id), LsnField(lsn), - StatusField(status), + StateField(state), CreatedOnField(created_on), UpdatedOnField(updated_on) { } -Partition::Partition(const std::string& name, ID_TYPE collection_id, ID_TYPE id, LSN_TYPE lsn, State status, +Partition::Partition(const std::string& name, ID_TYPE collection_id, ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on) : NameField(name), CollectionIdField(collection_id), IdField(id), LsnField(lsn), - StatusField(status), + StateField(state), CreatedOnField(created_on), UpdatedOnField(updated_on) { } PartitionCommit::PartitionCommit(ID_TYPE collection_id, ID_TYPE partition_id, const MappingT& mappings, ID_TYPE id, - LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on) + LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on) : CollectionIdField(collection_id), PartitionIdField(partition_id), MappingsField(mappings), IdField(id), LsnField(lsn), - StatusField(status), + StateField(state), CreatedOnField(created_on), UpdatedOnField(updated_on) { } @@ -70,18 +70,18 @@ PartitionCommit::ToString() const { for (auto sc_id : GetMappings()) { ss << sc_id << ", "; } - ss << ") status=" << GetStatus() << " "; + ss << ") state=" << GetState() << " "; return ss.str(); } -Segment::Segment(ID_TYPE collection_id, ID_TYPE partition_id, ID_TYPE num, ID_TYPE id, LSN_TYPE lsn, State status, +Segment::Segment(ID_TYPE collection_id, ID_TYPE partition_id, ID_TYPE num, ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on) : CollectionIdField(collection_id), PartitionIdField(partition_id), NumField(num), IdField(id), LsnField(lsn), - StatusField(status), + StateField(state), CreatedOnField(created_on), UpdatedOnField(updated_on) { } @@ -94,19 +94,19 @@ Segment::ToString() const { ss << "partition_id=" << GetPartitionId() << ", "; ss << "collection_id=" << GetCollectionId() << ", "; ss << "num=" << (NUM_TYPE)GetNum() << ", "; - ss << "status=" << GetStatus() << ", "; + ss << "state=" << GetState() << ", "; return ss.str(); } SegmentCommit::SegmentCommit(ID_TYPE schema_id, ID_TYPE partition_id, ID_TYPE segment_id, const MappingT& mappings, - ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on) + ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on) : SchemaIdField(schema_id), PartitionIdField(partition_id), SegmentIdField(segment_id), MappingsField(mappings), IdField(id), LsnField(lsn), - StatusField(status), + StateField(state), CreatedOnField(created_on), UpdatedOnField(updated_on) { } @@ -118,66 +118,66 @@ SegmentCommit::ToString() const { ss << "id=" << GetID() << ", "; ss << "partition_id=" << GetPartitionId() << ", "; ss << "segment_id=" << GetSegmentId() << ", "; - ss << "status=" << GetStatus() << ", "; + ss << "state=" << GetState() << ", "; return ss.str(); } SegmentFile::SegmentFile(ID_TYPE collection_id, ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id, - ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on) + ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on) : CollectionIdField(collection_id), PartitionIdField(partition_id), SegmentIdField(segment_id), FieldElementIdField(field_element_id), IdField(id), LsnField(lsn), - StatusField(status), + StateField(state), CreatedOnField(created_on), UpdatedOnField(updated_on) { } -SchemaCommit::SchemaCommit(ID_TYPE collection_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn, State status, +SchemaCommit::SchemaCommit(ID_TYPE collection_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on) : CollectionIdField(collection_id), MappingsField(mappings), IdField(id), LsnField(lsn), - StatusField(status), + StateField(state), CreatedOnField(created_on), UpdatedOnField(updated_on) { } -Field::Field(const std::string& name, NUM_TYPE num, ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, +Field::Field(const std::string& name, NUM_TYPE num, ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on) : NameField(name), NumField(num), IdField(id), LsnField(lsn), - StatusField(status), + StateField(state), CreatedOnField(created_on), UpdatedOnField(updated_on) { } FieldCommit::FieldCommit(ID_TYPE collection_id, ID_TYPE field_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn, - State status, TS_TYPE created_on, TS_TYPE updated_on) + State state, TS_TYPE created_on, TS_TYPE updated_on) : CollectionIdField(collection_id), FieldIdField(field_id), MappingsField(mappings), IdField(id), LsnField(lsn), - StatusField(status), + StateField(state), CreatedOnField(created_on), UpdatedOnField(updated_on) { } FieldElement::FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype, - ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on) + ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on) : CollectionIdField(collection_id), FieldIdField(field_id), NameField(name), FtypeField(ftype), IdField(id), LsnField(lsn), - StatusField(status), + StateField(state), CreatedOnField(created_on), UpdatedOnField(updated_on) { } diff --git a/core/src/db/snapshot/Resources.h b/core/src/db/snapshot/Resources.h index 2bf348f74..ccda4487e 100644 --- a/core/src/db/snapshot/Resources.h +++ b/core/src/db/snapshot/Resources.h @@ -46,46 +46,46 @@ class MappingsField { MappingT mappings_; }; -class StatusField { +class StateField { public: - explicit StatusField(State status = PENDING) : status_(status) { + explicit StateField(State state = PENDING) : state_(state) { } State - GetStatus() const { - return status_; + GetState() const { + return state_; } bool IsActive() const { - return status_ == ACTIVE; + return state_ == ACTIVE; } bool IsDeactive() const { - return status_ == DEACTIVE; + return state_ == DEACTIVE; } bool Activate() { if (IsDeactive()) return false; - status_ = ACTIVE; + state_ = ACTIVE; return true; } void Deactivate() { - status_ = DEACTIVE; + state_ = DEACTIVE; } void ResetStatus() { - status_ = PENDING; + state_ = PENDING; } protected: - State status_; + State state_; }; class LsnField { @@ -297,7 +297,7 @@ class Collection : public BaseResource, public NameField, public IdField, public LsnField, - public StatusField, + public StateField, public CreatedOnField, public UpdatedOnField { public: @@ -319,7 +319,7 @@ class CollectionCommit : public BaseResource, public MappingsField, public IdField, public LsnField, - public StatusField, + public StateField, public CreatedOnField, public UpdatedOnField { public: @@ -342,7 +342,7 @@ class Partition : public BaseResource, public CollectionIdField, public IdField, public LsnField, - public StatusField, + public StateField, public CreatedOnField, public UpdatedOnField { public: @@ -364,7 +364,7 @@ class PartitionCommit : public BaseResource, public MappingsField, public IdField, public LsnField, - public StatusField, + public StateField, public CreatedOnField, public UpdatedOnField { public: @@ -392,7 +392,7 @@ class Segment : public BaseResource, public NumField, public IdField, public LsnField, - public StatusField, + public StateField, public CreatedOnField, public UpdatedOnField { public: @@ -419,7 +419,7 @@ class SegmentCommit : public BaseResource, public MappingsField, public IdField, public LsnField, - public StatusField, + public StateField, public CreatedOnField, public UpdatedOnField { public: @@ -448,7 +448,7 @@ class SegmentFile : public BaseResource, public FieldElementIdField, public IdField, public LsnField, - public StatusField, + public StateField, public CreatedOnField, public UpdatedOnField { public: @@ -472,7 +472,7 @@ class SchemaCommit : public BaseResource, public MappingsField, public IdField, public LsnField, - public StatusField, + public StateField, public CreatedOnField, public UpdatedOnField { public: @@ -496,7 +496,7 @@ class Field : public BaseResource, public NumField, public IdField, public LsnField, - public StatusField, + public StateField, public CreatedOnField, public UpdatedOnField { public: @@ -518,7 +518,7 @@ class FieldCommit : public BaseResource, public MappingsField, public IdField, public LsnField, - public StatusField, + public StateField, public CreatedOnField, public UpdatedOnField { public: @@ -544,7 +544,7 @@ class FieldElement : public BaseResource, public FtypeField, public IdField, public LsnField, - public StatusField, + public StateField, public CreatedOnField, public UpdatedOnField { public: -- GitLab