未验证 提交 ba26b677 编写于 作者: C Cai Yudong 提交者: GitHub

snapshot gc (#2668)

* rename StatusField to StateField
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* fix typo
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* enable GC in OnNoRefCallBack
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* code opt
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 403753a1
......@@ -14,6 +14,7 @@
#include <sstream>
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/Snapshots.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
......@@ -24,42 +25,30 @@ BuildOperation::BuildOperation(const OperationContext& context, ScopedSnapshotT
Status
BuildOperation::DoExecute(Store& store) {
auto status = CheckStale(std::bind(&BuildOperation::CheckSegmentStale, this, std::placeholders::_1,
context_.new_segment_files[0]->GetSegmentId()));
if (!status.ok())
return status;
STATUS_CHECK(CheckStale(std::bind(&BuildOperation::CheckSegmentStale, this, std::placeholders::_1,
context_.new_segment_files[0]->GetSegmentId())));
SegmentCommitOperation op(context_, GetAdjustedSS());
op(store);
status = op.GetResource(context_.new_segment_commit);
if (!status.ok())
return status;
SegmentCommitOperation sc_op(context_, GetAdjustedSS());
STATUS_CHECK(sc_op(store));
STATUS_CHECK(sc_op.GetResource(context_.new_segment_commit));
AddStepWithLsn(*context_.new_segment_commit, context_.lsn);
PartitionCommitOperation pc_op(context_, GetAdjustedSS());
pc_op(store);
STATUS_CHECK(pc_op(store));
OperationContext cc_context;
status = pc_op.GetResource(cc_context.new_partition_commit);
if (!status.ok())
return status;
STATUS_CHECK(pc_op.GetResource(cc_context.new_partition_commit));
AddStepWithLsn(*cc_context.new_partition_commit, context_.lsn);
context_.new_partition_commit = cc_context.new_partition_commit;
status = pc_op.GetResource(context_.new_partition_commit);
if (!status.ok())
return status;
context_.new_partition_commit = cc_context.new_partition_commit;
STATUS_CHECK(pc_op.GetResource(context_.new_partition_commit));
AddStepWithLsn(*context_.new_partition_commit, context_.lsn);
CollectionCommitOperation cc_op(cc_context, GetAdjustedSS());
cc_op(store);
status = cc_op.GetResource(context_.new_collection_commit);
if (!status.ok())
return status;
STATUS_CHECK(cc_op(store));
STATUS_CHECK(cc_op.GetResource(context_.new_collection_commit));
AddStepWithLsn(*context_.new_collection_commit, context_.lsn);
return status;
return Status::OK();
}
Status
......@@ -75,10 +64,9 @@ BuildOperation::CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_TYPE segm
Status
BuildOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created) {
auto status =
CheckStale(std::bind(&BuildOperation::CheckSegmentStale, this, std::placeholders::_1, context.segment_id));
if (!status.ok())
return status;
STATUS_CHECK(
CheckStale(std::bind(&BuildOperation::CheckSegmentStale, this, std::placeholders::_1, context.segment_id)));
auto segment = GetStartedSS()->GetResource<Segment>(context.segment_id);
if (!segment) {
std::stringstream emsg;
......@@ -88,15 +76,12 @@ BuildOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentF
auto ctx = context;
ctx.partition_id = segment->GetPartitionId();
auto new_sf_op = std::make_shared<SegmentFileOperation>(ctx, GetStartedSS());
status = new_sf_op->Push();
if (!status.ok())
return status;
status = new_sf_op->GetResource(created);
if (!status.ok())
return status;
STATUS_CHECK(new_sf_op->Push());
STATUS_CHECK(new_sf_op->GetResource(created));
context_.new_segment_files.push_back(created);
AddStepWithLsn(*created, context_.lsn);
return status;
return Status::OK();
}
NewSegmentOperation::NewSegmentOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
......@@ -111,13 +96,9 @@ NewSegmentOperation::DoExecute(Store& store) {
/* auto status = PrevSnapshotRequried(); */
/* if (!status.ok()) return status; */
// TODO: Check Context
SegmentCommitOperation op(context_, GetAdjustedSS());
auto status = op(store);
if (!status.ok())
return status;
status = op.GetResource(context_.new_segment_commit);
if (!status.ok())
return status;
SegmentCommitOperation sc_op(context_, GetAdjustedSS());
STATUS_CHECK(sc_op(store));
STATUS_CHECK(sc_op.GetResource(context_.new_segment_commit));
AddStepWithLsn(*context_.new_segment_commit, context_.lsn);
/* std::cout << GetRepr() << " POST_SC_MAP=("; */
/* for (auto id : context_.new_segment_commit->GetMappings()) { */
......@@ -126,14 +107,9 @@ NewSegmentOperation::DoExecute(Store& store) {
/* std::cout << ")" << std::endl; */
OperationContext cc_context;
PartitionCommitOperation pc_op(context_, GetAdjustedSS());
status = pc_op(store);
if (!status.ok())
return status;
status = pc_op.GetResource(cc_context.new_partition_commit);
if (!status.ok())
return status;
STATUS_CHECK(pc_op(store));
STATUS_CHECK(pc_op.GetResource(cc_context.new_partition_commit));
AddStepWithLsn(*cc_context.new_partition_commit, context_.lsn);
context_.new_partition_commit = cc_context.new_partition_commit;
/* std::cout << GetRepr() << " POST_PC_MAP=("; */
......@@ -143,46 +119,34 @@ NewSegmentOperation::DoExecute(Store& store) {
/* std::cout << ")" << std::endl; */
CollectionCommitOperation cc_op(cc_context, GetAdjustedSS());
status = cc_op(store);
if (!status.ok())
return status;
status = cc_op.GetResource(context_.new_collection_commit);
if (!status.ok())
return status;
STATUS_CHECK(cc_op(store));
STATUS_CHECK(cc_op.GetResource(context_.new_collection_commit));
AddStepWithLsn(*context_.new_collection_commit, context_.lsn);
return status;
return Status::OK();
}
Status
NewSegmentOperation::CommitNewSegment(SegmentPtr& created) {
auto op = std::make_shared<SegmentOperation>(context_, GetStartedSS());
auto status = op->Push();
if (!status.ok())
return status;
status = op->GetResource(context_.new_segment);
if (!status.ok())
return status;
STATUS_CHECK(op->Push());
STATUS_CHECK(op->GetResource(context_.new_segment));
created = context_.new_segment;
AddStepWithLsn(*created, context_.lsn);
return status;
return Status::OK();
}
Status
NewSegmentOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created) {
auto c = context;
c.segment_id = context_.new_segment->GetID();
c.partition_id = context_.new_segment->GetPartitionId();
auto new_sf_op = std::make_shared<SegmentFileOperation>(c, GetStartedSS());
auto status = new_sf_op->Push();
if (!status.ok())
return status;
status = new_sf_op->GetResource(created);
if (!status.ok())
return status;
auto ctx = context;
ctx.segment_id = context_.new_segment->GetID();
ctx.partition_id = context_.new_segment->GetPartitionId();
auto new_sf_op = std::make_shared<SegmentFileOperation>(ctx, GetStartedSS());
STATUS_CHECK(new_sf_op->Push());
STATUS_CHECK(new_sf_op->GetResource(created));
AddStepWithLsn(*created, context_.lsn);
context_.new_segment_files.push_back(created);
return status;
return Status::OK();
}
MergeOperation::MergeOperation(const OperationContext& context, ScopedSnapshotT prev_ss) : BaseT(context, prev_ss) {
......@@ -204,58 +168,42 @@ MergeOperation::OnSnapshotStale() {
Status
MergeOperation::CommitNewSegment(SegmentPtr& created) {
Status status;
if (context_.new_segment) {
created = context_.new_segment;
return status;
return Status::OK();
}
auto op = std::make_shared<SegmentOperation>(context_, GetStartedSS());
status = op->Push();
if (!status.ok())
return status;
status = op->GetResource(context_.new_segment);
if (!status.ok())
return status;
STATUS_CHECK(op->Push());
STATUS_CHECK(op->GetResource(context_.new_segment));
created = context_.new_segment;
AddStepWithLsn(*created, context_.lsn);
return status;
return Status::OK();
}
Status
MergeOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created) {
// PXU TODO: Check element type and segment file mapping rules
SegmentPtr new_segment;
auto status = CommitNewSegment(new_segment);
if (!status.ok())
return status;
auto c = context;
c.segment_id = new_segment->GetID();
c.partition_id = new_segment->GetPartitionId();
auto new_sf_op = std::make_shared<SegmentFileOperation>(c, GetStartedSS());
status = new_sf_op->Push();
if (!status.ok())
return status;
status = new_sf_op->GetResource(created);
if (!status.ok())
return status;
STATUS_CHECK(CommitNewSegment(new_segment));
auto ctx = context;
ctx.segment_id = new_segment->GetID();
ctx.partition_id = new_segment->GetPartitionId();
auto new_sf_op = std::make_shared<SegmentFileOperation>(ctx, GetStartedSS());
STATUS_CHECK(new_sf_op->Push());
STATUS_CHECK(new_sf_op->GetResource(created));
context_.new_segment_files.push_back(created);
AddStepWithLsn(*created, context_.lsn);
return status;
return Status::OK();
}
Status
MergeOperation::DoExecute(Store& store) {
// PXU TODO:
// 1. Check all requried field elements have related segment files
// 1. Check all required field elements have related segment files
// 2. Check Stale and others
SegmentCommitOperation op(context_, GetAdjustedSS());
auto status = op(store);
if (!status.ok())
return status;
status = op.GetResource(context_.new_segment_commit);
if (!status.ok())
return status;
SegmentCommitOperation sc_op(context_, GetAdjustedSS());
STATUS_CHECK(sc_op(store));
STATUS_CHECK(sc_op.GetResource(context_.new_segment_commit));
AddStepWithLsn(*context_.new_segment_commit, context_.lsn);
/* std::cout << GetRepr() << " POST_SC_MAP=("; */
/* for (auto id : context_.new_segment_commit->GetMappings()) { */
......@@ -264,14 +212,9 @@ MergeOperation::DoExecute(Store& store) {
/* std::cout << ")" << std::endl; */
PartitionCommitOperation pc_op(context_, GetAdjustedSS());
status = pc_op(store);
if (!status.ok())
return status;
STATUS_CHECK(pc_op(store));
OperationContext cc_context;
status = pc_op.GetResource(cc_context.new_partition_commit);
if (!status.ok())
return status;
STATUS_CHECK(pc_op.GetResource(cc_context.new_partition_commit));
AddStepWithLsn(*cc_context.new_partition_commit, context_.lsn);
context_.new_partition_commit = cc_context.new_partition_commit;
......@@ -282,15 +225,11 @@ MergeOperation::DoExecute(Store& store) {
/* std::cout << ")" << std::endl; */
CollectionCommitOperation cc_op(cc_context, GetAdjustedSS());
status = cc_op(store);
if (!status.ok())
return status;
status = cc_op.GetResource(context_.new_collection_commit);
if (!status.ok())
return status;
STATUS_CHECK(cc_op(store));
STATUS_CHECK(cc_op.GetResource(context_.new_collection_commit));
AddStepWithLsn(*context_.new_collection_commit, context_.lsn);
return status;
return Status::OK();
}
GetSnapshotIDsOperation::GetSnapshotIDsOperation(ID_TYPE collection_id, bool reversed)
......@@ -345,15 +284,12 @@ DropPartitionOperation::GetRepr() const {
Status
DropPartitionOperation::DoExecute(Store& store) {
Status status;
PartitionPtr p;
auto id = c_context_.id;
if (id == 0) {
status = GetAdjustedSS()->GetPartitionId(c_context_.name, id);
STATUS_CHECK(GetAdjustedSS()->GetPartitionId(c_context_.name, id));
c_context_.id = id;
}
if (!status.ok())
return status;
auto p_c = GetAdjustedSS()->GetPartitionCommitByPartitionId(id);
if (!p_c) {
std::stringstream emsg;
......@@ -364,16 +300,11 @@ DropPartitionOperation::DoExecute(Store& store) {
OperationContext op_ctx;
op_ctx.stale_partition_commit = p_c;
auto op = CollectionCommitOperation(op_ctx, GetAdjustedSS());
status = op(store);
if (!status.ok())
return status;
status = op.GetResource(context_.new_collection_commit);
if (!status.ok())
return status;
auto cc_op = CollectionCommitOperation(op_ctx, GetAdjustedSS());
STATUS_CHECK(cc_op(store));
STATUS_CHECK(cc_op.GetResource(context_.new_collection_commit));
AddStepWithLsn(*context_.new_collection_commit, c_context_.lsn);
return status;
return Status::OK();
}
CreatePartitionOperation::CreatePartitionOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
......@@ -382,39 +313,28 @@ CreatePartitionOperation::CreatePartitionOperation(const OperationContext& conte
Status
CreatePartitionOperation::PreCheck() {
Status status = BaseT::PreCheck();
if (!status.ok()) {
return status;
}
STATUS_CHECK(BaseT::PreCheck());
if (!context_.new_partition) {
std::stringstream emsg;
emsg << GetRepr() << ". Partition is missing";
status = Status(SS_INVALID_CONTEX_ERROR, emsg.str());
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
return status;
return Status::OK();
}
Status
CreatePartitionOperation::CommitNewPartition(const PartitionContext& context, PartitionPtr& partition) {
Status status;
auto op = std::make_shared<PartitionOperation>(context, GetStartedSS());
status = op->Push();
if (!status.ok())
return status;
status = op->GetResource(partition);
if (!status.ok())
return status;
STATUS_CHECK(op->Push());
STATUS_CHECK(op->GetResource(partition));
context_.new_partition = partition;
AddStepWithLsn(*partition, context_.lsn);
return status;
return Status::OK();
}
Status
CreatePartitionOperation::DoExecute(Store& store) {
Status status;
status = CheckStale();
if (!status.ok())
return status;
STATUS_CHECK(CheckStale());
auto collection = GetAdjustedSS()->GetCollection();
auto partition = context_.new_partition;
......@@ -423,28 +343,21 @@ CreatePartitionOperation::DoExecute(Store& store) {
OperationContext pc_context;
pc_context.new_partition = partition;
auto pc_op = PartitionCommitOperation(pc_context, GetAdjustedSS());
status = pc_op(store);
if (!status.ok())
return status;
status = pc_op.GetResource(pc);
if (!status.ok())
return status;
STATUS_CHECK(pc_op(store));
STATUS_CHECK(pc_op.GetResource(pc));
AddStepWithLsn(*pc, context_.lsn);
OperationContext cc_context;
cc_context.new_partition_commit = pc;
context_.new_partition_commit = pc;
auto cc_op = CollectionCommitOperation(cc_context, GetAdjustedSS());
status = cc_op(store);
if (!status.ok())
return status;
STATUS_CHECK(cc_op(store));
CollectionCommitPtr cc;
status = cc_op.GetResource(cc);
if (!status.ok())
return status;
STATUS_CHECK(cc_op.GetResource(cc));
AddStepWithLsn(*cc, context_.lsn);
context_.new_collection_commit = cc;
return status;
return Status::OK();
}
CreateCollectionOperation::CreateCollectionOperation(const CreateCollectionContext& context)
......@@ -536,12 +449,8 @@ CreateCollectionOperation::DoExecute(Store& store) {
Status
CreateCollectionOperation::GetSnapshot(ScopedSnapshotT& ss) const {
auto status = DoneRequired();
if (!status.ok())
return status;
status = IDSNotEmptyRequried();
if (!status.ok())
return status;
STATUS_CHECK(CheckDone());
STATUS_CHECK(CheckIDSNotEmpty());
if (!c_context_.collection_commit) {
std::stringstream emsg;
emsg << GetRepr() << ". No snapshot is available";
......@@ -549,7 +458,7 @@ CreateCollectionOperation::GetSnapshot(ScopedSnapshotT& ss) const {
}
/* status = Snapshots::GetInstance().GetSnapshot(ss, c_context_.collection_commit->GetCollectionId()); */
ss = context_.latest_ss;
return status;
return Status::OK();
}
Status
......
......@@ -42,7 +42,7 @@ struct SegmentFileContext {
struct LoadOperationContext {
ID_TYPE id = 0;
State status = INVALID;
State state = INVALID;
std::string name;
};
......
......@@ -42,9 +42,11 @@ class ResourceGCEvent : public Event {
Status
Process() override {
auto& store = Store::GetInstance();
/* mark resource as 'deleted' in meta */
auto sd_op = std::make_shared<SoftDeleteOperation<ResourceT>>(res_->GetID());
STATUS_CHECK(sd_op->Push());
STATUS_CHECK((*sd_op)(store));
/* TODO: physically clean resource */
std::vector<std::string> res_file_list;
......@@ -62,7 +64,7 @@ class ResourceGCEvent : public Event {
/* remove resource from meta */
auto hd_op = std::make_shared<HardDeleteOperation<ResourceT>>(res_->GetID());
STATUS_CHECK(hd_op->Push());
STATUS_CHECK((*hd_op)(store));
return Status::OK();
}
......
......@@ -74,9 +74,7 @@ Operations::GetID() const {
Status
Operations::operator()(Store& store) {
auto status = PreCheck();
if (!status.ok())
return status;
STATUS_CHECK(PreCheck());
return ApplyToStore(store);
}
......@@ -114,9 +112,7 @@ Operations::PreCheck() {
Status
Operations::Push(bool sync) {
auto status = PreCheck();
if (!status.ok())
return status;
STATUS_CHECK(PreCheck());
return OperationExecutor::GetInstance().Submit(shared_from_this(), sync);
}
......@@ -128,66 +124,55 @@ Operations::DoCheckStale(ScopedSnapshotT& latest_snapshot) const {
Status
Operations::CheckStale(const CheckStaleFunc& checker) const {
decltype(prev_ss_) latest_ss;
auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, prev_ss_->GetCollection()->GetID());
if (!status.ok())
return status;
STATUS_CHECK(Snapshots::GetInstance().GetSnapshot(latest_ss, prev_ss_->GetCollection()->GetID()));
if (prev_ss_->GetID() != latest_ss->GetID()) {
if (checker) {
status = checker(latest_ss);
STATUS_CHECK(checker(latest_ss));
} else {
status = DoCheckStale(latest_ss);
STATUS_CHECK(DoCheckStale(latest_ss));
}
}
return status;
return Status::OK();
}
Status
Operations::DoneRequired() const {
Status status;
Operations::CheckDone() const {
if (!done_) {
std::stringstream emsg;
emsg << GetRepr() << ". Should be done";
status = Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str());
return Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str());
}
return status;
return Status::OK();
}
Status
Operations::IDSNotEmptyRequried() const {
Status status;
Operations::CheckIDSNotEmpty() const {
if (ids_.size() == 0) {
std::stringstream emsg;
emsg << GetRepr() << ". No rsource is available";
status = Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str());
emsg << GetRepr() << ". No resource available";
return Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str());
}
return status;
return Status::OK();
}
Status
Operations::PrevSnapshotRequried() const {
Status status;
Operations::CheckPrevSnapshot() const {
if (!prev_ss_) {
std::stringstream emsg;
emsg << GetRepr() << ". Prev snapshot is requried";
status = Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str());
emsg << GetRepr() << ". Previous snapshot required";
return Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str());
}
return status;
return Status::OK();
}
Status
Operations::GetSnapshot(ScopedSnapshotT& ss) const {
auto status = PrevSnapshotRequried();
if (!status.ok())
return status;
status = DoneRequired();
if (!status.ok())
return status;
status = IDSNotEmptyRequried();
if (!status.ok())
return status;
STATUS_CHECK(CheckPrevSnapshot());
STATUS_CHECK(CheckDone());
STATUS_CHECK(CheckIDSNotEmpty());
/* status = Snapshots::GetInstance().GetSnapshot(ss, prev_ss_->GetCollectionId(), ids_.back()); */
ss = context_.latest_ss;
return status;
return Status::OK();
}
const Status&
......@@ -219,29 +204,23 @@ Operations::OnSnapshotStale() {
Status
Operations::OnExecute(Store& store) {
auto status = PreExecute(store);
if (!status.ok()) {
return status;
}
status = DoExecute(store);
if (!status.ok()) {
return status;
}
return PostExecute(store);
STATUS_CHECK(PreExecute(store));
STATUS_CHECK(DoExecute(store));
STATUS_CHECK(PostExecute(store));
return Status::OK();
}
Status
Operations::PreExecute(Store& store) {
Status status;
if (GetStartedSS() && type_ == OperationsType::W_Compound) {
Snapshots::GetInstance().GetSnapshot(context_.prev_ss, GetStartedSS()->GetCollectionId());
STATUS_CHECK(Snapshots::GetInstance().GetSnapshot(context_.prev_ss, GetStartedSS()->GetCollectionId()));
if (!context_.prev_ss) {
status = OnSnapshotDropped();
STATUS_CHECK(OnSnapshotDropped());
} else if (prev_ss_->GetID() != context_.prev_ss->GetID()) {
status = OnSnapshotStale();
STATUS_CHECK(OnSnapshotStale());
}
}
return status;
return Status::OK();
}
Status
......
......@@ -152,11 +152,11 @@ class Operations : public std::enable_shared_from_this<Operations> {
FailureString() const;
Status
DoneRequired() const;
CheckDone() const;
Status
IDSNotEmptyRequried() const;
CheckIDSNotEmpty() const;
Status
PrevSnapshotRequried() const;
CheckPrevSnapshot() const;
Status
ApplyRollBack(Store&);
......@@ -212,20 +212,16 @@ class CommitOperation : public Operations {
if (wait) {
WaitToFinish();
}
auto status = DoneRequired();
if (!status.ok())
return status;
status = IDSNotEmptyRequried();
if (!status.ok())
return status;
STATUS_CHECK(CheckDone());
STATUS_CHECK(CheckIDSNotEmpty());
resource_->SetID(ids_[0]);
res = resource_;
return status;
return Status::OK();
}
protected:
Status
ResourceNotNullRequired() const {
CheckResource() const {
Status status;
if (!resource_)
return Status(SS_CONSTRAINT_CHECK_ERROR, "No specified resource");
......@@ -261,19 +257,15 @@ class LoadOperation : public Operations {
if (wait) {
WaitToFinish();
}
auto status = DoneRequired();
if (!status.ok())
return status;
status = ResourceNotNullRequired();
if (!status.ok())
return status;
STATUS_CHECK(CheckDone());
STATUS_CHECK(CheckResource());
res = resource_;
return status;
return Status::OK();
}
protected:
Status
ResourceNotNullRequired() const {
CheckResource() const {
Status status;
if (!resource_)
return Status(SS_CONSTRAINT_CHECK_ERROR, "No specified resource");
......@@ -298,14 +290,10 @@ class SoftDeleteOperation : public Operations {
if (wait) {
WaitToFinish();
}
auto status = DoneRequired();
if (!status.ok())
return status;
status = IDSNotEmptyRequried();
if (!status.ok())
return status;
STATUS_CHECK(CheckDone());
STATUS_CHECK(CheckIDSNotEmpty());
res = resource_;
return status;
return Status::OK();
}
Status
......
......@@ -17,6 +17,8 @@
#include <mutex>
#include <string>
#include <thread>
#include "db/snapshot/Event.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/Operations.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/ScopedResource.h"
......@@ -140,7 +142,8 @@ class ResourceHolder {
virtual void
OnNoRefCallBack(ResourcePtr resource) {
HardDelete(resource->GetID());
auto evt_ptr = std::make_shared<ResourceGCEvent<ResourceT>>(resource);
EventExecutor::GetInstance().Submit(evt_ptr);
Release(resource->GetID());
}
......
......@@ -130,14 +130,12 @@ class LoadOperation<Collection> : public Operations {
if (wait) {
WaitToFinish();
}
auto status = DoneRequired();
if (!status.ok())
return status;
STATUS_CHECK(CheckDone());
if (!resource_) {
return Status(SS_NOT_FOUND_ERROR, "No specified resource");
}
res = resource_;
return status;
return Status::OK();
}
protected:
......
......@@ -16,47 +16,47 @@
namespace milvus::engine::snapshot {
Collection::Collection(const std::string& name, ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on,
Collection::Collection(const std::string& name, ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on,
TS_TYPE updated_on)
: NameField(name),
IdField(id),
LsnField(lsn),
StatusField(status),
StateField(state),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
CollectionCommit::CollectionCommit(ID_TYPE collection_id, ID_TYPE schema_id, const MappingT& mappings, ID_TYPE id,
LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on)
LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
SchemaIdField(schema_id),
MappingsField(mappings),
IdField(id),
LsnField(lsn),
StatusField(status),
StateField(state),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
Partition::Partition(const std::string& name, ID_TYPE collection_id, ID_TYPE id, LSN_TYPE lsn, State status,
Partition::Partition(const std::string& name, ID_TYPE collection_id, ID_TYPE id, LSN_TYPE lsn, State state,
TS_TYPE created_on, TS_TYPE updated_on)
: NameField(name),
CollectionIdField(collection_id),
IdField(id),
LsnField(lsn),
StatusField(status),
StateField(state),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
PartitionCommit::PartitionCommit(ID_TYPE collection_id, ID_TYPE partition_id, const MappingT& mappings, ID_TYPE id,
LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on)
LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
PartitionIdField(partition_id),
MappingsField(mappings),
IdField(id),
LsnField(lsn),
StatusField(status),
StateField(state),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
......@@ -70,18 +70,18 @@ PartitionCommit::ToString() const {
for (auto sc_id : GetMappings()) {
ss << sc_id << ", ";
}
ss << ") status=" << GetStatus() << " ";
ss << ") state=" << GetState() << " ";
return ss.str();
}
Segment::Segment(ID_TYPE collection_id, ID_TYPE partition_id, ID_TYPE num, ID_TYPE id, LSN_TYPE lsn, State status,
Segment::Segment(ID_TYPE collection_id, ID_TYPE partition_id, ID_TYPE num, ID_TYPE id, LSN_TYPE lsn, State state,
TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
PartitionIdField(partition_id),
NumField(num),
IdField(id),
LsnField(lsn),
StatusField(status),
StateField(state),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
......@@ -94,19 +94,19 @@ Segment::ToString() const {
ss << "partition_id=" << GetPartitionId() << ", ";
ss << "collection_id=" << GetCollectionId() << ", ";
ss << "num=" << (NUM_TYPE)GetNum() << ", ";
ss << "status=" << GetStatus() << ", ";
ss << "state=" << GetState() << ", ";
return ss.str();
}
SegmentCommit::SegmentCommit(ID_TYPE schema_id, ID_TYPE partition_id, ID_TYPE segment_id, const MappingT& mappings,
ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on)
ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on)
: SchemaIdField(schema_id),
PartitionIdField(partition_id),
SegmentIdField(segment_id),
MappingsField(mappings),
IdField(id),
LsnField(lsn),
StatusField(status),
StateField(state),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
......@@ -118,66 +118,66 @@ SegmentCommit::ToString() const {
ss << "id=" << GetID() << ", ";
ss << "partition_id=" << GetPartitionId() << ", ";
ss << "segment_id=" << GetSegmentId() << ", ";
ss << "status=" << GetStatus() << ", ";
ss << "state=" << GetState() << ", ";
return ss.str();
}
SegmentFile::SegmentFile(ID_TYPE collection_id, ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id,
ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on)
ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
PartitionIdField(partition_id),
SegmentIdField(segment_id),
FieldElementIdField(field_element_id),
IdField(id),
LsnField(lsn),
StatusField(status),
StateField(state),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
SchemaCommit::SchemaCommit(ID_TYPE collection_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn, State status,
SchemaCommit::SchemaCommit(ID_TYPE collection_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn, State state,
TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
MappingsField(mappings),
IdField(id),
LsnField(lsn),
StatusField(status),
StateField(state),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
Field::Field(const std::string& name, NUM_TYPE num, ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on,
Field::Field(const std::string& name, NUM_TYPE num, ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on,
TS_TYPE updated_on)
: NameField(name),
NumField(num),
IdField(id),
LsnField(lsn),
StatusField(status),
StateField(state),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
FieldCommit::FieldCommit(ID_TYPE collection_id, ID_TYPE field_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn,
State status, TS_TYPE created_on, TS_TYPE updated_on)
State state, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
FieldIdField(field_id),
MappingsField(mappings),
IdField(id),
LsnField(lsn),
StatusField(status),
StateField(state),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
FieldElement::FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype,
ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on)
ID_TYPE id, LSN_TYPE lsn, State state, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
FieldIdField(field_id),
NameField(name),
FtypeField(ftype),
IdField(id),
LsnField(lsn),
StatusField(status),
StateField(state),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
......
......@@ -46,46 +46,46 @@ class MappingsField {
MappingT mappings_;
};
class StatusField {
class StateField {
public:
explicit StatusField(State status = PENDING) : status_(status) {
explicit StateField(State state = PENDING) : state_(state) {
}
State
GetStatus() const {
return status_;
GetState() const {
return state_;
}
bool
IsActive() const {
return status_ == ACTIVE;
return state_ == ACTIVE;
}
bool
IsDeactive() const {
return status_ == DEACTIVE;
return state_ == DEACTIVE;
}
bool
Activate() {
if (IsDeactive())
return false;
status_ = ACTIVE;
state_ = ACTIVE;
return true;
}
void
Deactivate() {
status_ = DEACTIVE;
state_ = DEACTIVE;
}
void
ResetStatus() {
status_ = PENDING;
state_ = PENDING;
}
protected:
State status_;
State state_;
};
class LsnField {
......@@ -297,7 +297,7 @@ class Collection : public BaseResource,
public NameField,
public IdField,
public LsnField,
public StatusField,
public StateField,
public CreatedOnField,
public UpdatedOnField {
public:
......@@ -319,7 +319,7 @@ class CollectionCommit : public BaseResource,
public MappingsField,
public IdField,
public LsnField,
public StatusField,
public StateField,
public CreatedOnField,
public UpdatedOnField {
public:
......@@ -342,7 +342,7 @@ class Partition : public BaseResource,
public CollectionIdField,
public IdField,
public LsnField,
public StatusField,
public StateField,
public CreatedOnField,
public UpdatedOnField {
public:
......@@ -364,7 +364,7 @@ class PartitionCommit : public BaseResource,
public MappingsField,
public IdField,
public LsnField,
public StatusField,
public StateField,
public CreatedOnField,
public UpdatedOnField {
public:
......@@ -392,7 +392,7 @@ class Segment : public BaseResource,
public NumField,
public IdField,
public LsnField,
public StatusField,
public StateField,
public CreatedOnField,
public UpdatedOnField {
public:
......@@ -419,7 +419,7 @@ class SegmentCommit : public BaseResource,
public MappingsField,
public IdField,
public LsnField,
public StatusField,
public StateField,
public CreatedOnField,
public UpdatedOnField {
public:
......@@ -448,7 +448,7 @@ class SegmentFile : public BaseResource,
public FieldElementIdField,
public IdField,
public LsnField,
public StatusField,
public StateField,
public CreatedOnField,
public UpdatedOnField {
public:
......@@ -472,7 +472,7 @@ class SchemaCommit : public BaseResource,
public MappingsField,
public IdField,
public LsnField,
public StatusField,
public StateField,
public CreatedOnField,
public UpdatedOnField {
public:
......@@ -496,7 +496,7 @@ class Field : public BaseResource,
public NumField,
public IdField,
public LsnField,
public StatusField,
public StateField,
public CreatedOnField,
public UpdatedOnField {
public:
......@@ -518,7 +518,7 @@ class FieldCommit : public BaseResource,
public MappingsField,
public IdField,
public LsnField,
public StatusField,
public StateField,
public CreatedOnField,
public UpdatedOnField {
public:
......@@ -544,7 +544,7 @@ class FieldElement : public BaseResource,
public FtypeField,
public IdField,
public LsnField,
public StatusField,
public StateField,
public CreatedOnField,
public UpdatedOnField {
public:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册