未验证 提交 39c7e8a4 编写于 作者: X XuPeng-SH 提交者: GitHub

(db/snapshot): Add SoftDeleteOperation and related test (#2647)

* (db/snapshot): add soft delete operation and related test
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): more readable error msg in snapshot
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix clang format
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix clang format
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>
上级 5c833ca6
......@@ -66,7 +66,9 @@ Status
BuildOperation::CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_TYPE segment_id) const {
    // Looks up `segment_id` in `latest_snapshot`. Returns Status::OK() when the
    // segment is present; otherwise returns SS_STALE_ERROR with a message that
    // carries this operation's GetRepr() and the offending segment id.
    auto segment = latest_snapshot->GetResource<Segment>(segment_id);
    if (!segment) {
        // Build the detailed message before returning; a generic early return
        // here would make it unreachable.
        std::stringstream emsg;
        emsg << GetRepr() << ". Target segment " << segment_id << " is stale";
        return Status(SS_STALE_ERROR, emsg.str());
    }
    return Status::OK();
}
......@@ -79,7 +81,9 @@ BuildOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentF
return status;
auto segment = GetStartedSS()->GetResource<Segment>(context.segment_id);
if (!segment) {
return Status(SS_INVALID_CONTEX_ERROR, "Invalid segment_id in context");
std::stringstream emsg;
emsg << GetRepr() << ". Invalid segment " << context.segment_id << " in context";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
auto ctx = context;
ctx.partition_id = segment->GetPartitionId();
......@@ -190,7 +194,9 @@ MergeOperation::OnSnapshotStale() {
auto expect_sc = GetStartedSS()->GetSegmentCommitBySegmentId(stale_seg->GetID());
auto latest_sc = GetAdjustedSS()->GetSegmentCommitBySegmentId(stale_seg->GetID());
if (!latest_sc || (latest_sc->GetID() != expect_sc->GetID())) {
return Status(SS_STALE_ERROR, "MergeOperation on stale segments");
std::stringstream emsg;
emsg << GetRepr() << ". Stale segment " << stale_seg->GetID() << " in context";
return Status(SS_STALE_ERROR, emsg.str());
}
}
return Status::OK();
......@@ -349,8 +355,11 @@ DropPartitionOperation::DoExecute(Store& store) {
if (!status.ok())
return status;
auto p_c = GetAdjustedSS()->GetPartitionCommitByPartitionId(id);
if (!p_c)
return Status(SS_NOT_FOUND_ERROR, "No partition commit found");
if (!p_c) {
std::stringstream emsg;
emsg << GetRepr() << ". PartitionCommit " << id << " not found";
return Status(SS_NOT_FOUND_ERROR, emsg.str());
}
context_.stale_partition_commit = p_c;
OperationContext op_ctx;
......@@ -378,7 +387,9 @@ CreatePartitionOperation::PreCheck() {
return status;
}
if (!context_.new_partition) {
status = Status(SS_INVALID_CONTEX_ERROR, "No partition specified before push partition");
std::stringstream emsg;
emsg << GetRepr() << ". Partition is missing";
status = Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
return status;
}
......@@ -531,8 +542,11 @@ CreateCollectionOperation::GetSnapshot(ScopedSnapshotT& ss) const {
status = IDSNotEmptyRequried();
if (!status.ok())
return status;
if (!c_context_.collection_commit)
return Status(SS_CONSTRAINT_CHECK_ERROR, "No Snapshot is available");
if (!c_context_.collection_commit) {
std::stringstream emsg;
emsg << GetRepr() << ". No snapshot is available";
return Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str());
}
/* status = Snapshots::GetInstance().GetSnapshot(ss, c_context_.collection_commit->GetCollectionId()); */
ss = context_.latest_ss;
return status;
......@@ -541,7 +555,9 @@ CreateCollectionOperation::GetSnapshot(ScopedSnapshotT& ss) const {
Status
DropCollectionOperation::DoExecute(Store& store) {
if (!context_.collection) {
return Status(SS_INVALID_CONTEX_ERROR, "Invalid Context");
std::stringstream emsg;
emsg << GetRepr() << ". Collection is missing in context";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
context_.collection->Deactivate();
AddStepWithLsn(*context_.collection, context_.lsn);
......
......@@ -145,7 +145,9 @@ Status
Operations::DoneRequired() const {
    // Constraint check on the `done_` flag. Returns a default-constructed Status
    // when `done_` is set; otherwise SS_CONSTRAINT_CHECK_ERROR with a message
    // prefixed by GetRepr().
    Status status;
    if (!done_) {
        // Assign the status once, with the descriptive message only.
        std::stringstream emsg;
        emsg << GetRepr() << ". Should be done";
        status = Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str());
    }
    return status;
}
......@@ -153,8 +155,11 @@ Operations::DoneRequired() const {
// Constraint check on `ids_`. Returns a default-constructed Status when `ids_`
// holds at least one entry; otherwise SS_CONSTRAINT_CHECK_ERROR with a message
// prefixed by GetRepr().
Status
Operations::IDSNotEmptyRequried() const {
    Status status;
    if (ids_.size() == 0) {
        // Single assignment with the descriptive message ("resource" spelled out).
        std::stringstream emsg;
        emsg << GetRepr() << ". No resource is available";
        status = Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str());
    }
    return status;
}
......@@ -162,7 +167,9 @@ Status
Operations::PrevSnapshotRequried() const {
    // Constraint check on `prev_ss_`. Returns a default-constructed Status when
    // `prev_ss_` is set; otherwise SS_CONSTRAINT_CHECK_ERROR with a message
    // prefixed by GetRepr().
    Status status;
    if (!prev_ss_) {
        // Single assignment with the descriptive message.
        std::stringstream emsg;
        emsg << GetRepr() << ". Prev snapshot is required";
        status = Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str());
    }
    return status;
}
......
......@@ -284,6 +284,51 @@ class LoadOperation : public Operations {
typename ResourceT::Ptr resource_;
};
/**
 * Operation that fetches the resource of type `ResourceT` identified by an id
 * from the store, calls Deactivate() on it and records it via AddStep().
 * The affected resource can be read back with GetResource() afterwards.
 */
template <typename ResourceT>
class SoftDeleteOperation : public Operations {
 public:
    using BaseT = Operations;

    /** Builds the operation with an empty context and an empty scoped snapshot. */
    explicit SoftDeleteOperation(ID_TYPE id) : BaseT(OperationContext(), ScopedSnapshotT()), id_(id) {
    }

    /**
     * Hands back the resource handled by this operation.
     *
     * @param res  receives `resource_` when every check passes; untouched otherwise.
     * @param wait when true, WaitToFinish() is called before the checks run.
     * @return `status_` if it is already failed, else the first failing result of
     *         DoneRequired() / IDSNotEmptyRequried(), else the passing status.
     */
    Status
    GetResource(typename ResourceT::Ptr& res, bool wait = false) {
        if (!status_.ok()) {
            return status_;
        }
        if (wait) {
            WaitToFinish();
        }

        auto done_check = DoneRequired();
        if (!done_check.ok()) {
            return done_check;
        }

        auto ids_check = IDSNotEmptyRequried();
        if (!ids_check.ok()) {
            return ids_check;
        }

        res = resource_;
        return ids_check;
    }

    /**
     * Loads the resource with id `id_` from `store` into `resource_`, deactivates
     * it and adds it as a step. Returns the store's status on lookup failure and
     * SS_NOT_FOUND_ERROR when the lookup succeeds but yields no resource.
     */
    Status
    DoExecute(Store& store) override {
        auto fetch_status = store.GetResource<ResourceT>(id_, resource_);
        if (!fetch_status.ok()) {
            return fetch_status;
        }

        if (resource_) {
            resource_->Deactivate();
            AddStep(*resource_, false);
            return fetch_status;
        }

        std::stringstream emsg;
        emsg << "Specified " << typeid(ResourceT).name() << " id=" << id_ << " not found";
        return Status(SS_NOT_FOUND_ERROR, emsg.str());
    }

 protected:
    ID_TYPE id_;                         // id of the resource to soft-delete
    typename ResourceT::Ptr resource_;   // filled in by DoExecute()
};
template <typename ResourceT>
class HardDeleteOperation : public Operations {
public:
......
......@@ -19,8 +19,11 @@ namespace snapshot {
Status
CollectionCommitOperation::DoExecute(Store& store) {
auto prev_resource = GetPrevResource();
if (!prev_resource)
return Status(SS_INVALID_CONTEX_ERROR, "Invalid CollectionCommitOperation Context");
if (!prev_resource) {
std::stringstream emsg;
emsg << GetRepr() << ". Cannot find prev collection commit resource";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
resource_ = std::make_shared<CollectionCommit>(*prev_resource);
resource_->ResetStatus();
if (context_.stale_partition_commit) {
......@@ -89,7 +92,10 @@ PartitionCommitOperation::DoExecute(Store& store) {
if (context_.stale_segments.size() > 0) {
for (auto& stale_segment : context_.stale_segments) {
if (stale_segment->GetPartitionId() != prev_resource->GetPartitionId()) {
return Status(SS_INVALID_CONTEX_ERROR, "All stale segments should from specified partition");
std::stringstream emsg;
emsg << GetRepr() << ". All stale segments should from partition ";
emsg << prev_resource->GetPartitionId();
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
auto stale_segment_commit = GetStartedSS()->GetSegmentCommitBySegmentId(stale_segment->GetID());
resource_->GetMappings().erase(stale_segment_commit->GetID());
......@@ -97,7 +103,9 @@ PartitionCommitOperation::DoExecute(Store& store) {
}
} else {
if (!context_.new_partition) {
return Status(SS_INVALID_CONTEX_ERROR, "Partition is required");
std::stringstream emsg;
emsg << GetRepr() << ". New partition is required";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
resource_ =
std::make_shared<PartitionCommit>(GetStartedSS()->GetCollectionId(), context_.new_partition->GetID());
......@@ -127,15 +135,20 @@ SegmentOperation::SegmentOperation(const OperationContext& context, ScopedSnapsh
// Pre-execution validation: the context must carry `prev_partition`.
// Returns Status::OK() when it is set; otherwise SS_INVALID_CONTEX_ERROR with a
// message prefixed by GetRepr().
Status
SegmentOperation::PreCheck() {
    if (!context_.prev_partition) {
        // Report the descriptive message; a generic early return ahead of this
        // block would make it unreachable.
        std::stringstream emsg;
        emsg << GetRepr() << ". prev_partition should be specified in context";
        return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
    }
    return Status::OK();
}
Status
SegmentOperation::DoExecute(Store& store) {
if (!context_.prev_partition) {
return Status(SS_INVALID_CONTEX_ERROR, "Invalid SegmentOperation Context");
std::stringstream emsg;
emsg << GetRepr() << ". prev_partition should be specified in context";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
auto prev_num = GetStartedSS()->GetMaxSegmentNumByPartition(context_.prev_partition->GetID());
resource_ = std::make_shared<Segment>(context_.prev_partition->GetID(), prev_num + 1);
......@@ -169,7 +182,9 @@ SegmentCommitOperation::DoExecute(Store& store) {
// Pre-execution validation: the context must list at least one new segment file.
// Returns Status::OK() when `new_segment_files` is non-empty; otherwise
// SS_INVALID_CONTEX_ERROR with a message prefixed by GetRepr().
Status
SegmentCommitOperation::PreCheck() {
    if (context_.new_segment_files.size() == 0) {
        // Report the descriptive message; a generic return placed before it
        // would make it unreachable.
        std::stringstream emsg;
        emsg << GetRepr() << ". new_segment_files should not be empty in context";
        return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
    }
    return Status::OK();
}
......
......@@ -55,12 +55,20 @@ SnapshotHolder::Load(Store& store, ScopedSnapshotT& ss, ID_TYPE id, bool scoped)
return status;
}
if (id < min_id_) {
return Status(SS_STALE_ERROR, "Get stale snapshot");
std::stringstream emsg;
emsg << "SnapshotHolder::Load: Got stale snapshot " << id;
emsg << " current is " << max_id_;
emsg << " on collection " << collection_id_;
return Status(SS_STALE_ERROR, emsg.str());
}
auto it = active_.find(id);
if (it == active_.end()) {
return Status(SS_NOT_FOUND_ERROR, "Specified Snapshot not found");
std::stringstream emsg;
emsg << "SnapshotHolder::Load: Specified snapshot " << id << " not found.";
emsg << " Current is " << max_id_;
emsg << " on collection " << collection_id_;
return Status(SS_NOT_FOUND_ERROR, emsg.str());
}
ss = ScopedSnapshotT(it->second, scoped);
return status;
......@@ -70,7 +78,11 @@ Status
SnapshotHolder::Get(ScopedSnapshotT& ss, ID_TYPE id, bool scoped) {
Status status;
if (id > max_id_) {
return Status(SS_NOT_FOUND_ERROR, "Specified Snapshot not found");
std::stringstream emsg;
emsg << "SnapshotHolder::Get: Specified snapshot " << id << " not found.";
emsg << " Current is " << max_id_;
emsg << " on collection " << collection_id_;
return Status(SS_NOT_FOUND_ERROR, emsg.str());
}
std::unique_lock<std::mutex> lock(mutex_);
......@@ -80,12 +92,20 @@ SnapshotHolder::Get(ScopedSnapshotT& ss, ID_TYPE id, bool scoped) {
return status;
}
if (id < min_id_) {
return Status(SS_STALE_ERROR, "Get stale snapshot");
std::stringstream emsg;
emsg << "SnapshotHolder::Get: Got stale snapshot " << id;
emsg << " current is " << max_id_;
emsg << " on collection " << collection_id_;
return Status(SS_STALE_ERROR, emsg.str());
}
auto it = active_.find(id);
if (it == active_.end()) {
return Status(SS_NOT_FOUND_ERROR, "Specified Snapshot not found");
std::stringstream emsg;
emsg << "SnapshotHolder::Get: Specified snapshot " << id << " not found.";
emsg << " Current is " << max_id_;
emsg << " on collection " << collection_id_;
return Status(SS_NOT_FOUND_ERROR, emsg.str());
}
ss = ScopedSnapshotT(it->second, scoped);
return status;
......@@ -106,11 +126,16 @@ SnapshotHolder::Add(ID_TYPE id) {
{
std::unique_lock<std::mutex> lock(mutex_);
if (active_.size() > 0 && id < max_id_) {
return Status(SS_INVALID_ARGUMENT_ERROR, "Invalid ID");
std::stringstream emsg;
emsg << "SnapshotHolder::Add: Invalid snapshot " << id << ".";
emsg << " Should larger than " << max_id_;
return Status(SS_INVALID_ARGUMENT_ERROR, emsg.str());
}
auto it = active_.find(id);
if (it != active_.end()) {
return Status(SS_DUPLICATED_ERROR, "Duplicated ID");
std::stringstream emsg;
emsg << "SnapshotHolder::Add: Duplicated snapshot " << id << ".";
return Status(SS_DUPLICATED_ERROR, emsg.str());
}
}
Snapshot::Ptr oldest_ss;
......@@ -119,7 +144,9 @@ SnapshotHolder::Add(ID_TYPE id) {
std::unique_lock<std::mutex> lock(mutex_);
if (!IsActive(ss)) {
return Status(SS_NOT_ACTIVE_ERROR, "Specified collection is not active now");
std::stringstream emsg;
emsg << "SnapshotHolder::Add: Specified collection " << collection_id_;
return Status(SS_NOT_ACTIVE_ERROR, emsg.str());
}
ss->RegisterOnNoRefCB(std::bind(&Snapshot::UnRefAll, ss));
ss->Ref();
......
......@@ -121,7 +121,9 @@ Snapshots::LoadNoLock(Store& store, ID_TYPE collection_id, SnapshotHolderPtr& ho
(*op)(store);
auto& collection_commit_ids = op->GetIDs();
if (collection_commit_ids.size() == 0) {
return Status(SS_NOT_FOUND_ERROR, "No collection commit found");
std::stringstream emsg;
emsg << "Snapshots::LoadNoLock: No collection commit is found for collection " << collection_id;
return Status(SS_NOT_FOUND_ERROR, emsg.str());
}
holder = std::make_shared<SnapshotHolder>(collection_id,
std::bind(&Snapshots::SnapshotGCCallback, this, std::placeholders::_1));
......@@ -153,7 +155,11 @@ Snapshots::GetHolder(const std::string& name, SnapshotHolderPtr& holder) {
lock.unlock();
return GetHolder(kv->second, holder);
}
return Status(SS_NOT_FOUND_ERROR, "Specified snapshot holder not found");
std::stringstream emsg;
emsg << "Snapshots::GetHolderNoLock: Specified snapshot holder for collection ";
emsg << "\"" << name << "\""
<< " not found";
return Status(SS_NOT_FOUND_ERROR, emsg.str());
}
Status
......@@ -192,7 +198,10 @@ Status
Snapshots::GetHolderNoLock(ID_TYPE collection_id, SnapshotHolderPtr& holder) {
auto it = holders_.find(collection_id);
if (it == holders_.end()) {
return Status(SS_NOT_FOUND_ERROR, "Specified snapshot holder not found");
std::stringstream emsg;
emsg << "Snapshots::GetHolderNoLock: Specified snapshot holder for collection " << collection_id;
emsg << " not found";
return Status(SS_NOT_FOUND_ERROR, emsg.str());
}
holder = it->second;
return Status::OK();
......
......@@ -34,6 +34,7 @@ using ID_TYPE = milvus::engine::snapshot::ID_TYPE;
using IDS_TYPE = milvus::engine::snapshot::IDS_TYPE;
using LSN_TYPE = milvus::engine::snapshot::LSN_TYPE;
using MappingT = milvus::engine::snapshot::MappingT;
using LoadOperationContext = milvus::engine::snapshot::LoadOperationContext;
using CreateCollectionContext = milvus::engine::snapshot::CreateCollectionContext;
using SegmentFileContext = milvus::engine::snapshot::SegmentFileContext;
using OperationContext = milvus::engine::snapshot::OperationContext;
......@@ -63,6 +64,7 @@ using ScopedSnapshotT = milvus::engine::snapshot::ScopedSnapshotT;
using ReferenceProxy = milvus::engine::snapshot::ReferenceProxy;
using Queue = milvus::BlockingQueue<ID_TYPE>;
using TQueue = milvus::BlockingQueue<std::tuple<ID_TYPE, ID_TYPE>>;
using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation<Collection>;
int RandomInt(int start, int end) {
std::random_device dev;
......@@ -197,6 +199,70 @@ CreateCollection(const std::string& collection_name, const LSN_TYPE& lsn) {
return ss;
}
// Walks one collection through soft delete -> load -> hard delete, then checks
// that both loading and soft-deleting it again are rejected.
TEST_F(SnapshotTest, DeleteOperationTest) {
    using LoadCollectionOperation = milvus::engine::snapshot::LoadOperation<Collection>;
    using HardDeleteCollectionOperation = milvus::engine::snapshot::HardDeleteOperation<Collection>;

    std::string collection_name = "test_c1";
    LSN_TYPE lsn = 1;
    auto ss = CreateCollection(collection_name, lsn);
    ASSERT_TRUE(ss);

    auto collection = CollectionsHolder::GetInstance().GetResource(ss->GetCollectionId());
    ASSERT_EQ(collection->GetName(), collection_name);

    // Step 1: soft delete succeeds and hands back the deactivated collection.
    {
        auto soft_delete = std::make_shared<SoftDeleteCollectionOperation>(collection->GetID());
        auto push_status = soft_delete->Push();
        ASSERT_TRUE(push_status.ok());

        CollectionPtr deactivated;
        auto get_status = soft_delete->GetResource(deactivated);
        ASSERT_TRUE(get_status.ok());
        ASSERT_TRUE(deactivated);
        ASSERT_EQ(deactivated->GetID(), collection->GetID());
        ASSERT_TRUE(deactivated->IsDeactive());
    }

    // Step 2: the soft-deleted collection can still be loaded and is deactivated.
    {
        LoadOperationContext load_ctx;
        load_ctx.id = collection->GetID();
        auto load = std::make_shared<LoadCollectionOperation>(load_ctx);
        auto push_status = load->Push();
        ASSERT_TRUE(push_status.ok());

        CollectionPtr reloaded;
        auto get_status = load->GetResource(reloaded);
        ASSERT_TRUE(get_status.ok());
        ASSERT_TRUE(reloaded);
        ASSERT_EQ(reloaded->GetID(), collection->GetID());
        ASSERT_TRUE(reloaded->IsDeactive());
    }

    // Step 3: hard delete succeeds.
    {
        auto hard_delete = std::make_shared<HardDeleteCollectionOperation>(collection->GetID());
        auto push_status = hard_delete->Push();
        ASSERT_TRUE(push_status.ok());
    }

    // Step 4: loading after the hard delete must fail.
    {
        CollectionPtr unused;
        LoadOperationContext load_ctx;
        load_ctx.id = collection->GetID();
        auto load = std::make_shared<LoadCollectionOperation>(load_ctx);
        auto push_status = load->Push();
        if (!push_status.ok()) {
            std::cout << push_status.ToString() << std::endl;
        }
        ASSERT_FALSE(push_status.ok());
    }

    // Step 5: soft-deleting after the hard delete must fail as well.
    {
        auto soft_delete = std::make_shared<SoftDeleteCollectionOperation>(collection->GetID());
        auto push_status = soft_delete->Push();
        if (!push_status.ok()) {
            std::cout << push_status.ToString() << std::endl;
        }
        ASSERT_FALSE(push_status.ok());
    }
}
TEST_F(SnapshotTest, CreateCollectionOperationTest) {
ScopedSnapshotT expect_null;
auto status = Snapshots::GetInstance().GetSnapshot(expect_null, 100000);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册