diff --git a/core/src/db/snapshot/CompoundOperations.cpp b/core/src/db/snapshot/CompoundOperations.cpp index ad0be26bbfc9dd08579a491263e6a6ce2ca82631..c584b3681b70dc2d0c99b5793c12f26d56678990 100644 --- a/core/src/db/snapshot/CompoundOperations.cpp +++ b/core/src/db/snapshot/CompoundOperations.cpp @@ -66,7 +66,9 @@ Status BuildOperation::CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_TYPE segment_id) const { auto segment = latest_snapshot->GetResource(segment_id); if (!segment) { - return Status(SS_STALE_ERROR, "BuildOperation target segment is stale"); + std::stringstream emsg; + emsg << GetRepr() << ". Target segment " << segment_id << " is stale"; + return Status(SS_STALE_ERROR, emsg.str()); } return Status::OK(); } @@ -79,7 +81,9 @@ BuildOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentF return status; auto segment = GetStartedSS()->GetResource(context.segment_id); if (!segment) { - return Status(SS_INVALID_CONTEX_ERROR, "Invalid segment_id in context"); + std::stringstream emsg; + emsg << GetRepr() << ". Invalid segment " << context.segment_id << " in context"; + return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); } auto ctx = context; ctx.partition_id = segment->GetPartitionId(); @@ -190,7 +194,9 @@ MergeOperation::OnSnapshotStale() { auto expect_sc = GetStartedSS()->GetSegmentCommitBySegmentId(stale_seg->GetID()); auto latest_sc = GetAdjustedSS()->GetSegmentCommitBySegmentId(stale_seg->GetID()); if (!latest_sc || (latest_sc->GetID() != expect_sc->GetID())) { - return Status(SS_STALE_ERROR, "MergeOperation on stale segments"); + std::stringstream emsg; + emsg << GetRepr() << ". Stale segment " << stale_seg->GetID() << " in context"; + return Status(SS_STALE_ERROR, emsg.str()); } } return Status::OK(); @@ -349,8 +355,11 @@ DropPartitionOperation::DoExecute(Store& store) { if (!status.ok()) return status; auto p_c = GetAdjustedSS()->GetPartitionCommitByPartitionId(id); - if (!p_c) - return Status(SS_NOT_FOUND_ERROR, "No partition commit found"); + if (!p_c) { + std::stringstream emsg; + emsg << GetRepr() << ". PartitionCommit " << id << " not found"; + return Status(SS_NOT_FOUND_ERROR, emsg.str()); + } context_.stale_partition_commit = p_c; OperationContext op_ctx; @@ -378,7 +387,9 @@ CreatePartitionOperation::PreCheck() { return status; } if (!context_.new_partition) { - status = Status(SS_INVALID_CONTEX_ERROR, "No partition specified before push partition"); + std::stringstream emsg; + emsg << GetRepr() << ". Partition is missing"; + status = Status(SS_INVALID_CONTEX_ERROR, emsg.str()); } return status; } @@ -531,8 +542,11 @@ CreateCollectionOperation::GetSnapshot(ScopedSnapshotT& ss) const { status = IDSNotEmptyRequried(); if (!status.ok()) return status; - if (!c_context_.collection_commit) - return Status(SS_CONSTRAINT_CHECK_ERROR, "No Snapshot is available"); + if (!c_context_.collection_commit) { + std::stringstream emsg; + emsg << GetRepr() << ". No snapshot is available"; + return Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str()); + } /* status = Snapshots::GetInstance().GetSnapshot(ss, c_context_.collection_commit->GetCollectionId()); */ ss = context_.latest_ss; return status; @@ -541,7 +555,9 @@ CreateCollectionOperation::GetSnapshot(ScopedSnapshotT& ss) const { Status DropCollectionOperation::DoExecute(Store& store) { if (!context_.collection) { - return Status(SS_INVALID_CONTEX_ERROR, "Invalid Context"); + std::stringstream emsg; + emsg << GetRepr() << ". Collection is missing in context"; + return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); } context_.collection->Deactivate(); AddStepWithLsn(*context_.collection, context_.lsn); diff --git a/core/src/db/snapshot/Operations.cpp b/core/src/db/snapshot/Operations.cpp index 3657ddaaec4c641b3b48b9cb2ad5c5146ec4545b..8a73b39bdbdf874b44653c97656833b7c8994935 100644 --- a/core/src/db/snapshot/Operations.cpp +++ b/core/src/db/snapshot/Operations.cpp @@ -145,7 +145,9 @@ Status Operations::DoneRequired() const { Status status; if (!done_) { - status = Status(SS_CONSTRAINT_CHECK_ERROR, "Operation is expected to be done"); + std::stringstream emsg; + emsg << GetRepr() << ". Should be done"; + status = Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str()); } return status; } @@ -153,8 +155,11 @@ Operations::DoneRequired() const { Status Operations::IDSNotEmptyRequried() const { Status status; - if (ids_.size() == 0) - status = Status(SS_CONSTRAINT_CHECK_ERROR, "No Snapshot is available"); + if (ids_.size() == 0) { + std::stringstream emsg; + emsg << GetRepr() << ". No rsource is available"; + status = Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str()); + } return status; } @@ -162,7 +167,9 @@ Status Operations::PrevSnapshotRequried() const { Status status; if (!prev_ss_) { - status = Status(SS_CONSTRAINT_CHECK_ERROR, "Prev snapshot is requried"); + std::stringstream emsg; + emsg << GetRepr() << ". Prev snapshot is requried"; + status = Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str()); } return status; } diff --git a/core/src/db/snapshot/Operations.h b/core/src/db/snapshot/Operations.h index a2d872d9ac1d81ba8637df7a97b216888514902e..fac6e36d3b762fca456ecd47f7ffda6b101d2c75 100644 --- a/core/src/db/snapshot/Operations.h +++ b/core/src/db/snapshot/Operations.h @@ -284,6 +284,51 @@ class LoadOperation : public Operations { typename ResourceT::Ptr resource_; }; +template +class SoftDeleteOperation : public Operations { + public: + using BaseT = Operations; + explicit SoftDeleteOperation(ID_TYPE id) : BaseT(OperationContext(), ScopedSnapshotT()), id_(id) { + } + + Status + GetResource(typename ResourceT::Ptr& res, bool wait = false) { + if (!status_.ok()) + return status_; + if (wait) { + WaitToFinish(); + } + auto status = DoneRequired(); + if (!status.ok()) + return status; + status = IDSNotEmptyRequried(); + if (!status.ok()) + return status; + res = resource_; + return status; + } + + Status + DoExecute(Store& store) override { + auto status = store.GetResource(id_, resource_); + if (!status.ok()) { + return status; + } + if (!resource_) { + std::stringstream emsg; + emsg << "Specified " << typeid(ResourceT).name() << " id=" << id_ << " not found"; + return Status(SS_NOT_FOUND_ERROR, emsg.str()); + } + resource_->Deactivate(); + AddStep(*resource_, false); + return status; + } + + protected: + ID_TYPE id_; + typename ResourceT::Ptr resource_; +}; + template class HardDeleteOperation : public Operations { public: diff --git a/core/src/db/snapshot/ResourceOperations.cpp b/core/src/db/snapshot/ResourceOperations.cpp index d967a4a2c19411d90dbbc33e37afad604452112f..1363ada3a2e5ec1b5917f0463bea674606c161fb 100644 --- a/core/src/db/snapshot/ResourceOperations.cpp +++ b/core/src/db/snapshot/ResourceOperations.cpp @@ -19,8 +19,11 @@ namespace snapshot { Status CollectionCommitOperation::DoExecute(Store& store) { auto prev_resource = GetPrevResource(); - if (!prev_resource) - return Status(SS_INVALID_CONTEX_ERROR, "Invalid CollectionCommitOperation Context"); + if (!prev_resource) { + std::stringstream emsg; + emsg << GetRepr() << ". Cannot find prev collection commit resource"; + return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); + } resource_ = std::make_shared(*prev_resource); resource_->ResetStatus(); if (context_.stale_partition_commit) { @@ -89,7 +92,10 @@ PartitionCommitOperation::DoExecute(Store& store) { if (context_.stale_segments.size() > 0) { for (auto& stale_segment : context_.stale_segments) { if (stale_segment->GetPartitionId() != prev_resource->GetPartitionId()) { - return Status(SS_INVALID_CONTEX_ERROR, "All stale segments should from specified partition"); + std::stringstream emsg; + emsg << GetRepr() << ". All stale segments should from partition "; + emsg << prev_resource->GetPartitionId(); + return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); } auto stale_segment_commit = GetStartedSS()->GetSegmentCommitBySegmentId(stale_segment->GetID()); resource_->GetMappings().erase(stale_segment_commit->GetID()); @@ -97,7 +103,9 @@ PartitionCommitOperation::DoExecute(Store& store) { } } else { if (!context_.new_partition) { - return Status(SS_INVALID_CONTEX_ERROR, "Partition is required"); + std::stringstream emsg; + emsg << GetRepr() << ". New partition is required"; + return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); } resource_ = std::make_shared(GetStartedSS()->GetCollectionId(), context_.new_partition->GetID()); @@ -127,15 +135,20 @@ SegmentOperation::SegmentOperation(const OperationContext& context, ScopedSnapsh Status SegmentOperation::PreCheck() { - if (!context_.prev_partition) - return Status(SS_INVALID_CONTEX_ERROR, "Invalid SegmentOperation Context"); + if (!context_.prev_partition) { + std::stringstream emsg; + emsg << GetRepr() << ". prev_partition should be specified in context"; + return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); + } return Status::OK(); } Status SegmentOperation::DoExecute(Store& store) { if (!context_.prev_partition) { - return Status(SS_INVALID_CONTEX_ERROR, "Invalid SegmentOperation Context"); + std::stringstream emsg; + emsg << GetRepr() << ". prev_partition should be specified in context"; + return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); } auto prev_num = GetStartedSS()->GetMaxSegmentNumByPartition(context_.prev_partition->GetID()); resource_ = std::make_shared(context_.prev_partition->GetID(), prev_num + 1); @@ -169,7 +182,9 @@ SegmentCommitOperation::DoExecute(Store& store) { Status SegmentCommitOperation::PreCheck() { if (context_.new_segment_files.size() == 0) { - return Status(SS_INVALID_CONTEX_ERROR, "Invalid SegmentCommitOperation Context"); + std::stringstream emsg; + emsg << GetRepr() << ". new_segment_files should not be empty in context"; + return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); } return Status::OK(); } diff --git a/core/src/db/snapshot/SnapshotHolder.cpp b/core/src/db/snapshot/SnapshotHolder.cpp index f012e0ec688b696bdc6e6ad842f9c58c3ea8374f..fa2182e0eb53b5ec70b1010abd4725acc5e56663 100644 --- a/core/src/db/snapshot/SnapshotHolder.cpp +++ b/core/src/db/snapshot/SnapshotHolder.cpp @@ -55,12 +55,20 @@ SnapshotHolder::Load(Store& store, ScopedSnapshotT& ss, ID_TYPE id, bool scoped) return status; } if (id < min_id_) { - return Status(SS_STALE_ERROR, "Get stale snapshot"); + std::stringstream emsg; + emsg << "SnapshotHolder::Load: Got stale snapshot " << id; + emsg << " current is " << max_id_; + emsg << " on collection " << collection_id_; + return Status(SS_STALE_ERROR, emsg.str()); } auto it = active_.find(id); if (it == active_.end()) { - return Status(SS_NOT_FOUND_ERROR, "Specified Snapshot not found"); + std::stringstream emsg; + emsg << "SnapshotHolder::Load: Specified snapshot " << id << " not found."; + emsg << " Current is " << max_id_; + emsg << " on collection " << collection_id_; + return Status(SS_NOT_FOUND_ERROR, emsg.str()); } ss = ScopedSnapshotT(it->second, scoped); return status; @@ -70,7 +78,11 @@ Status SnapshotHolder::Get(ScopedSnapshotT& ss, ID_TYPE id, bool scoped) { Status status; if (id > max_id_) { - return Status(SS_NOT_FOUND_ERROR, "Specified Snapshot not found"); + std::stringstream emsg; + emsg << "SnapshotHolder::Get: Specified snapshot " << id << " not found."; + emsg << " Current is " << max_id_; + emsg << " on collection " << collection_id_; + return Status(SS_NOT_FOUND_ERROR, emsg.str()); } std::unique_lock lock(mutex_); @@ -80,12 +92,20 @@ SnapshotHolder::Get(ScopedSnapshotT& ss, ID_TYPE id, bool scoped) { return status; } if (id < min_id_) { - return Status(SS_STALE_ERROR, "Get stale snapshot"); + std::stringstream emsg; + emsg << "SnapshotHolder::Get: Got stale snapshot " << id; + emsg << " current is " << max_id_; + emsg << " on collection " << collection_id_; + return Status(SS_STALE_ERROR, emsg.str()); } auto it = active_.find(id); if (it == active_.end()) { - return Status(SS_NOT_FOUND_ERROR, "Specified Snapshot not found"); + std::stringstream emsg; + emsg << "SnapshotHolder::Get: Specified snapshot " << id << " not found."; + emsg << " Current is " << max_id_; + emsg << " on collection " << collection_id_; + return Status(SS_NOT_FOUND_ERROR, emsg.str()); } ss = ScopedSnapshotT(it->second, scoped); return status; @@ -106,11 +126,16 @@ SnapshotHolder::Add(ID_TYPE id) { { std::unique_lock lock(mutex_); if (active_.size() > 0 && id < max_id_) { - return Status(SS_INVALID_ARGUMENT_ERROR, "Invalid ID"); + std::stringstream emsg; + emsg << "SnapshotHolder::Add: Invalid snapshot " << id << "."; + emsg << " Should larger than " << max_id_; + return Status(SS_INVALID_ARGUMENT_ERROR, emsg.str()); } auto it = active_.find(id); if (it != active_.end()) { - return Status(SS_DUPLICATED_ERROR, "Duplicated ID"); + std::stringstream emsg; + emsg << "SnapshotHolder::Add: Duplicated snapshot " << id << "."; + return Status(SS_DUPLICATED_ERROR, emsg.str()); } } Snapshot::Ptr oldest_ss; @@ -119,7 +144,9 @@ SnapshotHolder::Add(ID_TYPE id) { std::unique_lock lock(mutex_); if (!IsActive(ss)) { - return Status(SS_NOT_ACTIVE_ERROR, "Specified collection is not active now"); + std::stringstream emsg; + emsg << "SnapshotHolder::Add: Specified collection " << collection_id_; + return Status(SS_NOT_ACTIVE_ERROR, emsg.str()); } ss->RegisterOnNoRefCB(std::bind(&Snapshot::UnRefAll, ss)); ss->Ref(); diff --git a/core/src/db/snapshot/Snapshots.cpp b/core/src/db/snapshot/Snapshots.cpp index 7f559e3abe8018c0119170b0da2c0c85ab9e6108..c177924a6c9dec1d9d5827be9ad65999e51aa24a 100644 --- a/core/src/db/snapshot/Snapshots.cpp +++ b/core/src/db/snapshot/Snapshots.cpp @@ -121,7 +121,9 @@ Snapshots::LoadNoLock(Store& store, ID_TYPE collection_id, SnapshotHolderPtr& ho (*op)(store); auto& collection_commit_ids = op->GetIDs(); if (collection_commit_ids.size() == 0) { - return Status(SS_NOT_FOUND_ERROR, "No collection commit found"); + std::stringstream emsg; + emsg << "Snapshots::LoadNoLock: No collection commit is found for collection " << collection_id; + return Status(SS_NOT_FOUND_ERROR, emsg.str()); } holder = std::make_shared(collection_id, std::bind(&Snapshots::SnapshotGCCallback, this, std::placeholders::_1)); @@ -153,7 +155,11 @@ Snapshots::GetHolder(const std::string& name, SnapshotHolderPtr& holder) { lock.unlock(); return GetHolder(kv->second, holder); } - return Status(SS_NOT_FOUND_ERROR, "Specified snapshot holder not found"); + std::stringstream emsg; + emsg << "Snapshots::GetHolderNoLock: Specified snapshot holder for collection "; + emsg << "\"" << name << "\"" + << " not found"; + return Status(SS_NOT_FOUND_ERROR, emsg.str()); } Status @@ -192,7 +198,10 @@ Status Snapshots::GetHolderNoLock(ID_TYPE collection_id, SnapshotHolderPtr& holder) { auto it = holders_.find(collection_id); if (it == holders_.end()) { - return Status(SS_NOT_FOUND_ERROR, "Specified snapshot holder not found"); + std::stringstream emsg; + emsg << "Snapshots::GetHolderNoLock: Specified snapshot holder for collection " << collection_id; + emsg << " not found"; + return Status(SS_NOT_FOUND_ERROR, emsg.str()); } holder = it->second; return Status::OK(); diff --git a/core/unittest/db/test_snapshot.cpp b/core/unittest/db/test_snapshot.cpp index f1080e5a14819b6138b326520db43ae954e1ad70..e4f67fb2d396759f7df98ca03389740ea647641a 100644 --- a/core/unittest/db/test_snapshot.cpp +++ b/core/unittest/db/test_snapshot.cpp @@ -34,6 +34,7 @@ using ID_TYPE = milvus::engine::snapshot::ID_TYPE; using IDS_TYPE = milvus::engine::snapshot::IDS_TYPE; using LSN_TYPE = milvus::engine::snapshot::LSN_TYPE; using MappingT = milvus::engine::snapshot::MappingT; +using LoadOperationContext = milvus::engine::snapshot::LoadOperationContext; using CreateCollectionContext = milvus::engine::snapshot::CreateCollectionContext; using SegmentFileContext = milvus::engine::snapshot::SegmentFileContext; using OperationContext = milvus::engine::snapshot::OperationContext; @@ -63,6 +64,7 @@ using ScopedSnapshotT = milvus::engine::snapshot::ScopedSnapshotT; using ReferenceProxy = milvus::engine::snapshot::ReferenceProxy; using Queue = milvus::BlockingQueue; using TQueue = milvus::BlockingQueue>; +using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation; int RandomInt(int start, int end) { std::random_device dev; @@ -197,6 +199,70 @@ CreateCollection(const std::string& collection_name, const LSN_TYPE& lsn) { return ss; } +TEST_F(SnapshotTest, DeleteOperationTest) { + std::string collection_name = "test_c1"; + LSN_TYPE lsn = 1; + auto ss = CreateCollection(collection_name, lsn); + ASSERT_TRUE(ss); + + auto collection = CollectionsHolder::GetInstance().GetResource(ss->GetCollectionId()); + ASSERT_EQ(collection->GetName(), collection_name); + + { + auto soft_op = std::make_shared(collection->GetID()); + auto status = soft_op->Push(); + ASSERT_TRUE(status.ok()); + CollectionPtr soft_deleted; + status = soft_op->GetResource(soft_deleted); + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(soft_deleted); + ASSERT_EQ(soft_deleted->GetID(), collection->GetID()); + ASSERT_TRUE(soft_deleted->IsDeactive()); + } + + { + CollectionPtr loaded; + LoadOperationContext context; + context.id = collection->GetID(); + auto load_op = std::make_shared>(context); + auto status = load_op->Push(); + ASSERT_TRUE(status.ok()); + status = load_op->GetResource(loaded); + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(loaded); + ASSERT_EQ(loaded->GetID(), collection->GetID()); + ASSERT_TRUE(loaded->IsDeactive()); + } + + { + auto hard_op = std::make_shared + >(collection->GetID()); + auto status = hard_op->Push(); + ASSERT_TRUE(status.ok()); + } + + { + CollectionPtr loaded; + LoadOperationContext context; + context.id = collection->GetID(); + auto load_op = std::make_shared>(context); + auto status = load_op->Push(); + if (!status.ok()) { + std::cout << status.ToString() << std::endl; + } + ASSERT_FALSE(status.ok()); + } + + { + auto soft_op = std::make_shared(collection->GetID()); + auto status = soft_op->Push(); + if (!status.ok()) { + std::cout << status.ToString() << std::endl; + } + ASSERT_FALSE(status.ok()); + } +} + TEST_F(SnapshotTest, CreateCollectionOperationTest) { ScopedSnapshotT expect_null; auto status = Snapshots::GetInstance().GetSnapshot(expect_null, 100000);