未验证 提交 295105ff 编写于 作者: X XuPeng-SH 提交者: GitHub

Snapshot code update (#2497)

* (db/snapshot): Add LSN in Resource
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add AddStepWithLsn
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): Add Lsn related logic
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add operator << for Operation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint errors
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add more test
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): operation rollback api placeholder
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): remove hard code status code
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): support more printable dump for some operations
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add ToString for Context
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): more friendly context print
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): rename some apis
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>
上级 530ee76a
......@@ -19,35 +19,40 @@ namespace milvus {
namespace engine {
namespace snapshot {
BuildOperation::BuildOperation(const OperationContext& context, ScopedSnapshotT prev_ss) : BaseT(context, prev_ss) {
CompoundBaseOperation::CompoundBaseOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
: BaseT(context, prev_ss, OperationsType::W_Compound) {
}
BuildOperation::BuildOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id)
: BaseT(context, collection_id, commit_id) {
CompoundBaseOperation::CompoundBaseOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id)
: BaseT(context, collection_id, commit_id, OperationsType::W_Compound) {
}
std::string
BuildOperation::OperationRepr() const {
CompoundBaseOperation::GetRepr() const {
std::stringstream ss;
ss << "<BO(SS=" << prev_ss_->GetID() << ",SEG=";
if (context_.new_segment_files.size() == 0) {
ss << "?";
} else {
ss << context_.new_segment_files[0]->GetSegmentId();
ss << ",NSF=[";
bool first = true;
for (auto& f : context_.new_segment_files) {
if (!first) {
ss << ",";
}
ss << f->GetID();
first = false;
}
ss << "]";
ss << "<" << GetName() << "(";
if (prev_ss_) {
ss << "SS=" << prev_ss_ << GetID();
}
ss << "," << context_.ToString();
ss << ",LSN=" << GetContextLsn();
ss << ")>";
return ss.str();
}
Status
CompoundBaseOperation::PreCheck() {
if (GetContextLsn() <= prev_ss_->GetMaxLsn()) {
return Status(SS_INVALID_CONTEX_ERROR, "Invalid LSN found in operation");
}
return Status::OK();
}
BuildOperation::BuildOperation(const OperationContext& context, ScopedSnapshotT prev_ss) : BaseT(context, prev_ss) {
}
BuildOperation::BuildOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id)
: BaseT(context, collection_id, commit_id) {
}
Status
BuildOperation::DoExecute(Store& store) {
auto status = CheckStale(std::bind(&BuildOperation::CheckSegmentStale, this, std::placeholders::_1,
......@@ -60,30 +65,30 @@ BuildOperation::DoExecute(Store& store) {
status = op.GetResource(context_.new_segment_commit);
if (!status.ok())
return status;
AddStep(*context_.new_segment_commit);
AddStepWithLsn(*context_.new_segment_commit, context_.lsn);
PartitionCommitOperation pc_op(context_, prev_ss_);
pc_op(store);
OperationContext cc_context;
status = pc_op.GetResource(cc_context.new_partition_commit);
if (!status.ok())
return status;
AddStep(*cc_context.new_partition_commit);
AddStepWithLsn(*cc_context.new_partition_commit, context_.lsn);
context_.new_partition_commit = cc_context.new_partition_commit;
PartitionCommitPtr pc;
status = pc_op.GetResource(pc);
status = pc_op.GetResource(context_.new_partition_commit);
if (!status.ok())
return status;
AddStep(*pc);
AddStepWithLsn(*context_.new_partition_commit, context_.lsn);
CollectionCommitOperation cc_op(cc_context, prev_ss_);
cc_op(store);
CollectionCommitPtr cc;
status = cc_op.GetResource(cc);
status = cc_op.GetResource(context_.new_collection_commit);
if (!status.ok())
return status;
AddStep(*cc);
AddStepWithLsn(*context_.new_collection_commit, context_.lsn);
return status;
}
......@@ -111,7 +116,7 @@ BuildOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentF
if (!status.ok())
return status;
context_.new_segment_files.push_back(created);
AddStep(*created);
AddStepWithLsn(*created, context_.lsn);
return status;
}
......@@ -137,7 +142,7 @@ NewSegmentOperation::DoExecute(Store& store) {
status = op.GetResource(context_.new_segment_commit);
if (!status.ok())
return status;
AddStep(*context_.new_segment_commit);
AddStepWithLsn(*context_.new_segment_commit, context_.lsn);
OperationContext cc_context;
......@@ -148,17 +153,17 @@ NewSegmentOperation::DoExecute(Store& store) {
status = pc_op.GetResource(cc_context.new_partition_commit);
if (!status.ok())
return status;
AddStep(*cc_context.new_partition_commit);
AddStepWithLsn(*cc_context.new_partition_commit, context_.lsn);
context_.new_partition_commit = cc_context.new_partition_commit;
CollectionCommitOperation cc_op(cc_context, prev_ss_);
status = cc_op(store);
if (!status.ok())
return status;
CollectionCommitPtr cc;
status = cc_op.GetResource(cc);
status = cc_op.GetResource(context_.new_collection_commit);
if (!status.ok())
return status;
AddStep(*cc);
AddStepWithLsn(*context_.new_collection_commit, context_.lsn);
return status;
}
......@@ -173,7 +178,7 @@ NewSegmentOperation::CommitNewSegment(SegmentPtr& created) {
if (!status.ok())
return status;
created = context_.new_segment;
AddStep(*created);
AddStepWithLsn(*created, context_.lsn);
return status;
}
......@@ -189,7 +194,7 @@ NewSegmentOperation::CommitNewSegmentFile(const SegmentFileContext& context, Seg
status = new_sf_op->GetResource(created);
if (!status.ok())
return status;
AddStep(*created);
AddStepWithLsn(*created, context_.lsn);
context_.new_segment_files.push_back(created);
return status;
}
......@@ -200,44 +205,6 @@ MergeOperation::MergeOperation(const OperationContext& context, ID_TYPE collecti
: BaseT(context, collection_id, commit_id) {
}
std::string
MergeOperation::OperationRepr() const {
std::stringstream ss;
ss << "<MO(SS=" << prev_ss_->GetID() << ",SSEG=[";
{
bool first = true;
for (auto& r : context_.stale_segments) {
if (!first) {
ss << ",";
}
ss << r->GetID();
first = false;
}
}
ss << "]";
ss << ",NSEG=";
if (context_.new_segment) {
ss << context_.new_segment->GetID();
} else {
ss << "?";
}
ss << ",NSF=[";
{
bool first = true;
for (auto& f : context_.new_segment_files) {
if (!first) {
ss << ",";
}
ss << f->GetID();
first = false;
}
}
ss << "]";
ss << ")>";
return ss.str();
}
Status
MergeOperation::CommitNewSegment(SegmentPtr& created) {
Status status;
......@@ -253,7 +220,7 @@ MergeOperation::CommitNewSegment(SegmentPtr& created) {
if (!status.ok())
return status;
created = context_.new_segment;
AddStep(*created);
AddStepWithLsn(*created, context_.lsn);
return status;
}
......@@ -275,7 +242,7 @@ MergeOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentF
if (!status.ok())
return status;
context_.new_segment_files.push_back(created);
AddStep(*created);
AddStepWithLsn(*created, context_.lsn);
return status;
}
......@@ -292,7 +259,7 @@ MergeOperation::DoExecute(Store& store) {
status = op.GetResource(context_.new_segment_commit);
if (!status.ok())
return status;
AddStep(*context_.new_segment_commit);
AddStepWithLsn(*context_.new_segment_commit, context_.lsn);
PartitionCommitOperation pc_op(context_, prev_ss_);
status = pc_op(store);
......@@ -303,23 +270,25 @@ MergeOperation::DoExecute(Store& store) {
status = pc_op.GetResource(cc_context.new_partition_commit);
if (!status.ok())
return status;
AddStep(*cc_context.new_partition_commit);
AddStepWithLsn(*cc_context.new_partition_commit, context_.lsn);
context_.new_partition_commit = cc_context.new_partition_commit;
CollectionCommitOperation cc_op(cc_context, prev_ss_);
status = cc_op(store);
if (!status.ok())
return status;
CollectionCommitPtr cc;
status = cc_op.GetResource(cc);
status = cc_op.GetResource(context_.new_collection_commit);
if (!status.ok())
return status;
AddStep(*cc);
AddStepWithLsn(*context_.new_collection_commit, context_.lsn);
return status;
}
GetSnapshotIDsOperation::GetSnapshotIDsOperation(ID_TYPE collection_id, bool reversed)
: BaseT(OperationContext(), ScopedSnapshotT()), collection_id_(collection_id), reversed_(reversed) {
: BaseT(OperationContext(), ScopedSnapshotT(), OperationsType::O_Compound),
collection_id_(collection_id),
reversed_(reversed) {
}
Status
......@@ -349,23 +318,35 @@ GetCollectionIDsOperation::GetIDs() const {
}
DropPartitionOperation::DropPartitionOperation(const PartitionContext& context, ScopedSnapshotT prev_ss)
: BaseT(OperationContext(), prev_ss), context_(context) {
: BaseT(OperationContext(), prev_ss), c_context_(context) {
}
std::string
DropPartitionOperation::GetRepr() const {
std::stringstream ss;
ss << "<DPO(SS=" << prev_ss_->GetID();
ss << "," << c_context_.ToString();
ss << "," << context_.ToString();
ss << ",LSN=" << GetContextLsn();
ss << ")>";
return ss.str();
}
Status
DropPartitionOperation::DoExecute(Store& store) {
Status status;
PartitionPtr p;
auto id = context_.id;
auto id = c_context_.id;
if (id == 0) {
status = prev_ss_->GetPartitionId(context_.name, id);
context_.id = id;
status = prev_ss_->GetPartitionId(c_context_.name, id);
c_context_.id = id;
}
if (!status.ok())
return status;
auto p_c = prev_ss_->GetPartitionCommitByPartitionId(id);
if (!p_c)
return Status(SS_NOT_FOUND_ERROR, "No partition commit found");
context_.stale_partition_commit = p_c;
OperationContext op_ctx;
op_ctx.stale_partition_commit = p_c;
......@@ -373,12 +354,11 @@ DropPartitionOperation::DoExecute(Store& store) {
status = op(store);
if (!status.ok())
return status;
CollectionCommitPtr cc;
status = op.GetResource(cc);
status = op.GetResource(context_.new_collection_commit);
if (!status.ok())
return status;
AddStep(*cc);
AddStepWithLsn(*context_.new_collection_commit, c_context_.lsn);
return status;
}
......@@ -392,7 +372,10 @@ CreatePartitionOperation::CreatePartitionOperation(const OperationContext& conte
Status
CreatePartitionOperation::PreCheck() {
Status status;
Status status = BaseT::PreCheck();
if (!status.ok()) {
return status;
}
if (!context_.new_partition) {
status = Status(SS_INVALID_CONTEX_ERROR, "No partition specified before push partition");
}
......@@ -410,7 +393,7 @@ CreatePartitionOperation::CommitNewPartition(const PartitionContext& context, Pa
if (!status.ok())
return status;
context_.new_partition = partition;
AddStep(*partition);
AddStepWithLsn(*partition, context_.lsn);
return status;
}
......@@ -434,10 +417,10 @@ CreatePartitionOperation::DoExecute(Store& store) {
status = pc_op.GetResource(pc);
if (!status.ok())
return status;
/* status = store.CreateResource<PartitionCommit>(PartitionCommit(collection->GetID(), partition->GetID()), pc); */
AddStep(*pc);
AddStepWithLsn(*pc, context_.lsn);
OperationContext cc_context;
cc_context.new_partition_commit = pc;
context_.new_partition_commit = pc;
auto cc_op = CollectionCommitOperation(cc_context, prev_ss_);
status = cc_op(store);
if (!status.ok())
......@@ -446,39 +429,58 @@ CreatePartitionOperation::DoExecute(Store& store) {
status = cc_op.GetResource(cc);
if (!status.ok())
return status;
AddStep(*cc);
AddStepWithLsn(*cc, context_.lsn);
context_.new_collection_commit = cc;
return status;
}
CreateCollectionOperation::CreateCollectionOperation(const CreateCollectionContext& context)
: BaseT(OperationContext(), ScopedSnapshotT()), context_(context) {
: BaseT(OperationContext(), ScopedSnapshotT()), c_context_(context) {
}
Status
CreateCollectionOperation::PreCheck() {
// TODO
return Status::OK();
}
std::string
CreateCollectionOperation::GetRepr() const {
std::stringstream ss;
ss << "<CCO(";
ss << c_context_.ToString();
ss << "," << context_.ToString();
ss << ",LSN=" << GetContextLsn();
ss << ")>";
return ss.str();
}
Status
CreateCollectionOperation::DoExecute(Store& store) {
// TODO: Do some checks
CollectionPtr collection;
auto status = store.CreateCollection(Collection(context_.collection->GetName()), collection);
auto status = store.CreateCollection(Collection(c_context_.collection->GetName()), collection);
if (!status.ok()) {
std::cerr << status.ToString() << std::endl;
return status;
}
AddStep(*collection);
AddStepWithLsn(*collection, c_context_.lsn);
context_.new_collection = collection;
MappingT field_commit_ids = {};
auto field_idx = 0;
for (auto& field_kv : context_.fields_schema) {
for (auto& field_kv : c_context_.fields_schema) {
field_idx++;
auto& field_schema = field_kv.first;
auto& field_elements = field_kv.second;
FieldPtr field;
status = store.CreateResource<Field>(Field(field_schema->GetName(), field_idx), field);
AddStep(*field);
AddStepWithLsn(*field, c_context_.lsn);
MappingT element_ids = {};
FieldElementPtr raw_element;
status = store.CreateResource<FieldElement>(
FieldElement(collection->GetID(), field->GetID(), "RAW", FieldElementType::RAW), raw_element);
AddStep(*raw_element);
AddStepWithLsn(*raw_element, c_context_.lsn);
element_ids.insert(raw_element->GetID());
for (auto& element_schema : field_elements) {
FieldElementPtr element;
......@@ -486,30 +488,33 @@ CreateCollectionOperation::DoExecute(Store& store) {
store.CreateResource<FieldElement>(FieldElement(collection->GetID(), field->GetID(),
element_schema->GetName(), element_schema->GetFtype()),
element);
AddStep(*element);
AddStepWithLsn(*element, c_context_.lsn);
element_ids.insert(element->GetID());
}
FieldCommitPtr field_commit;
status = store.CreateResource<FieldCommit>(FieldCommit(collection->GetID(), field->GetID(), element_ids),
field_commit);
AddStep(*field_commit);
AddStepWithLsn(*field_commit, c_context_.lsn);
field_commit_ids.insert(field_commit->GetID());
}
SchemaCommitPtr schema_commit;
status = store.CreateResource<SchemaCommit>(SchemaCommit(collection->GetID(), field_commit_ids), schema_commit);
AddStep(*schema_commit);
AddStepWithLsn(*schema_commit, c_context_.lsn);
PartitionPtr partition;
status = store.CreateResource<Partition>(Partition("_default", collection->GetID()), partition);
AddStep(*partition);
AddStepWithLsn(*partition, c_context_.lsn);
context_.new_partition = partition;
PartitionCommitPtr partition_commit;
status = store.CreateResource<PartitionCommit>(PartitionCommit(collection->GetID(), partition->GetID()),
partition_commit);
AddStep(*partition_commit);
AddStepWithLsn(*partition_commit, c_context_.lsn);
context_.new_partition_commit = partition_commit;
CollectionCommitPtr collection_commit;
status = store.CreateResource<CollectionCommit>(
CollectionCommit(collection->GetID(), schema_commit->GetID(), {partition_commit->GetID()}), collection_commit);
AddStep(*collection_commit);
context_.collection_commit = collection_commit;
AddStepWithLsn(*collection_commit, c_context_.lsn);
context_.new_collection_commit = collection_commit;
c_context_.collection_commit = collection_commit;
return Status::OK();
}
......@@ -521,9 +526,9 @@ CreateCollectionOperation::GetSnapshot(ScopedSnapshotT& ss) const {
status = IDSNotEmptyRequried();
if (!status.ok())
return status;
if (!context_.collection_commit)
if (!c_context_.collection_commit)
return Status(SS_CONSTRAINT_CHECK_ERROR, "No Snapshot is available");
status = Snapshots::GetInstance().GetSnapshot(ss, context_.collection_commit->GetCollectionId());
status = Snapshots::GetInstance().GetSnapshot(ss, c_context_.collection_commit->GetCollectionId());
return status;
}
......@@ -533,7 +538,7 @@ SoftDeleteCollectionOperation::DoExecute(Store& store) {
return Status(SS_INVALID_CONTEX_ERROR, "Invalid Context");
}
context_.collection->Deactivate();
AddStep(*context_.collection);
AddStepWithLsn(*context_.collection, context_.lsn);
return Status::OK();
}
......
......@@ -19,10 +19,25 @@ namespace milvus {
namespace engine {
namespace snapshot {
class BuildOperation : public Operations {
class CompoundBaseOperation : public Operations {
public:
using BaseT = Operations;
CompoundBaseOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
CompoundBaseOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0);
std::string
GetRepr() const override;
Status
PreCheck() override;
};
class BuildOperation : public CompoundBaseOperation {
public:
using BaseT = CompoundBaseOperation;
static constexpr const char* Name = "BO";
BuildOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
BuildOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0);
......@@ -33,16 +48,19 @@ class BuildOperation : public Operations {
CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created);
std::string
OperationRepr() const override;
GetName() const override {
return Name;
}
protected:
Status
CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_TYPE segment_id) const;
};
class NewSegmentOperation : public Operations {
class NewSegmentOperation : public CompoundBaseOperation {
public:
using BaseT = Operations;
using BaseT = CompoundBaseOperation;
static constexpr const char* Name = "NSO";
NewSegmentOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
NewSegmentOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0);
......@@ -55,11 +73,17 @@ class NewSegmentOperation : public Operations {
Status
CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created);
std::string
GetName() const override {
return Name;
}
};
class MergeOperation : public Operations {
class MergeOperation : public CompoundBaseOperation {
public:
using BaseT = Operations;
using BaseT = CompoundBaseOperation;
static constexpr const char* Name = "MO";
MergeOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
MergeOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0);
......@@ -73,12 +97,16 @@ class MergeOperation : public Operations {
CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr&);
std::string
OperationRepr() const override;
GetName() const override {
return Name;
}
};
class CreateCollectionOperation : public Operations {
class CreateCollectionOperation : public CompoundBaseOperation {
public:
using BaseT = Operations;
using BaseT = CompoundBaseOperation;
static constexpr const char* Name = "CCO";
explicit CreateCollectionOperation(const CreateCollectionContext& context);
Status
......@@ -87,13 +115,31 @@ class CreateCollectionOperation : public Operations {
Status
GetSnapshot(ScopedSnapshotT& ss) const override;
Status
PreCheck() override;
const LSN_TYPE&
GetContextLsn() const override {
return c_context_.lsn;
}
std::string
GetRepr() const override;
std::string
GetName() const override {
return Name;
}
private:
CreateCollectionContext context_;
CreateCollectionContext c_context_;
};
class CreatePartitionOperation : public Operations {
class CreatePartitionOperation : public CompoundBaseOperation {
public:
using BaseT = Operations;
using BaseT = CompoundBaseOperation;
static constexpr const char* Name = "CPO";
CreatePartitionOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
CreatePartitionOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0);
......@@ -105,18 +151,37 @@ class CreatePartitionOperation : public Operations {
Status
PreCheck() override;
std::string
GetName() const override {
return Name;
}
};
class DropPartitionOperation : public Operations {
class DropPartitionOperation : public CompoundBaseOperation {
public:
using BaseT = Operations;
using BaseT = CompoundBaseOperation;
static constexpr const char* Name = "DPO";
DropPartitionOperation(const PartitionContext& context, ScopedSnapshotT prev_ss);
Status
DoExecute(Store&) override;
const LSN_TYPE&
GetContextLsn() const override {
return c_context_.lsn;
}
std::string
GetRepr() const override;
std::string
GetName() const override {
return Name;
}
protected:
PartitionContext context_;
PartitionContext c_context_;
};
class GetSnapshotIDsOperation : public Operations {
......@@ -154,15 +219,23 @@ class GetCollectionIDsOperation : public Operations {
IDS_TYPE ids_;
};
class SoftDeleteCollectionOperation : public Operations {
class SoftDeleteCollectionOperation : public CompoundBaseOperation {
public:
using BaseT = Operations;
explicit SoftDeleteCollectionOperation(const OperationContext& context) : BaseT(context, ScopedSnapshotT()) {
using BaseT = CompoundBaseOperation;
static constexpr const char* Name = "DCO";
explicit SoftDeleteCollectionOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
: BaseT(context, prev_ss) {
}
Status
DoExecute(Store& store) override;
std::string
GetName() const override {
return Name;
}
private:
ID_TYPE collection_id_;
};
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/snapshot/Context.h"
#include <sstream>
namespace milvus {
namespace engine {
namespace snapshot {
std::string
PartitionContext::ToString() const {
std::stringstream ss;
if (id != 0) {
ss << "PID=" << id;
} else if (name != "") {
ss << "PNAME=\"" << name << "\"";
}
return ss.str();
}
std::string
OperationContext::ToString() const {
std::stringstream ss;
if (new_collection_commit) {
ss << "N_CC=" << new_collection_commit->GetID();
}
if (new_collection) {
ss << ",N_CID=" << new_collection->GetID();
ss << ",N_CNAME=\"" << new_collection->GetName() << "\"";
}
if (stale_partition_commit) {
ss << ",S_PC=" << stale_partition_commit->GetID();
}
if (new_partition_commit) {
ss << ",N_PC=" << new_partition_commit->GetID();
}
if (new_partition) {
ss << ",N_PID=" << new_partition->GetID();
ss << ",N_PNAME=\"" << new_partition->GetName() << "\"";
}
if (new_segment_commit) {
ss << ",N_SC=" << new_segment_commit->GetID();
}
if (new_segment) {
ss << ",N_SE=" << new_segment->GetID();
}
if (stale_segments.size()) {
ss << ",S_SE=[";
bool first = true;
for (auto& f : stale_segments) {
if (!first) {
ss << ",";
}
ss << f->GetID();
first = false;
}
ss << "]";
}
if (stale_segment_file) {
ss << ",S_SF=" << stale_segment_file->GetID();
}
if (new_segment_files.size() > 0) {
ss << ",N_SF=[";
bool first = true;
for (auto& f : new_segment_files) {
if (!first) {
ss << ",";
}
ss << f->GetID();
first = false;
}
ss << "]";
}
return ss.str();
}
std::string
CreateCollectionContext::ToString() const {
std::stringstream ss;
if (collection) {
ss << "CID=" << collection->GetID();
ss << ",CNAME=\"" << collection->GetName() << "\"";
}
return ss.str();
}
} // namespace snapshot
} // namespace engine
} // namespace milvus
......@@ -25,6 +25,10 @@ namespace snapshot {
struct PartitionContext {
std::string name;
ID_TYPE id = 0;
LSN_TYPE lsn = 0;
std::string
ToString() const;
};
struct SegmentFileContext {
......@@ -46,6 +50,8 @@ struct OperationContext {
PartitionPtr new_partition = nullptr;
PartitionCommitPtr new_partition_commit = nullptr;
SchemaCommitPtr new_schema_commit = nullptr;
CollectionCommitPtr new_collection_commit = nullptr;
CollectionPtr new_collection = nullptr;
SegmentFilePtr stale_segment_file = nullptr;
std::vector<SegmentPtr> stale_segments;
......@@ -62,12 +68,20 @@ struct OperationContext {
SegmentFile::VecT new_segment_files;
CollectionPtr collection = nullptr;
LSN_TYPE lsn = 0;
std::string
ToString() const;
};
struct CreateCollectionContext {
CollectionPtr collection = nullptr;
std::map<FieldPtr, std::vector<FieldElementPtr>> fields_schema;
CollectionCommitPtr collection_commit = nullptr;
LSN_TYPE lsn = 0;
std::string
ToString() const;
};
} // namespace snapshot
......
......@@ -21,12 +21,23 @@ namespace snapshot {
static ID_TYPE UID = 1;
Operations::Operations(const OperationContext& context, ScopedSnapshotT prev_ss)
: context_(context), prev_ss_(prev_ss), uid_(UID++), status_(40005, "Operation Pending") {
std::ostream&
operator<<(std::ostream& out, const Operations& operation) {
out << operation.ToString();
return out;
}
Operations::Operations(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id)
: context_(context), uid_(UID++), status_(40005, "Operation Pending") {
Operations::Operations(const OperationContext& context, ScopedSnapshotT prev_ss, const OperationsType& type)
: context_(context),
prev_ss_(prev_ss),
uid_(UID++),
status_(SS_OPERATION_PENDING, "Operation Pending"),
type_(type) {
}
Operations::Operations(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id,
const OperationsType& type)
: context_(context), uid_(UID++), status_(SS_OPERATION_PENDING, "Operation Pending"), type_(type) {
auto status = Snapshots::GetInstance().GetSnapshot(prev_ss_, collection_id, commit_id);
if (!status.ok())
prev_ss_ = ScopedSnapshotT();
......@@ -43,16 +54,16 @@ Operations::FailureString() const {
}
std::string
Operations::OperationRepr() const {
Operations::GetRepr() const {
std::stringstream ss;
ss << "<" << OperationName() << ":" << GetID() << ">";
ss << "<" << GetName() << ":" << GetID() << ">";
return ss.str();
}
std::string
Operations::ToString() const {
std::stringstream ss;
ss << OperationRepr();
ss << GetRepr();
ss << (done_ ? " | DONE" : " | PENDING");
if (done_) {
if (status_.ok()) {
......@@ -71,6 +82,9 @@ Operations::GetID() const {
Status
Operations::operator()(Store& store) {
auto status = PreCheck();
if (!status.ok())
return status;
return ApplyToStore(store);
}
......@@ -90,7 +104,9 @@ void
Operations::Done() {
std::unique_lock<std::mutex> lock(finish_mtx_);
done_ = true;
std::cout << ToString() << std::endl;
if (GetType() == OperationsType::W_Compound) {
std::cout << ToString() << std::endl;
}
finish_cond_.notify_all();
}
......@@ -171,7 +187,9 @@ Operations::GetSnapshot(ScopedSnapshotT& ss) const {
Status
Operations::ApplyToStore(Store& store) {
std::cout << ToString() << std::endl;
if (GetType() == OperationsType::W_Compound) {
std::cout << ToString() << std::endl;
}
if (done_) {
Done();
return status_;
......@@ -210,6 +228,23 @@ Operations::PostExecute(Store& store) {
return store.DoCommitOperation(*this);
}
Status
Operations::RollBack() {
// TODO: Implement here
// Spwarn a rollback operation or re-use this operation
return Status::OK();
}
Status
Operations::ApplyRollBack(Store& store) {
// TODO: Implement rollback to remove all resources in steps_
return Status::OK();
}
Operations::~Operations() {
// TODO: Prefer to submit a rollback operation if status is not ok
}
} // namespace snapshot
} // namespace engine
} // namespace milvus
......@@ -33,16 +33,25 @@ namespace snapshot {
using StepsT = std::vector<std::any>;
using CheckStaleFunc = std::function<Status(ScopedSnapshotT&)>;
enum OperationsType { Invalid, W_Leaf, O_Leaf, W_Compound, O_Compound };
class Operations : public std::enable_shared_from_this<Operations> {
public:
Operations(const OperationContext& context, ScopedSnapshotT prev_ss);
Operations(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0);
Operations(const OperationContext& context, ScopedSnapshotT prev_ss,
const OperationsType& type = OperationsType::Invalid);
Operations(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0,
const OperationsType& type = OperationsType::Invalid);
const ScopedSnapshotT&
GetPrevSnapshot() const {
return prev_ss_;
}
virtual const LSN_TYPE&
GetContextLsn() const {
return context_.lsn;
}
virtual Status
CheckStale(const CheckStaleFunc& checker = nullptr) const;
virtual Status
......@@ -51,6 +60,9 @@ class Operations : public std::enable_shared_from_this<Operations> {
template <typename StepT>
void
AddStep(const StepT& step, bool activate = true);
template <typename StepT>
void
AddStepWithLsn(const StepT& step, const LSN_TYPE& lsn, bool activate = true);
void
SetStepResult(ID_TYPE id) {
ids_.push_back(id);
......@@ -64,6 +76,11 @@ class Operations : public std::enable_shared_from_this<Operations> {
ID_TYPE
GetID() const;
virtual const OperationsType&
GetType() const {
return type_;
}
virtual Status
OnExecute(Store&);
virtual Status
......@@ -101,18 +118,23 @@ class Operations : public std::enable_shared_from_this<Operations> {
return status_;
}
std::string
OperationName() const {
virtual std::string
GetName() const {
return typeid(*this).name();
}
virtual std::string
OperationRepr() const;
GetRepr() const;
virtual std::string
ToString() const;
virtual ~Operations() {
}
Status
RollBack();
virtual ~Operations();
friend std::ostream&
operator<<(std::ostream& out, const Operations& operation);
protected:
virtual std::string
......@@ -127,6 +149,9 @@ class Operations : public std::enable_shared_from_this<Operations> {
Status
PrevSnapshotRequried() const;
Status
ApplyRollBack(Store&);
OperationContext context_;
ScopedSnapshotT prev_ss_;
StepsT steps_;
......@@ -136,6 +161,7 @@ class Operations : public std::enable_shared_from_this<Operations> {
mutable std::mutex finish_mtx_;
std::condition_variable finish_cond_;
ID_TYPE uid_;
OperationsType type_;
};
template <typename StepT>
......@@ -147,14 +173,25 @@ Operations::AddStep(const StepT& step, bool activate) {
steps_.push_back(s);
}
template <typename StepT>
void
Operations::AddStepWithLsn(const StepT& step, const LSN_TYPE& lsn, bool activate) {
auto s = std::make_shared<StepT>(step);
if (activate)
s->Activate();
s->SetLsn(lsn);
steps_.push_back(s);
}
template <typename ResourceT>
class CommitOperation : public Operations {
public:
using BaseT = Operations;
CommitOperation(const OperationContext& context, ScopedSnapshotT prev_ss) : BaseT(context, prev_ss) {
CommitOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
: BaseT(context, prev_ss, OperationsType::W_Leaf) {
}
CommitOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0)
: BaseT(context, collection_id, commit_id) {
: BaseT(context, collection_id, commit_id, OperationsType::W_Leaf) {
}
virtual typename ResourceT::Ptr
......@@ -194,7 +231,7 @@ template <typename ResourceT>
class LoadOperation : public Operations {
public:
explicit LoadOperation(const LoadOperationContext& context)
: Operations(OperationContext(), ScopedSnapshotT()), context_(context) {
: Operations(OperationContext(), ScopedSnapshotT(), OperationsType::O_Leaf), context_(context) {
}
Status
......@@ -240,7 +277,8 @@ class LoadOperation : public Operations {
template <typename ResourceT>
class HardDeleteOperation : public Operations {
public:
explicit HardDeleteOperation(ID_TYPE id) : Operations(OperationContext(), ScopedSnapshotT()), id_(id) {
explicit HardDeleteOperation(ID_TYPE id)
: Operations(OperationContext(), ScopedSnapshotT(), OperationsType::W_Leaf), id_(id) {
}
Status
......@@ -260,7 +298,8 @@ class HardDeleteOperation : public Operations {
template <>
class HardDeleteOperation<Collection> : public Operations {
public:
explicit HardDeleteOperation(ID_TYPE id) : Operations(OperationContext(), ScopedSnapshotT()), id_(id) {
explicit HardDeleteOperation(ID_TYPE id)
: Operations(OperationContext(), ScopedSnapshotT(), OperationsType::W_Leaf), id_(id) {
}
Status
......
......@@ -113,7 +113,7 @@ template <>
class LoadOperation<Collection> : public Operations {
public:
explicit LoadOperation(const LoadOperationContext& context)
: Operations(OperationContext(), ScopedSnapshotT()), context_(context) {
: Operations(OperationContext(), ScopedSnapshotT(), OperationsType::O_Leaf), context_(context) {
}
Status
......
......@@ -23,6 +23,7 @@ using ID_TYPE = int64_t;
using NUM_TYPE = int64_t;
using FTYPE_TYPE = int64_t;
using TS_TYPE = int64_t;
using LSN_TYPE = uint64_t;
using MappingT = std::set<ID_TYPE>;
enum FieldElementType { RAW, IVFSQ8 };
......
......@@ -18,93 +18,93 @@ namespace milvus {
namespace engine {
namespace snapshot {
Collection::Collection(const std::string& name, ID_TYPE id, State status, TS_TYPE created_on, TS_TYPE updated_on)
: NameField(name), IdField(id), StatusField(status), CreatedOnField(created_on), UpdatedOnField(updated_on) {
Collection::Collection(const std::string& name, ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on,
TS_TYPE updated_on)
: NameField(name),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
SchemaCommit::SchemaCommit(ID_TYPE collection_id, const MappingT& mappings, ID_TYPE id, State status,
SchemaCommit::SchemaCommit(ID_TYPE collection_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn, State status,
TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
MappingsField(mappings),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
FieldCommit::FieldCommit(ID_TYPE collection_id, ID_TYPE field_id, const MappingT& mappings, ID_TYPE id, State status,
TS_TYPE created_on, TS_TYPE updated_on)
FieldCommit::FieldCommit(ID_TYPE collection_id, ID_TYPE field_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn,
State status, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
FieldIdField(field_id),
MappingsField(mappings),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
Field::Field(const std::string& name, NUM_TYPE num, ID_TYPE id, State status, TS_TYPE created_on, TS_TYPE updated_on)
Field::Field(const std::string& name, NUM_TYPE num, ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on,
TS_TYPE updated_on)
: NameField(name),
NumField(num),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on_) {
}
FieldElement::FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype,
ID_TYPE id, State status, TS_TYPE created_on, TS_TYPE updated_on)
ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
FieldIdField(field_id),
NameField(name),
FtypeField(ftype),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
CollectionCommit::CollectionCommit(ID_TYPE collection_id, ID_TYPE schema_id, const MappingT& mappings, ID_TYPE id,
State status, TS_TYPE created_on, TS_TYPE updated_on)
LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
SchemaIdField(schema_id),
MappingsField(mappings),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
/* std::string CollectionCommit::ToString() const { */
/* std::stringstream ss; */
/* ss << "<" << BaseT::ToString() << ", Mappings=" << "["; */
/* bool first = true; */
/* std::string prefix; */
/* for (auto& id : mappings_) { */
/* if (!first) prefix = ", "; */
/* else first = false; */
/* ss << prefix << id; */
/* } */
/* ss << "]>"; */
/* return ss.str(); */
/* } */
Partition::Partition(const std::string& name, ID_TYPE collection_id, ID_TYPE id, State status, TS_TYPE created_on,
TS_TYPE updated_on)
Partition::Partition(const std::string& name, ID_TYPE collection_id, ID_TYPE id, LSN_TYPE lsn, State status,
TS_TYPE created_on, TS_TYPE updated_on)
: NameField(name),
CollectionIdField(collection_id),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
PartitionCommit::PartitionCommit(ID_TYPE collection_id, ID_TYPE partition_id, const MappingT& mappings, ID_TYPE id,
State status, TS_TYPE created_on, TS_TYPE updated_on)
LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
PartitionIdField(partition_id),
MappingsField(mappings),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
......@@ -123,10 +123,12 @@ PartitionCommit::ToString() const {
return ss.str();
}
Segment::Segment(ID_TYPE partition_id, ID_TYPE num, ID_TYPE id, State status, TS_TYPE created_on, TS_TYPE updated_on)
Segment::Segment(ID_TYPE partition_id, ID_TYPE num, ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on,
TS_TYPE updated_on)
: PartitionIdField(partition_id),
NumField(num),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
......@@ -144,12 +146,13 @@ Segment::ToString() const {
}
SegmentCommit::SegmentCommit(ID_TYPE schema_id, ID_TYPE partition_id, ID_TYPE segment_id, const MappingT& mappings,
ID_TYPE id, State status, TS_TYPE created_on, TS_TYPE updated_on)
ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on)
: SchemaIdField(schema_id),
PartitionIdField(partition_id),
SegmentIdField(segment_id),
MappingsField(mappings),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
......@@ -166,12 +169,13 @@ SegmentCommit::ToString() const {
return ss.str();
}
SegmentFile::SegmentFile(ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id, ID_TYPE id, State status,
TS_TYPE created_on, TS_TYPE updated_on)
SegmentFile::SegmentFile(ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id, ID_TYPE id, LSN_TYPE lsn,
State status, TS_TYPE created_on, TS_TYPE updated_on)
: PartitionIdField(partition_id),
SegmentIdField(segment_id),
FieldElementIdField(field_element_id),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
......
......@@ -86,6 +86,24 @@ class StatusField {
State status_;
};
class LsnField {
public:
explicit LsnField(LSN_TYPE lsn = 0) : lsn_(lsn) {
}
const LSN_TYPE&
GetLsn() const {
return lsn_;
}
void
SetLsn(const LSN_TYPE& lsn) {
lsn_ = lsn;
}
protected:
LSN_TYPE lsn_;
};
class CreatedOnField {
public:
explicit CreatedOnField(TS_TYPE created_on = GetMicroSecTimeStamp()) : created_on_(created_on) {
......@@ -261,6 +279,7 @@ class NameField {
class Collection : public DBBaseResource<>,
public NameField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
......@@ -271,7 +290,7 @@ class Collection : public DBBaseResource<>,
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Collection";
Collection(const std::string& name, ID_TYPE id = 0, State status = PENDING,
Collection(const std::string& name, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
......@@ -281,6 +300,7 @@ class SchemaCommit : public DBBaseResource<>,
public CollectionIdField,
public MappingsField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
......@@ -291,8 +311,9 @@ class SchemaCommit : public DBBaseResource<>,
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "SchemaCommit";
SchemaCommit(ID_TYPE collection_id, const MappingT& mappings = {}, ID_TYPE id = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
SchemaCommit(ID_TYPE collection_id, const MappingT& mappings = {}, ID_TYPE id = 0, LSN_TYPE lsn = 0,
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using SchemaCommitPtr = SchemaCommit::Ptr;
......@@ -301,6 +322,7 @@ class Field : public DBBaseResource<>,
public NameField,
public NumField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
......@@ -311,7 +333,7 @@ class Field : public DBBaseResource<>,
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Field";
Field(const std::string& name, NUM_TYPE num, ID_TYPE id = 0, State status = PENDING,
Field(const std::string& name, NUM_TYPE num, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
......@@ -322,6 +344,7 @@ class FieldCommit : public DBBaseResource<>,
public FieldIdField,
public MappingsField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
......@@ -333,7 +356,7 @@ class FieldCommit : public DBBaseResource<>,
static constexpr const char* Name = "FieldCommit";
FieldCommit(ID_TYPE collection_id, ID_TYPE field_id, const MappingT& mappings = {}, ID_TYPE id = 0,
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
......@@ -345,6 +368,7 @@ class FieldElement : public DBBaseResource<>,
public NameField,
public FtypeField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
......@@ -355,7 +379,7 @@ class FieldElement : public DBBaseResource<>,
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "FieldElement";
FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype, ID_TYPE id = 0,
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
......@@ -366,6 +390,7 @@ class CollectionCommit : public DBBaseResource<>,
public SchemaIdField,
public MappingsField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
......@@ -376,7 +401,7 @@ class CollectionCommit : public DBBaseResource<>,
using ScopedMapT = std::map<ID_TYPE, ScopedResource<CollectionCommit>>;
using VecT = std::vector<Ptr>;
CollectionCommit(ID_TYPE collection_id, ID_TYPE schema_id, const MappingT& mappings = {}, ID_TYPE id = 0,
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
......@@ -386,6 +411,7 @@ class Partition : public DBBaseResource<>,
public NameField,
public CollectionIdField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
......@@ -396,7 +422,7 @@ class Partition : public DBBaseResource<>,
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Partition";
Partition(const std::string& name, ID_TYPE collection_id, ID_TYPE id = 0, State status = PENDING,
Partition(const std::string& name, ID_TYPE collection_id, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
......@@ -407,6 +433,7 @@ class PartitionCommit : public DBBaseResource<>,
public PartitionIdField,
public MappingsField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
......@@ -418,7 +445,7 @@ class PartitionCommit : public DBBaseResource<>,
static constexpr const char* Name = "PartitionCommit";
PartitionCommit(ID_TYPE collection_id, ID_TYPE partition_id, const MappingT& mappings = {}, ID_TYPE id = 0,
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
std::string
......@@ -431,6 +458,7 @@ class Segment : public DBBaseResource<>,
public PartitionIdField,
public NumField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
......@@ -441,7 +469,7 @@ class Segment : public DBBaseResource<>,
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Segment";
Segment(ID_TYPE partition_id, ID_TYPE num = 0, ID_TYPE id = 0, State status = PENDING,
Segment(ID_TYPE partition_id, ID_TYPE num = 0, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
std::string
......@@ -456,6 +484,7 @@ class SegmentCommit : public DBBaseResource<>,
public SegmentIdField,
public MappingsField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
......@@ -467,7 +496,7 @@ class SegmentCommit : public DBBaseResource<>,
static constexpr const char* Name = "SegmentCommit";
SegmentCommit(ID_TYPE schema_id, ID_TYPE partition_id, ID_TYPE segment_id, const MappingT& mappings = {},
ID_TYPE id = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
std::string
......@@ -481,6 +510,7 @@ class SegmentFile : public DBBaseResource<>,
public SegmentIdField,
public FieldElementIdField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
......@@ -491,7 +521,7 @@ class SegmentFile : public DBBaseResource<>,
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "SegmentFile";
SegmentFile(ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id, ID_TYPE id = 0,
SegmentFile(ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id, ID_TYPE id = 0, LSN_TYPE lsn = 0,
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
......
......@@ -30,6 +30,7 @@ Snapshot::UnRefAll() {
Snapshot::Snapshot(ID_TYPE id) {
auto collection_commit = CollectionCommitsHolder::GetInstance().GetResource(id, false);
AddResource<CollectionCommit>(collection_commit);
max_lsn_ = collection_commit->GetLsn();
auto& schema_holder = SchemaCommitsHolder::GetInstance();
auto current_schema = schema_holder.GetResource(collection_commit->GetSchemaId(), false);
AddResource<SchemaCommit>(current_schema);
......
......@@ -76,6 +76,11 @@ class Snapshot : public ReferenceProxy {
return GetResources<Partition>().size();
}
const LSN_TYPE&
GetMaxLsn() const {
return max_lsn_;
}
Status
GetPartitionId(const std::string& name, ID_TYPE& id) const {
auto it = partition_names_map_.find(name);
......@@ -252,6 +257,7 @@ class Snapshot : public ReferenceProxy {
std::map<ID_TYPE, ID_TYPE> p_pc_map_;
ID_TYPE latest_schema_commit_id_ = 0;
std::map<ID_TYPE, NUM_TYPE> p_max_seg_num_;
LSN_TYPE max_lsn_;
};
using ScopedSnapshotT = ScopedResource<Snapshot>;
......
......@@ -17,28 +17,29 @@ namespace engine {
namespace snapshot {
Status
Snapshots::DropCollection(ID_TYPE collection_id) {
Snapshots::DropCollection(ID_TYPE collection_id, const LSN_TYPE& lsn) {
ScopedSnapshotT ss;
auto status = GetSnapshot(ss, collection_id);
if (!status.ok())
return status;
return DoDropCollection(ss);
return DoDropCollection(ss, lsn);
}
Status
Snapshots::DropCollection(const std::string& name) {
Snapshots::DropCollection(const std::string& name, const LSN_TYPE& lsn) {
ScopedSnapshotT ss;
auto status = GetSnapshot(ss, name);
if (!status.ok())
return status;
return DoDropCollection(ss);
return DoDropCollection(ss, lsn);
}
Status
Snapshots::DoDropCollection(ScopedSnapshotT& ss) {
Snapshots::DoDropCollection(ScopedSnapshotT& ss, const LSN_TYPE& lsn) {
OperationContext context;
context.lsn = lsn;
context.collection = ss->GetCollection();
auto op = std::make_shared<SoftDeleteCollectionOperation>(context);
auto op = std::make_shared<SoftDeleteCollectionOperation>(context, ss);
op->Push();
auto status = op->GetStatus();
......
......@@ -49,9 +49,9 @@ class Snapshots {
GetCollectionIds(IDS_TYPE& ids) const;
Status
DropCollection(const std::string& name);
DropCollection(const std::string& name, const LSN_TYPE& lsn);
Status
DropCollection(ID_TYPE collection_id);
DropCollection(ID_TYPE collection_id, const LSN_TYPE& lsn);
Status
Reset();
......@@ -63,7 +63,7 @@ class Snapshots {
Init();
}
Status
DoDropCollection(ScopedSnapshotT& ss);
DoDropCollection(ScopedSnapshotT& ss, const LSN_TYPE& lsn);
void
Init();
......
......@@ -128,6 +128,7 @@ constexpr ErrorCode SS_DUPLICATED_ERROR = ToSSErrorCode(5);
constexpr ErrorCode SS_NOT_ACTIVE_ERROR = ToSSErrorCode(6);
constexpr ErrorCode SS_CONSTRAINT_CHECK_ERROR = ToSSErrorCode(7);
constexpr ErrorCode SS_INVALID_ARGUMENT_ERROR = ToSSErrorCode(8);
constexpr ErrorCode SS_OPERATION_PENDING = ToSSErrorCode(9);
namespace server {
class ServerException : public std::exception {
......
......@@ -131,8 +131,9 @@ TEST_F(SnapshotTest, ResourceHoldersTest) {
}
milvus::engine::snapshot::ScopedSnapshotT
CreateCollection(const std::string& collection_name) {
CreateCollection(const std::string& collection_name, milvus::engine::snapshot::LSN_TYPE lsn) {
milvus::engine::snapshot::CreateCollectionContext context;
context.lsn = lsn;
auto collection_schema = std::make_shared<milvus::engine::snapshot::Collection>(collection_name);
context.collection = collection_schema;
auto vector_field = std::make_shared<milvus::engine::snapshot::Field>("vector", 0);
......@@ -156,7 +157,8 @@ TEST_F(SnapshotTest, CreateCollectionOperationTest) {
ASSERT_TRUE(!expect_null);
std::string collection_name = "test_c1";
auto ss = CreateCollection(collection_name);
milvus::engine::snapshot::LSN_TYPE lsn = 1;
auto ss = CreateCollection(collection_name, lsn);
ASSERT_TRUE(ss);
milvus::engine::snapshot::ScopedSnapshotT latest_ss;
......@@ -174,8 +176,9 @@ TEST_F(SnapshotTest, CreateCollectionOperationTest) {
milvus::engine::snapshot::OperationContext sd_op_ctx;
sd_op_ctx.collection = latest_ss->GetCollection();
sd_op_ctx.lsn = latest_ss->GetMaxLsn() + 1;
ASSERT_TRUE(sd_op_ctx.collection->IsActive());
auto sd_op = std::make_shared<milvus::engine::snapshot::SoftDeleteCollectionOperation>(sd_op_ctx);
auto sd_op = std::make_shared<milvus::engine::snapshot::SoftDeleteCollectionOperation>(sd_op_ctx, latest_ss);
status = sd_op->Push();
ASSERT_TRUE(status.ok());
ASSERT_TRUE(sd_op->GetStatus().ok());
......@@ -188,7 +191,8 @@ TEST_F(SnapshotTest, CreateCollectionOperationTest) {
TEST_F(SnapshotTest, DropCollectionTest) {
milvus::engine::snapshot::Store::GetInstance().DoReset();
std::string collection_name = "test_c1";
auto ss = CreateCollection(collection_name);
milvus::engine::snapshot::LSN_TYPE lsn = 1;
auto ss = CreateCollection(collection_name, lsn);
ASSERT_TRUE(ss);
milvus::engine::snapshot::ScopedSnapshotT lss;
auto status = milvus::engine::snapshot::Snapshots::GetInstance().GetSnapshot(lss, collection_name);
......@@ -197,31 +201,33 @@ TEST_F(SnapshotTest, DropCollectionTest) {
ASSERT_EQ(ss->GetID(), lss->GetID());
auto prev_ss_id = ss->GetID();
auto prev_c_id = ss->GetCollection()->GetID();
status = milvus::engine::snapshot::Snapshots::GetInstance().DropCollection(collection_name);
lsn = ss->GetMaxLsn() + 1;
status = milvus::engine::snapshot::Snapshots::GetInstance().DropCollection(collection_name, lsn);
ASSERT_TRUE(status.ok());
status = milvus::engine::snapshot::Snapshots::GetInstance().GetSnapshot(lss, collection_name);
ASSERT_TRUE(!status.ok());
auto ss_2 = CreateCollection(collection_name);
auto ss_2 = CreateCollection(collection_name, ++lsn);
status = milvus::engine::snapshot::Snapshots::GetInstance().GetSnapshot(lss, collection_name);
ASSERT_TRUE(status.ok());
ASSERT_EQ(ss_2->GetID(), lss->GetID());
ASSERT_TRUE(prev_ss_id != ss_2->GetID());
ASSERT_TRUE(prev_c_id != ss_2->GetCollection()->GetID());
status = milvus::engine::snapshot::Snapshots::GetInstance().DropCollection(collection_name);
status = milvus::engine::snapshot::Snapshots::GetInstance().DropCollection(collection_name, ++lsn);
ASSERT_TRUE(status.ok());
status = milvus::engine::snapshot::Snapshots::GetInstance().DropCollection(collection_name);
status = milvus::engine::snapshot::Snapshots::GetInstance().DropCollection(collection_name, ++lsn);
ASSERT_TRUE(!status.ok());
}
TEST_F(SnapshotTest, ConCurrentCollectionOperation) {
milvus::engine::snapshot::Store::GetInstance().DoReset();
std::string collection_name("c1");
milvus::engine::snapshot::LSN_TYPE lsn = 1;
milvus::engine::snapshot::ID_TYPE stale_ss_id;
auto worker1 = [&]() {
milvus::Status status;
auto ss = CreateCollection(collection_name);
auto ss = CreateCollection(collection_name, ++lsn);
ASSERT_TRUE(ss);
ASSERT_EQ(ss->GetName(), collection_name);
stale_ss_id = ss->GetID();
......@@ -239,7 +245,7 @@ TEST_F(SnapshotTest, ConCurrentCollectionOperation) {
};
auto worker2 = [&] {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
auto status = milvus::engine::snapshot::Snapshots::GetInstance().DropCollection(collection_name);
auto status = milvus::engine::snapshot::Snapshots::GetInstance().DropCollection(collection_name, ++lsn);
ASSERT_TRUE(status.ok());
milvus::engine::snapshot::ScopedSnapshotT a_ss;
status = milvus::engine::snapshot::Snapshots::GetInstance().GetSnapshot(a_ss, collection_name);
......@@ -247,10 +253,10 @@ TEST_F(SnapshotTest, ConCurrentCollectionOperation) {
};
auto worker3 = [&] {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
auto ss = CreateCollection(collection_name);
auto ss = CreateCollection(collection_name, ++lsn);
ASSERT_TRUE(!ss);
std::this_thread::sleep_for(std::chrono::milliseconds(80));
ss = CreateCollection(collection_name);
ss = CreateCollection(collection_name, ++lsn);
ASSERT_TRUE(ss);
ASSERT_EQ(ss->GetName(), collection_name);
};
......@@ -268,12 +274,14 @@ TEST_F(SnapshotTest, ConCurrentCollectionOperation) {
TEST_F(SnapshotTest, PartitionTest) {
milvus::engine::snapshot::Store::GetInstance().DoReset();
std::string collection_name("c1");
auto ss = CreateCollection(collection_name);
milvus::engine::snapshot::LSN_TYPE lsn = 1;
auto ss = CreateCollection(collection_name, ++lsn);
ASSERT_TRUE(ss);
ASSERT_EQ(ss->GetName(), collection_name);
ASSERT_EQ(ss->NumberOfPartitions(), 1);
milvus::engine::snapshot::OperationContext context;
context.lsn = ++lsn;
auto op = std::make_shared<milvus::engine::snapshot::CreatePartitionOperation>(context, ss);
std::string partition_name("p1");
......@@ -297,6 +305,7 @@ TEST_F(SnapshotTest, PartitionTest) {
ASSERT_TRUE(curr_ss->GetID() > ss->GetID());
ASSERT_EQ(curr_ss->NumberOfPartitions(), 2);
p_ctx.lsn = ++lsn;
auto drop_op = std::make_shared<milvus::engine::snapshot::DropPartitionOperation>(p_ctx, curr_ss);
status = drop_op->Push();
ASSERT_TRUE(status.ok());
......@@ -309,15 +318,45 @@ TEST_F(SnapshotTest, PartitionTest) {
ASSERT_TRUE(latest_ss->GetID() > curr_ss->GetID());
ASSERT_EQ(latest_ss->NumberOfPartitions(), 1);
p_ctx.lsn = ++lsn;
drop_op = std::make_shared<milvus::engine::snapshot::DropPartitionOperation>(p_ctx, latest_ss);
status = drop_op->Push();
ASSERT_TRUE(!status.ok());
std::cout << status.ToString() << std::endl;
}
TEST_F(SnapshotTest, PartitionTest2) {
milvus::engine::snapshot::Store::GetInstance().DoReset();
std::string collection_name("c1");
milvus::engine::snapshot::LSN_TYPE lsn = 1;
milvus::Status status;
auto ss = CreateCollection(collection_name, ++lsn);
ASSERT_TRUE(ss);
ASSERT_EQ(lsn, ss->GetMaxLsn());
milvus::engine::snapshot::OperationContext context;
context.lsn = lsn;
auto cp_op = std::make_shared<milvus::engine::snapshot::CreatePartitionOperation>(context, ss);
std::string partition_name("p1");
milvus::engine::snapshot::PartitionContext p_ctx;
p_ctx.name = partition_name;
milvus::engine::snapshot::PartitionPtr partition;
status = cp_op->CommitNewPartition(p_ctx, partition);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(partition);
ASSERT_EQ(partition->GetName(), partition_name);
ASSERT_TRUE(!partition->IsActive());
ASSERT_TRUE(partition->HasAssigned());
status = cp_op->Push();
ASSERT_TRUE(!status.ok());
}
TEST_F(SnapshotTest, OperationTest) {
milvus::Status status;
std::string to_string;
milvus::engine::snapshot::LSN_TYPE lsn;
milvus::engine::snapshot::SegmentFileContext sf_context;
sf_context.field_name = "f_1_1";
sf_context.field_element_name = "fe_1_1";
......@@ -327,6 +366,7 @@ TEST_F(SnapshotTest, OperationTest) {
milvus::engine::snapshot::ScopedSnapshotT ss;
status = milvus::engine::snapshot::Snapshots::GetInstance().GetSnapshot(ss, 1);
auto ss_id = ss->GetID();
lsn = ss->GetMaxLsn() + 1;
ASSERT_TRUE(status.ok());
// Check snapshot
......@@ -347,6 +387,7 @@ TEST_F(SnapshotTest, OperationTest) {
// Check build operation correctness
{
milvus::engine::snapshot::OperationContext context;
context.lsn = ++lsn;
auto build_op = std::make_shared<milvus::engine::snapshot::BuildOperation>(context, ss);
milvus::engine::snapshot::SegmentFilePtr seg_file;
status = build_op->CommitNewSegmentFile(sf_context, seg_file);
......@@ -383,6 +424,7 @@ TEST_F(SnapshotTest, OperationTest) {
milvus::engine::snapshot::ID_TYPE partition_id;
{
milvus::engine::snapshot::OperationContext context;
context.lsn = ++lsn;
context.prev_partition = ss->GetResource<milvus::engine::snapshot::Partition>(1);
auto op = std::make_shared<milvus::engine::snapshot::NewSegmentOperation>(context, ss);
milvus::engine::snapshot::SegmentPtr new_seg;
......@@ -420,6 +462,7 @@ TEST_F(SnapshotTest, OperationTest) {
auto expect_null = ss->GetPartitionCommitByPartitionId(11111111);
ASSERT_TRUE(!expect_null);
ASSERT_NE(prev_partition_commit->ToString(), "");
merge_ctx.lsn = ++lsn;
auto op = std::make_shared<milvus::engine::snapshot::MergeOperation>(merge_ctx, ss);
milvus::engine::snapshot::SegmentPtr new_seg;
status = op->CommitNewSegment(new_seg);
......@@ -455,6 +498,7 @@ TEST_F(SnapshotTest, OperationTest) {
// 4. Commit new seg file of build operation -> Stale Segment Found Here!
{
milvus::engine::snapshot::OperationContext context;
context.lsn = ++lsn;
auto build_op = std::make_shared<milvus::engine::snapshot::BuildOperation>(context, new_ss);
milvus::engine::snapshot::SegmentFilePtr seg_file;
auto new_sf_context = sf_context;
......@@ -469,6 +513,7 @@ TEST_F(SnapshotTest, OperationTest) {
// 4. Commit build operation -> Stale Segment Found Here!
{
milvus::engine::snapshot::OperationContext context;
context.lsn = ++lsn;
auto build_op = std::make_shared<milvus::engine::snapshot::BuildOperation>(context, ss);
milvus::engine::snapshot::SegmentFilePtr seg_file;
auto new_sf_context = sf_context;
......@@ -477,7 +522,8 @@ TEST_F(SnapshotTest, OperationTest) {
ASSERT_TRUE(status.ok());
std::cout << build_op->ToString() << std::endl;
auto status = milvus::engine::snapshot::Snapshots::GetInstance().DropCollection(ss->GetName());
auto status = milvus::engine::snapshot::Snapshots::GetInstance().DropCollection(ss->GetName(),
++lsn);
ASSERT_TRUE(status.ok());
status = build_op->Push();
ASSERT_TRUE(!status.ok());
......
......@@ -378,6 +378,8 @@ SnapshotTest::SetUp() {
void
SnapshotTest::TearDown() {
// TODO: Temp to delay some time. OperationExecutor should wait all resources be destructed before stop
std::this_thread::sleep_for(std::chrono::milliseconds(20));
milvus::engine::snapshot::OperationExecutor::GetInstance().Stop();
BaseTest::TearDown();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册