未验证 提交 5727cd13 编写于 作者: X XuPeng-SH 提交者: GitHub

(db/snapshot): Fix bugs and more unit test (#2538)

* (db/snapshot): refactor CompoundBaseOperation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): some refactor for CompoundOperation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): remove one of Operations constructor
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add DropPartition API and related test
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix bugs for multi threads
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add some print
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): some bug fix
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): some bug fix 1
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix bugs
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add build worker for unittest
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update ut
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

(db/snapshot): update ut
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

(db/snapshot): update ut
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

(db/snapshot): update ut
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix bug in ReferenceProxy
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>
上级 c079804b
......@@ -17,8 +17,9 @@
#include <mutex>
#include <string>
#include <thread>
#include "ResourceTypes.h"
#include "ScopedResource.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/ScopedResource.h"
#include "db/snapshot/Store.h"
namespace milvus {
namespace engine {
......@@ -33,6 +34,9 @@ class ResourceHolder {
using ScopedPtr = std::shared_ptr<ScopedT>;
using IdMapT = std::map<ID_TYPE, ResourcePtr>;
using Ptr = std::shared_ptr<Derived>;
// TODO: Resource should be loaded into holder in OperationExecutor thread
ScopedT
Load(Store& store, ID_TYPE id, bool scoped = true);
ScopedT
GetResource(ID_TYPE id, bool scoped = true);
......@@ -65,9 +69,7 @@ class ResourceHolder {
OnNoRefCallBack(ResourcePtr resource);
virtual ResourcePtr
Load(ID_TYPE id);
virtual ResourcePtr
Load(const std::string& name);
DoLoad(Store& store, ID_TYPE id);
ResourceHolder() = default;
virtual ~ResourceHolder() = default;
......
......@@ -36,11 +36,11 @@ void ResourceHolder<ResourceT, Derived>::Reset() {
template <typename ResourceT, typename Derived>
typename ResourceHolder<ResourceT, Derived>::ResourcePtr
ResourceHolder<ResourceT, Derived>::Load(ID_TYPE id) {
ResourceHolder<ResourceT, Derived>::DoLoad(Store& store, ID_TYPE id) {
LoadOperationContext context;
context.id = id;
auto op = std::make_shared<LoadOperation<ResourceT>>(context);
op->Push();
(*op)(store);
typename ResourceT::Ptr c;
auto status = op->GetResource(c);
if (status.ok()) {
......@@ -51,14 +51,27 @@ ResourceHolder<ResourceT, Derived>::Load(ID_TYPE id) {
}
template <typename ResourceT, typename Derived>
typename ResourceHolder<ResourceT, Derived>::ResourcePtr
ResourceHolder<ResourceT, Derived>::Load(const std::string& name) {
return nullptr;
typename ResourceHolder<ResourceT, Derived>::ScopedT
ResourceHolder<ResourceT, Derived>::Load(Store& store, ID_TYPE id, bool scoped) {
{
std::unique_lock<std::mutex> lock(mutex_);
auto cit = id_map_.find(id);
if (cit != id_map_.end()) {
return ScopedT(cit->second, scoped);
}
}
auto ret = DoLoad(store, id);
if (!ret) return ScopedT();
return ScopedT(ret, scoped);
}
template <typename ResourceT, typename Derived>
typename ResourceHolder<ResourceT, Derived>::ScopedT
ResourceHolder<ResourceT, Derived>::GetResource(ID_TYPE id, bool scoped) {
// TODO: Temp to use Load here. Will be removed when resource is loaded just post Compound
// Operations.
return Load(Store::GetInstance(), id, scoped);
{
std::unique_lock<std::mutex> lock(mutex_);
auto cit = id_map_.find(id);
......@@ -66,9 +79,7 @@ ResourceHolder<ResourceT, Derived>::GetResource(ID_TYPE id, bool scoped) {
return ScopedT(cit->second, scoped);
}
}
auto ret = Load(id);
if (!ret) return ScopedT();
return ScopedT(ret, scoped);
return ScopedT();
}
template <typename ResourceT, typename Derived>
......@@ -99,7 +110,8 @@ template <typename ResourceT, typename Derived>
bool
ResourceHolder<ResourceT, Derived>::HardDelete(ID_TYPE id) {
auto op = std::make_shared<HardDeleteOperation<ResourceT>>(id);
op->Push();
// TODO:
(*op)(Store::GetInstance());
return true;
}
......
......@@ -17,32 +17,17 @@ namespace milvus {
namespace engine {
namespace snapshot {
template <typename... Fields>
class DBBaseResource : public ReferenceProxy, public Fields... {
class DBBaseResource : public ReferenceProxy {
public:
DBBaseResource(const Fields&... fields);
virtual std::string
ToString() const;
ToString() const {
return "";
}
virtual ~DBBaseResource() {
}
};
template <typename... Fields>
DBBaseResource<Fields...>::DBBaseResource(const Fields&... fields) : Fields(fields)... {
/* InstallField("id"); */
/* InstallField("status"); */
/* InstallField("created_on"); */
/* std::vector<std::string> attrs = {Fields::ATTR...}; */
}
template <typename... Fields>
std::string
DBBaseResource<Fields...>::ToString() const {
return "";
}
} // namespace snapshot
} // namespace engine
} // namespace milvus
......@@ -19,39 +19,8 @@ namespace milvus {
namespace engine {
namespace snapshot {
CompoundBaseOperation::CompoundBaseOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
: BaseT(context, prev_ss, OperationsType::W_Compound) {
}
CompoundBaseOperation::CompoundBaseOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id)
: BaseT(context, collection_id, commit_id, OperationsType::W_Compound) {
}
std::string
CompoundBaseOperation::GetRepr() const {
std::stringstream ss;
ss << "<" << GetName() << "(";
if (prev_ss_) {
ss << "SS=" << prev_ss_ << GetID();
}
ss << "," << context_.ToString();
ss << ",LSN=" << GetContextLsn();
ss << ")>";
return ss.str();
}
Status
CompoundBaseOperation::PreCheck() {
if (GetContextLsn() <= prev_ss_->GetMaxLsn()) {
return Status(SS_INVALID_CONTEX_ERROR, "Invalid LSN found in operation");
}
return Status::OK();
}
BuildOperation::BuildOperation(const OperationContext& context, ScopedSnapshotT prev_ss) : BaseT(context, prev_ss) {
}
BuildOperation::BuildOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id)
: BaseT(context, collection_id, commit_id) {
}
Status
BuildOperation::DoExecute(Store& store) {
......@@ -60,14 +29,14 @@ BuildOperation::DoExecute(Store& store) {
if (!status.ok())
return status;
SegmentCommitOperation op(context_, prev_ss_);
SegmentCommitOperation op(context_, context_.prev_ss);
op(store);
status = op.GetResource(context_.new_segment_commit);
if (!status.ok())
return status;
AddStepWithLsn(*context_.new_segment_commit, context_.lsn);
PartitionCommitOperation pc_op(context_, prev_ss_);
PartitionCommitOperation pc_op(context_, context_.prev_ss);
pc_op(store);
OperationContext cc_context;
......@@ -83,7 +52,7 @@ BuildOperation::DoExecute(Store& store) {
return status;
AddStepWithLsn(*context_.new_partition_commit, context_.lsn);
CollectionCommitOperation cc_op(cc_context, prev_ss_);
CollectionCommitOperation cc_op(cc_context, context_.prev_ss);
cc_op(store);
status = cc_op.GetResource(context_.new_collection_commit);
if (!status.ok())
......@@ -123,9 +92,6 @@ BuildOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentF
NewSegmentOperation::NewSegmentOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
: BaseT(context, prev_ss) {
}
NewSegmentOperation::NewSegmentOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id)
: BaseT(context, collection_id, commit_id) {
}
Status
NewSegmentOperation::DoExecute(Store& store) {
......@@ -135,7 +101,7 @@ NewSegmentOperation::DoExecute(Store& store) {
/* auto status = PrevSnapshotRequried(); */
/* if (!status.ok()) return status; */
// TODO: Check Context
SegmentCommitOperation op(context_, prev_ss_);
SegmentCommitOperation op(context_, context_.prev_ss);
auto status = op(store);
if (!status.ok())
return status;
......@@ -143,10 +109,15 @@ NewSegmentOperation::DoExecute(Store& store) {
if (!status.ok())
return status;
AddStepWithLsn(*context_.new_segment_commit, context_.lsn);
/* std::cout << GetRepr() << " POST_SC_MAP=("; */
/* for (auto id : context_.new_segment_commit->GetMappings()) { */
/* std::cout << id << ","; */
/* } */
/* std::cout << ")" << std::endl; */
OperationContext cc_context;
PartitionCommitOperation pc_op(context_, prev_ss_);
PartitionCommitOperation pc_op(context_, context_.prev_ss);
status = pc_op(store);
if (!status.ok())
return status;
......@@ -155,8 +126,13 @@ NewSegmentOperation::DoExecute(Store& store) {
return status;
AddStepWithLsn(*cc_context.new_partition_commit, context_.lsn);
context_.new_partition_commit = cc_context.new_partition_commit;
/* std::cout << GetRepr() << " POST_PC_MAP=("; */
/* for (auto id : cc_context.new_partition_commit->GetMappings()) { */
/* std::cout << id << ","; */
/* } */
/* std::cout << ")" << std::endl; */
CollectionCommitOperation cc_op(cc_context, prev_ss_);
CollectionCommitOperation cc_op(cc_context, context_.prev_ss);
status = cc_op(store);
if (!status.ok())
return status;
......@@ -201,9 +177,6 @@ NewSegmentOperation::CommitNewSegmentFile(const SegmentFileContext& context, Seg
MergeOperation::MergeOperation(const OperationContext& context, ScopedSnapshotT prev_ss) : BaseT(context, prev_ss) {
}
MergeOperation::MergeOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id)
: BaseT(context, collection_id, commit_id) {
}
Status
MergeOperation::CommitNewSegment(SegmentPtr& created) {
......@@ -251,7 +224,7 @@ MergeOperation::DoExecute(Store& store) {
// PXU TODO:
// 1. Check all requried field elements have related segment files
// 2. Check Stale and others
SegmentCommitOperation op(context_, prev_ss_);
SegmentCommitOperation op(context_, context_.prev_ss);
auto status = op(store);
if (!status.ok())
return status;
......@@ -260,8 +233,13 @@ MergeOperation::DoExecute(Store& store) {
if (!status.ok())
return status;
AddStepWithLsn(*context_.new_segment_commit, context_.lsn);
/* std::cout << GetRepr() << " POST_SC_MAP=("; */
/* for (auto id : context_.new_segment_commit->GetMappings()) { */
/* std::cout << id << ","; */
/* } */
/* std::cout << ")" << std::endl; */
PartitionCommitOperation pc_op(context_, prev_ss_);
PartitionCommitOperation pc_op(context_, context_.prev_ss);
status = pc_op(store);
if (!status.ok())
return status;
......@@ -273,7 +251,13 @@ MergeOperation::DoExecute(Store& store) {
AddStepWithLsn(*cc_context.new_partition_commit, context_.lsn);
context_.new_partition_commit = cc_context.new_partition_commit;
CollectionCommitOperation cc_op(cc_context, prev_ss_);
/* std::cout << GetRepr() << " POST_PC_MAP=("; */
/* for (auto id : cc_context.new_partition_commit->GetMappings()) { */
/* std::cout << id << ","; */
/* } */
/* std::cout << ")" << std::endl; */
CollectionCommitOperation cc_op(cc_context, context_.prev_ss);
status = cc_op(store);
if (!status.ok())
return status;
......@@ -324,7 +308,10 @@ DropPartitionOperation::DropPartitionOperation(const PartitionContext& context,
std::string
DropPartitionOperation::GetRepr() const {
std::stringstream ss;
ss << "<DPO(SS=" << prev_ss_->GetID();
ss << "<" << GetName() << "(";
if (prev_ss_) {
ss << "SS=" << prev_ss_->GetID();
}
ss << "," << c_context_.ToString();
ss << "," << context_.ToString();
ss << ",LSN=" << GetContextLsn();
......@@ -365,10 +352,6 @@ DropPartitionOperation::DoExecute(Store& store) {
CreatePartitionOperation::CreatePartitionOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
: BaseT(context, prev_ss) {
}
CreatePartitionOperation::CreatePartitionOperation(const OperationContext& context, ID_TYPE collection_id,
ID_TYPE commit_id)
: BaseT(context, collection_id, commit_id) {
}
Status
CreatePartitionOperation::PreCheck() {
......@@ -448,7 +431,10 @@ CreateCollectionOperation::PreCheck() {
std::string
CreateCollectionOperation::GetRepr() const {
std::stringstream ss;
ss << "<CCO(";
ss << "<" << GetName() << "(";
if (prev_ss_) {
ss << "SS=" << prev_ss_->GetID();
}
ss << c_context_.ToString();
ss << "," << context_.ToString();
ss << ",LSN=" << GetContextLsn();
......@@ -515,6 +501,7 @@ CreateCollectionOperation::DoExecute(Store& store) {
AddStepWithLsn(*collection_commit, c_context_.lsn);
context_.new_collection_commit = collection_commit;
c_context_.collection_commit = collection_commit;
context_.new_collection_commit = collection_commit;
return Status::OK();
}
......@@ -528,12 +515,13 @@ CreateCollectionOperation::GetSnapshot(ScopedSnapshotT& ss) const {
return status;
if (!c_context_.collection_commit)
return Status(SS_CONSTRAINT_CHECK_ERROR, "No Snapshot is available");
status = Snapshots::GetInstance().GetSnapshot(ss, c_context_.collection_commit->GetCollectionId());
/* status = Snapshots::GetInstance().GetSnapshot(ss, c_context_.collection_commit->GetCollectionId()); */
ss = context_.latest_ss;
return status;
}
Status
SoftDeleteCollectionOperation::DoExecute(Store& store) {
DropCollectionOperation::DoExecute(Store& store) {
if (!context_.collection) {
return Status(SS_INVALID_CONTEX_ERROR, "Invalid Context");
}
......
......@@ -19,27 +19,48 @@ namespace milvus {
namespace engine {
namespace snapshot {
template <typename DerivedT>
class CompoundBaseOperation : public Operations {
public:
using BaseT = Operations;
CompoundBaseOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
CompoundBaseOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0);
CompoundBaseOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
: BaseT(context, prev_ss, OperationsType::W_Compound) {
}
std::string
GetRepr() const override;
GetRepr() const override {
std::stringstream ss;
ss << "<" << GetName() << "(";
if (context_.prev_ss) {
ss << "SS=" << context_.prev_ss->GetID();
}
ss << "," << context_.ToString();
ss << ",LSN=" << GetContextLsn();
ss << ")>";
return ss.str();
}
Status
PreCheck() override;
PreCheck() override {
if (GetContextLsn() <= prev_ss_->GetMaxLsn()) {
return Status(SS_INVALID_CONTEX_ERROR, "Invalid LSN found in operation");
}
return Status::OK();
}
std::string
GetName() const override {
return DerivedT::Name;
}
};
class BuildOperation : public CompoundBaseOperation {
class BuildOperation : public CompoundBaseOperation<BuildOperation> {
public:
using BaseT = CompoundBaseOperation;
static constexpr const char* Name = "BO";
using BaseT = CompoundBaseOperation<BuildOperation>;
static constexpr const char* Name = "B";
BuildOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
BuildOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0);
Status
DoExecute(Store&) override;
......@@ -47,23 +68,17 @@ class BuildOperation : public CompoundBaseOperation {
Status
CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created);
std::string
GetName() const override {
return Name;
}
protected:
Status
CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_TYPE segment_id) const;
};
class NewSegmentOperation : public CompoundBaseOperation {
class NewSegmentOperation : public CompoundBaseOperation<NewSegmentOperation> {
public:
using BaseT = CompoundBaseOperation;
static constexpr const char* Name = "NSO";
using BaseT = CompoundBaseOperation<NewSegmentOperation>;
static constexpr const char* Name = "NS";
NewSegmentOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
NewSegmentOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0);
Status
DoExecute(Store&) override;
......@@ -73,20 +88,14 @@ class NewSegmentOperation : public CompoundBaseOperation {
Status
CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created);
std::string
GetName() const override {
return Name;
}
};
class MergeOperation : public CompoundBaseOperation {
class MergeOperation : public CompoundBaseOperation<MergeOperation> {
public:
using BaseT = CompoundBaseOperation;
static constexpr const char* Name = "MO";
using BaseT = CompoundBaseOperation<MergeOperation>;
static constexpr const char* Name = "M";
MergeOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
MergeOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0);
Status
DoExecute(Store&) override;
......@@ -95,17 +104,12 @@ class MergeOperation : public CompoundBaseOperation {
CommitNewSegment(SegmentPtr&);
Status
CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr&);
std::string
GetName() const override {
return Name;
}
};
class CreateCollectionOperation : public CompoundBaseOperation {
class CreateCollectionOperation : public CompoundBaseOperation<CreateCollectionOperation> {
public:
using BaseT = CompoundBaseOperation;
static constexpr const char* Name = "CCO";
using BaseT = CompoundBaseOperation<CreateCollectionOperation>;
static constexpr const char* Name = "CC";
explicit CreateCollectionOperation(const CreateCollectionContext& context);
......@@ -126,22 +130,16 @@ class CreateCollectionOperation : public CompoundBaseOperation {
std::string
GetRepr() const override;
std::string
GetName() const override {
return Name;
}
private:
CreateCollectionContext c_context_;
};
class CreatePartitionOperation : public CompoundBaseOperation {
class CreatePartitionOperation : public CompoundBaseOperation<CreatePartitionOperation> {
public:
using BaseT = CompoundBaseOperation;
static constexpr const char* Name = "CPO";
using BaseT = CompoundBaseOperation<CreatePartitionOperation>;
static constexpr const char* Name = "CP";
CreatePartitionOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
CreatePartitionOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0);
Status
CommitNewPartition(const PartitionContext& context, PartitionPtr& partition);
......@@ -151,17 +149,12 @@ class CreatePartitionOperation : public CompoundBaseOperation {
Status
PreCheck() override;
std::string
GetName() const override {
return Name;
}
};
class DropPartitionOperation : public CompoundBaseOperation {
class DropPartitionOperation : public CompoundBaseOperation<DropPartitionOperation> {
public:
using BaseT = CompoundBaseOperation;
static constexpr const char* Name = "DPO";
using BaseT = CompoundBaseOperation<DropPartitionOperation>;
static constexpr const char* Name = "DP";
DropPartitionOperation(const PartitionContext& context, ScopedSnapshotT prev_ss);
Status
......@@ -175,11 +168,6 @@ class DropPartitionOperation : public CompoundBaseOperation {
std::string
GetRepr() const override;
std::string
GetName() const override {
return Name;
}
protected:
PartitionContext c_context_;
};
......@@ -219,23 +207,18 @@ class GetCollectionIDsOperation : public Operations {
IDS_TYPE ids_;
};
class SoftDeleteCollectionOperation : public CompoundBaseOperation {
class DropCollectionOperation : public CompoundBaseOperation<DropCollectionOperation> {
public:
using BaseT = CompoundBaseOperation;
static constexpr const char* Name = "DCO";
using BaseT = CompoundBaseOperation<DropCollectionOperation>;
static constexpr const char* Name = "DC";
explicit SoftDeleteCollectionOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
explicit DropCollectionOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
: BaseT(context, prev_ss) {
}
Status
DoExecute(Store& store) override;
std::string
GetName() const override {
return Name;
}
private:
ID_TYPE collection_id_;
};
......
......@@ -16,6 +16,8 @@ namespace milvus {
namespace engine {
namespace snapshot {
static constexpr const char* INNER_DELIMITER = ":";
std::string
PartitionContext::ToString() const {
std::stringstream ss;
......@@ -59,7 +61,7 @@ OperationContext::ToString() const {
bool first = true;
for (auto& f : stale_segments) {
if (!first) {
ss << ",";
ss << INNER_DELIMITER;
}
ss << f->GetID();
first = false;
......@@ -74,7 +76,7 @@ OperationContext::ToString() const {
bool first = true;
for (auto& f : new_segment_files) {
if (!first) {
ss << ",";
ss << INNER_DELIMITER;
}
ss << f->GetID();
first = false;
......
......@@ -17,6 +17,7 @@
#include <vector>
#include "db/meta/MetaTypes.h"
#include "db/snapshot/Resources.h"
#include "db/snapshot/Snapshot.h"
namespace milvus {
namespace engine {
......@@ -45,6 +46,11 @@ struct LoadOperationContext {
};
struct OperationContext {
explicit OperationContext(const ScopedSnapshotT& ss = ScopedSnapshotT()) : prev_ss(ss) {
}
ScopedSnapshotT latest_ss;
ScopedSnapshotT prev_ss;
SegmentPtr new_segment = nullptr;
SegmentCommitPtr new_segment_commit = nullptr;
PartitionPtr new_partition = nullptr;
......
......@@ -35,14 +35,6 @@ Operations::Operations(const OperationContext& context, ScopedSnapshotT prev_ss,
type_(type) {
}
Operations::Operations(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id,
const OperationsType& type)
: context_(context), uid_(UID++), status_(SS_OPERATION_PENDING, "Operation Pending"), type_(type) {
auto status = Snapshots::GetInstance().GetSnapshot(prev_ss_, collection_id, commit_id);
if (!status.ok())
prev_ss_ = ScopedSnapshotT();
}
std::string
Operations::SuccessString() const {
return status_.ToString();
......@@ -101,10 +93,14 @@ Operations::WaitToFinish() {
}
void
Operations::Done() {
Operations::Done(Store& store) {
std::unique_lock<std::mutex> lock(finish_mtx_);
done_ = true;
if (GetType() == OperationsType::W_Compound) {
if (!context_.latest_ss && ids_.size() > 0 && context_.new_collection_commit) {
Snapshots::GetInstance().LoadSnapshot(store, context_.latest_ss,
context_.new_collection_commit->GetCollectionId(), ids_.back());
}
std::cout << ToString() << std::endl;
}
finish_cond_.notify_all();
......@@ -131,7 +127,7 @@ Operations::DoCheckStale(ScopedSnapshotT& latest_snapshot) const {
Status
Operations::CheckStale(const CheckStaleFunc& checker) const {
decltype(prev_ss_) latest_ss;
auto status = Snapshots::GetInstance().GetSnapshotNoLoad(latest_ss, prev_ss_->GetCollection()->GetID());
auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, prev_ss_->GetCollection()->GetID());
if (!status.ok())
return status;
if (prev_ss_->GetID() != latest_ss->GetID()) {
......@@ -181,25 +177,38 @@ Operations::GetSnapshot(ScopedSnapshotT& ss) const {
status = IDSNotEmptyRequried();
if (!status.ok())
return status;
status = Snapshots::GetInstance().GetSnapshot(ss, prev_ss_->GetCollectionId(), ids_.back());
/* status = Snapshots::GetInstance().GetSnapshot(ss, prev_ss_->GetCollectionId(), ids_.back()); */
ss = context_.latest_ss;
return status;
}
Status
Operations::ApplyToStore(Store& store) {
if (GetType() == OperationsType::W_Compound) {
std::cout << ToString() << std::endl;
/* std::cout << ToString() << std::endl; */
}
if (done_) {
Done();
Done(store);
return status_;
}
auto status = OnExecute(store);
SetStatus(status);
Done();
Done(store);
return status_;
}
Status
Operations::OnSnapshotDropped() {
return Status::OK();
}
Status
Operations::OnSnapshotStale() {
/* std::cout << GetRepr() << " Stale SS " << prev_ss_->GetID() << " RefCnt=" << prev_ss_->RefCnt() \ */
/* << " Curr SS " << context_.prev_ss->GetID() << " RefCnt=" << context_.prev_ss->RefCnt() << std::endl; */
return Status::OK();
}
Status
Operations::OnExecute(Store& store) {
auto status = PreExecute(store);
......@@ -215,7 +224,16 @@ Operations::OnExecute(Store& store) {
Status
Operations::PreExecute(Store& store) {
return Status::OK();
Status status;
if (prev_ss_ && type_ == OperationsType::W_Compound) {
Snapshots::GetInstance().GetSnapshot(context_.prev_ss, prev_ss_->GetCollectionId());
if (!context_.prev_ss) {
status = OnSnapshotDropped();
} else if (prev_ss_->GetID() != context_.prev_ss->GetID()) {
status = OnSnapshotStale();
}
}
return status;
}
Status
......
......@@ -39,8 +39,6 @@ class Operations : public std::enable_shared_from_this<Operations> {
public:
Operations(const OperationContext& context, ScopedSnapshotT prev_ss,
const OperationsType& type = OperationsType::Invalid);
Operations(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0,
const OperationsType& type = OperationsType::Invalid);
const ScopedSnapshotT&
GetPrevSnapshot() const {
......@@ -108,7 +106,7 @@ class Operations : public std::enable_shared_from_this<Operations> {
WaitToFinish();
void
Done();
Done(Store& store);
void
SetStatus(const Status& status);
......@@ -131,6 +129,11 @@ class Operations : public std::enable_shared_from_this<Operations> {
Status
RollBack();
virtual Status
OnSnapshotStale();
virtual Status
OnSnapshotDropped();
virtual ~Operations();
friend std::ostream&
......@@ -190,9 +193,6 @@ class CommitOperation : public Operations {
CommitOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
: BaseT(context, prev_ss, OperationsType::W_Leaf) {
}
CommitOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0)
: BaseT(context, collection_id, commit_id, OperationsType::W_Leaf) {
}
virtual typename ResourceT::Ptr
GetPrevResource() const {
......@@ -237,12 +237,12 @@ class LoadOperation : public Operations {
Status
ApplyToStore(Store& store) override {
if (done_) {
Done();
Done(store);
return status_;
}
auto status = store.GetResource<ResourceT>(context_.id, resource_);
SetStatus(status);
Done();
Done(store);
return status_;
}
......@@ -287,7 +287,7 @@ class HardDeleteOperation : public Operations {
return status_;
auto status = store.RemoveResource<ResourceT>(id_);
SetStatus(status);
Done();
Done(store);
return status_;
}
......@@ -305,12 +305,12 @@ class HardDeleteOperation<Collection> : public Operations {
Status
ApplyToStore(Store& store) override {
if (done_) {
Done();
Done(store);
return status_;
}
auto status = store.RemoveCollection(id_);
SetStatus(status);
Done();
Done(store);
return status_;
}
......
......@@ -19,17 +19,14 @@ namespace snapshot {
void
ReferenceProxy::Ref() {
refcnt_ += 1;
/* std::cout << this << " refcnt = " << refcnt_ << std::endl; */
++refcnt_;
}
void
ReferenceProxy::UnRef() {
if (refcnt_ == 0)
return;
refcnt_ -= 1;
/* std::cout << this << " refcnt = " << refcnt_ << std::endl; */
if (refcnt_ == 0) {
if (refcnt_.fetch_sub(1) == 1) {
for (auto& cb : on_no_ref_cbs_) {
cb();
}
......
......@@ -11,6 +11,7 @@
#pragma once
#include <any>
#include <atomic>
#include <functional>
#include <memory>
#include <vector>
......@@ -23,6 +24,13 @@ using OnNoRefCBF = std::function<void(void)>;
class ReferenceProxy {
public:
ReferenceProxy() {
}
// TODO: Copy constructor is used in Mock Test. Should never be used. To be removed
ReferenceProxy(const ReferenceProxy& o) {
refcnt_ = 0;
}
void
RegisterOnNoRefCB(OnNoRefCBF cb);
......@@ -44,12 +52,11 @@ class ReferenceProxy {
virtual ~ReferenceProxy();
protected:
int refcnt_ = 0;
std::atomic_long refcnt_ = {0};
std::vector<OnNoRefCBF> on_no_ref_cbs_;
};
using ReferenceResourcePtr = std::shared_ptr<ReferenceProxy>;
} // namespace snapshot
} // namespace engine
} // namespace milvus
......@@ -43,10 +43,6 @@ PartitionOperation::PartitionOperation(const PartitionContext& context, ScopedSn
: BaseT(OperationContext(), prev_ss), context_(context) {
}
PartitionOperation::PartitionOperation(const PartitionContext& context, ID_TYPE collection_id, ID_TYPE commit_id)
: BaseT(OperationContext(), collection_id, commit_id), context_(context) {
}
Status
PartitionOperation::PreCheck() {
return Status::OK();
......@@ -66,11 +62,6 @@ PartitionCommitOperation::PartitionCommitOperation(const OperationContext& conte
: BaseT(context, prev_ss) {
}
PartitionCommitOperation::PartitionCommitOperation(const OperationContext& context, ID_TYPE collection_id,
ID_TYPE commit_id)
: BaseT(context, collection_id, commit_id) {
}
Status
PartitionCommitOperation::PreCheck() {
return Status::OK();
......@@ -118,11 +109,6 @@ SegmentCommitOperation::SegmentCommitOperation(const OperationContext& context,
: BaseT(context, prev_ss) {
}
SegmentCommitOperation::SegmentCommitOperation(const OperationContext& context, ID_TYPE collection_id,
ID_TYPE commit_id)
: BaseT(context, collection_id, commit_id) {
}
SegmentCommit::Ptr
SegmentCommitOperation::GetPrevResource() const {
if (context_.new_segment_files.size() > 0) {
......@@ -134,10 +120,6 @@ SegmentCommitOperation::GetPrevResource() const {
SegmentOperation::SegmentOperation(const OperationContext& context, ScopedSnapshotT prev_ss) : BaseT(context, prev_ss) {
}
SegmentOperation::SegmentOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id)
: BaseT(context, collection_id, commit_id) {
}
Status
SegmentOperation::PreCheck() {
if (!context_.prev_partition)
......@@ -191,10 +173,6 @@ SegmentFileOperation::SegmentFileOperation(const SegmentFileContext& sc, ScopedS
: BaseT(OperationContext(), prev_ss), context_(sc) {
}
SegmentFileOperation::SegmentFileOperation(const SegmentFileContext& sc, ID_TYPE collection_id, ID_TYPE commit_id)
: BaseT(OperationContext(), collection_id, commit_id), context_(sc) {
}
Status
SegmentFileOperation::DoExecute(Store& store) {
auto field_element_id = prev_ss_->GetFieldElementId(context_.field_name, context_.field_element_name);
......
......@@ -22,9 +22,6 @@ class CollectionCommitOperation : public CommitOperation<CollectionCommit> {
using BaseT = CommitOperation<CollectionCommit>;
CollectionCommitOperation(OperationContext context, ScopedSnapshotT prev_ss) : BaseT(context, prev_ss) {
}
CollectionCommitOperation(OperationContext context, ID_TYPE collection_id, ID_TYPE commit_id = 0)
: BaseT(context, collection_id, commit_id) {
}
CollectionCommitPtr
GetPrevResource() const override {
......@@ -39,7 +36,6 @@ class PartitionCommitOperation : public CommitOperation<PartitionCommit> {
public:
using BaseT = CommitOperation<PartitionCommit>;
PartitionCommitOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
PartitionCommitOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0);
PartitionCommitPtr
GetPrevResource() const override;
......@@ -55,7 +51,6 @@ class PartitionOperation : public CommitOperation<Partition> {
public:
using BaseT = CommitOperation<Partition>;
PartitionOperation(const PartitionContext& context, ScopedSnapshotT prev_ss);
PartitionOperation(const PartitionContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0);
Status
DoExecute(Store& store) override;
......@@ -71,7 +66,6 @@ class SegmentCommitOperation : public CommitOperation<SegmentCommit> {
public:
using BaseT = CommitOperation<SegmentCommit>;
SegmentCommitOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
SegmentCommitOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0);
SegmentCommit::Ptr
GetPrevResource() const override;
......@@ -87,7 +81,6 @@ class SegmentOperation : public CommitOperation<Segment> {
public:
using BaseT = CommitOperation<Segment>;
SegmentOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
SegmentOperation(const OperationContext& context, ID_TYPE collection_id, ID_TYPE commit_id = 0);
Status
DoExecute(Store& store) override;
......@@ -100,7 +93,6 @@ class SegmentFileOperation : public CommitOperation<SegmentFile> {
public:
using BaseT = CommitOperation<SegmentFile>;
SegmentFileOperation(const SegmentFileContext& sc, ScopedSnapshotT prev_ss);
SegmentFileOperation(const SegmentFileContext& sc, ID_TYPE collection_id, ID_TYPE commit_id = 0);
Status
DoExecute(Store& store) override;
......@@ -119,7 +111,7 @@ class LoadOperation<Collection> : public Operations {
Status
ApplyToStore(Store& store) override {
if (done_) {
Done();
Done(store);
return status_;
}
Status status;
......@@ -129,7 +121,7 @@ class LoadOperation<Collection> : public Operations {
status = store.GetResource<Collection>(context_.id, resource_);
}
SetStatus(status);
Done();
Done(store);
return status_;
}
......
......@@ -276,7 +276,7 @@ class NameField {
std::string name_;
};
class Collection : public DBBaseResource<>,
class Collection : public DBBaseResource,
public NameField,
public IdField,
public LsnField,
......@@ -296,7 +296,7 @@ class Collection : public DBBaseResource<>,
using CollectionPtr = Collection::Ptr;
class SchemaCommit : public DBBaseResource<>,
class SchemaCommit : public DBBaseResource,
public CollectionIdField,
public MappingsField,
public IdField,
......@@ -318,7 +318,7 @@ class SchemaCommit : public DBBaseResource<>,
using SchemaCommitPtr = SchemaCommit::Ptr;
class Field : public DBBaseResource<>,
class Field : public DBBaseResource,
public NameField,
public NumField,
public IdField,
......@@ -339,7 +339,7 @@ class Field : public DBBaseResource<>,
using FieldPtr = Field::Ptr;
class FieldCommit : public DBBaseResource<>,
class FieldCommit : public DBBaseResource,
public CollectionIdField,
public FieldIdField,
public MappingsField,
......@@ -362,7 +362,7 @@ class FieldCommit : public DBBaseResource<>,
using FieldCommitPtr = FieldCommit::Ptr;
class FieldElement : public DBBaseResource<>,
class FieldElement : public DBBaseResource,
public CollectionIdField,
public FieldIdField,
public NameField,
......@@ -385,7 +385,7 @@ class FieldElement : public DBBaseResource<>,
using FieldElementPtr = FieldElement::Ptr;
class CollectionCommit : public DBBaseResource<>,
class CollectionCommit : public DBBaseResource,
public CollectionIdField,
public SchemaIdField,
public MappingsField,
......@@ -407,7 +407,7 @@ class CollectionCommit : public DBBaseResource<>,
using CollectionCommitPtr = CollectionCommit::Ptr;
class Partition : public DBBaseResource<>,
class Partition : public DBBaseResource,
public NameField,
public CollectionIdField,
public IdField,
......@@ -428,7 +428,7 @@ class Partition : public DBBaseResource<>,
using PartitionPtr = Partition::Ptr;
class PartitionCommit : public DBBaseResource<>,
class PartitionCommit : public DBBaseResource,
public CollectionIdField,
public PartitionIdField,
public MappingsField,
......@@ -454,7 +454,7 @@ class PartitionCommit : public DBBaseResource<>,
using PartitionCommitPtr = PartitionCommit::Ptr;
class Segment : public DBBaseResource<>,
class Segment : public DBBaseResource,
public PartitionIdField,
public NumField,
public IdField,
......@@ -478,7 +478,7 @@ class Segment : public DBBaseResource<>,
using SegmentPtr = Segment::Ptr;
class SegmentCommit : public DBBaseResource<>,
class SegmentCommit : public DBBaseResource,
public SchemaIdField,
public PartitionIdField,
public SegmentIdField,
......@@ -505,7 +505,7 @@ class SegmentCommit : public DBBaseResource<>,
using SegmentCommitPtr = SegmentCommit::Ptr;
class SegmentFile : public DBBaseResource<>,
class SegmentFile : public DBBaseResource,
public PartitionIdField,
public SegmentIdField,
public FieldElementIdField,
......
......@@ -19,11 +19,13 @@ namespace snapshot {
void
Snapshot::RefAll() {
/* std::cout << this << " RefAll SS=" << GetID() << " SS RefCnt=" << RefCnt() << std::endl; */
std::apply([this](auto&... resource) { ((DoRef(resource)), ...); }, resources_);
}
void
Snapshot::UnRefAll() {
/* std::cout << this << " UnRefAll SS=" << GetID() << " SS RefCnt=" << RefCnt() << std::endl; */
std::apply([this](auto&... resource) { ((DoUnRef(resource)), ...); }, resources_);
}
......@@ -48,6 +50,7 @@ Snapshot::Snapshot(ID_TYPE id) {
auto& segment_commits_holder = SegmentCommitsHolder::GetInstance();
auto& segment_files_holder = SegmentFilesHolder::GetInstance();
auto ssid = id;
for (auto& id : mappings) {
auto partition_commit = partition_commits_holder.GetResource(id, false);
auto partition = partitions_holder.GetResource(partition_commit->GetPartitionId(), false);
......@@ -57,6 +60,11 @@ Snapshot::Snapshot(ID_TYPE id) {
partition_names_map_[partition->GetName()] = partition->GetID();
p_max_seg_num_[partition->GetID()] = 0;
auto& s_c_mappings = partition_commit->GetMappings();
/* std::cout << "SS-" << ssid << "PC_MAP=("; */
/* for (auto id : s_c_mappings) { */
/* std::cout << id << ","; */
/* } */
/* std::cout << ")" << std::endl; */
for (auto& s_c_id : s_c_mappings) {
auto segment_commit = segment_commits_holder.GetResource(s_c_id, false);
auto segment = segments_holder.GetResource(segment_commit->GetSegmentId(), false);
......
......@@ -26,6 +26,7 @@
#include <tuple>
#include <utility>
#include <vector>
#include "db/snapshot/Store.h"
#include "db/snapshot/Utils.h"
#include "db/snapshot/WrappedTypes.h"
#include "utils/Status.h"
......@@ -189,13 +190,13 @@ class Snapshot : public ReferenceProxy {
void
DumpResource(const std::string& tag = "") {
auto& resources = GetResources<ResourceT>();
std::cout << typeid(*this).name() << " Dump" << ResourceT::Name << " Start [" << tag << "]:" << resources.size()
<< std::endl;
std::cout << typeid(*this).name() << " Dump " << GetID() << " " << ResourceT::Name << " Start [" << tag
<< "]:" << resources.size() << std::endl;
for (auto& kv : resources) {
std::cout << "\t" << kv.second->ToString() << std::endl;
}
std::cout << typeid(*this).name() << " Dump" << ResourceT::Name << " End [" << tag << "]:" << resources.size()
<< std::endl;
std::cout << typeid(*this).name() << " Dump " << GetID() << " " << ResourceT::Name << " End [" << tag
<< "]:" << resources.size() << std::endl;
}
template <typename T>
......@@ -246,6 +247,10 @@ class Snapshot : public ReferenceProxy {
}
private:
Snapshot(const Snapshot&) = delete;
Snapshot&
operator=(const Snapshot&) = delete;
// PXU TODO: Re-org below data structures to reduce memory usage
ScopedResourcesT resources_;
ID_TYPE current_schema_id_;
......
......@@ -36,14 +36,11 @@ SnapshotHolder::~SnapshotHolder() {
}
Status
SnapshotHolder::GetSnapshot(ScopedSnapshotT& ss, ID_TYPE id, bool scoped, bool load) {
SnapshotHolder::Load(Store& store, ScopedSnapshotT& ss, ID_TYPE id, bool scoped) {
Status status;
if (id > max_id_) {
if (!load) {
return Status(SS_NOT_FOUND_ERROR, "Specified Snapshot not found");
}
CollectionCommitPtr cc;
status = LoadNoLock(id, cc);
status = LoadNoLock(id, cc, store);
if (!status.ok())
return status;
status = Add(id);
......@@ -52,8 +49,31 @@ SnapshotHolder::GetSnapshot(ScopedSnapshotT& ss, ID_TYPE id, bool scoped, bool l
}
std::unique_lock<std::mutex> lock(mutex_);
/* std::cout << "Holder " << collection_id_ << " actives num=" << active_.size() */
/* << " latest=" << active_[max_id_]->GetID() << " RefCnt=" << active_[max_id_]->RefCnt() << std::endl; */
if (id == 0 || id == max_id_) {
auto raw = active_[max_id_];
ss = ScopedSnapshotT(raw, scoped);
return status;
}
if (id < min_id_) {
return Status(SS_STALE_ERROR, "Get stale snapshot");
}
auto it = active_.find(id);
if (it == active_.end()) {
return Status(SS_NOT_FOUND_ERROR, "Specified Snapshot not found");
}
ss = ScopedSnapshotT(it->second, scoped);
return status;
}
Status
SnapshotHolder::Get(ScopedSnapshotT& ss, ID_TYPE id, bool scoped) {
Status status;
if (id > max_id_) {
return Status(SS_NOT_FOUND_ERROR, "Specified Snapshot not found");
}
std::unique_lock<std::mutex> lock(mutex_);
if (id == 0 || id == max_id_) {
auto raw = active_[max_id_];
ss = ScopedSnapshotT(raw, scoped);
......@@ -126,15 +146,25 @@ SnapshotHolder::Add(ID_TYPE id) {
}
Status
SnapshotHolder::LoadNoLock(ID_TYPE collection_commit_id, CollectionCommitPtr& cc) {
SnapshotHolder::LoadNoLock(ID_TYPE collection_commit_id, CollectionCommitPtr& cc, Store& store) {
assert(collection_commit_id > max_id_);
LoadOperationContext context;
context.id = collection_commit_id;
auto op = std::make_shared<LoadOperation<CollectionCommit>>(context);
op->Push();
(*op)(store);
return op->GetResource(cc);
}
/* Status */
/* SnapshotHolder::LoadNoLock(ID_TYPE collection_commit_id, CollectionCommitPtr& cc) { */
/* assert(collection_commit_id > max_id_); */
/* LoadOperationContext context; */
/* context.id = collection_commit_id; */
/* auto op = std::make_shared<LoadOperation<CollectionCommit>>(context); */
/* op->Push(); */
/* return op->GetResource(cc); */
/* } */
} // namespace snapshot
} // namespace engine
} // namespace milvus
......@@ -35,7 +35,9 @@ class SnapshotHolder {
Add(ID_TYPE id);
Status
GetSnapshot(ScopedSnapshotT& ss, ID_TYPE id = 0, bool scoped = true, bool load = true);
Get(ScopedSnapshotT& ss, ID_TYPE id = 0, bool scoped = true);
Status
Load(Store& store, ScopedSnapshotT& ss, ID_TYPE id = 0, bool scoped = true);
Status
SetGCHandler(GCHandler gc_handler) {
......@@ -49,8 +51,10 @@ class SnapshotHolder {
~SnapshotHolder();
private:
/* Status */
/* LoadNoLock(ID_TYPE collection_commit_id, CollectionCommitPtr& cc); */
Status
LoadNoLock(ID_TYPE collection_commit_id, CollectionCommitPtr& cc);
LoadNoLock(ID_TYPE collection_commit_id, CollectionCommitPtr& cc, Store& store);
void
ReadyForRelease(Snapshot::Ptr ss) {
......
......@@ -39,7 +39,7 @@ Snapshots::DoDropCollection(ScopedSnapshotT& ss, const LSN_TYPE& lsn) {
OperationContext context;
context.lsn = lsn;
context.collection = ss->GetCollection();
auto op = std::make_shared<SoftDeleteCollectionOperation>(context, ss);
auto op = std::make_shared<DropCollectionOperation>(context, ss);
op->Push();
auto status = op->GetStatus();
......@@ -50,12 +50,38 @@ Snapshots::DoDropCollection(ScopedSnapshotT& ss, const LSN_TYPE& lsn) {
}
Status
Snapshots::GetSnapshotNoLoad(ScopedSnapshotT& ss, ID_TYPE collection_id, bool scoped) {
Snapshots::DropPartition(const ID_TYPE& collection_id, const ID_TYPE& partition_id, const LSN_TYPE& lsn) {
ScopedSnapshotT ss;
auto status = GetSnapshot(ss, collection_id);
if (!status.ok()) {
return status;
}
PartitionContext context;
context.id = partition_id;
context.lsn = lsn;
auto op = std::make_shared<DropPartitionOperation>(context, ss);
status = op->Push();
if (!status.ok()) {
return status;
}
status = op->GetSnapshot(ss);
if (!status.ok()) {
return status;
}
return op->GetStatus();
}
Status
Snapshots::LoadSnapshot(Store& store, ScopedSnapshotT& ss, ID_TYPE collection_id, ID_TYPE id, bool scoped) {
SnapshotHolderPtr holder;
auto status = GetHolder(collection_id, holder, false);
auto status = LoadHolder(store, collection_id, holder);
if (!status.ok())
return status;
status = holder->GetSnapshot(ss, 0, scoped, false);
status = holder->Load(store, ss, id, scoped);
return status;
}
......@@ -65,7 +91,7 @@ Snapshots::GetSnapshot(ScopedSnapshotT& ss, ID_TYPE collection_id, ID_TYPE id, b
auto status = GetHolder(collection_id, holder);
if (!status.ok())
return status;
status = holder->GetSnapshot(ss, id, scoped);
status = holder->Get(ss, id, scoped);
return status;
}
......@@ -75,7 +101,7 @@ Snapshots::GetSnapshot(ScopedSnapshotT& ss, const std::string& name, ID_TYPE id,
auto status = GetHolder(name, holder);
if (!status.ok())
return status;
status = holder->GetSnapshot(ss, id, scoped);
status = holder->Get(ss, id, scoped);
return status;
}
......@@ -89,9 +115,10 @@ Snapshots::GetCollectionIds(IDS_TYPE& ids) const {
}
Status
Snapshots::LoadNoLock(ID_TYPE collection_id, SnapshotHolderPtr& holder) {
Snapshots::LoadNoLock(Store& store, ID_TYPE collection_id, SnapshotHolderPtr& holder) {
auto op = std::make_shared<GetSnapshotIDsOperation>(collection_id, false);
op->Push();
/* op->Push(); */
(*op)(store);
auto& collection_commit_ids = op->GetIDs();
if (collection_commit_ids.size() == 0) {
return Status(SS_NOT_FOUND_ERROR, "No collection commit found");
......@@ -110,51 +137,51 @@ Snapshots::Init() {
op->Push();
auto& collection_ids = op->GetIDs();
SnapshotHolderPtr holder;
// TODO
for (auto collection_id : collection_ids) {
GetHolder(collection_id, holder);
/* GetHolder(collection_id, holder); */
auto& store = Store::GetInstance();
LoadHolder(store, collection_id, holder);
}
}
Status
Snapshots::GetHolder(const std::string& name, SnapshotHolderPtr& holder) {
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
auto kv = name_id_map_.find(name);
if (kv != name_id_map_.end()) {
lock.unlock();
return GetHolder(kv->second, holder);
}
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
auto kv = name_id_map_.find(name);
if (kv != name_id_map_.end()) {
lock.unlock();
return GetHolder(kv->second, holder);
}
LoadOperationContext context;
context.name = name;
auto op = std::make_shared<LoadOperation<Collection>>(context);
op->Push();
CollectionPtr c;
auto status = op->GetResource(c);
if (!status.ok())
return status;
return GetHolder(c->GetID(), holder);
return Status(SS_NOT_FOUND_ERROR, "Specified snapshot holder not found");
}
Status
Snapshots::GetHolder(ID_TYPE collection_id, SnapshotHolderPtr& holder, bool load) {
Snapshots::GetHolder(const ID_TYPE& collection_id, SnapshotHolderPtr& holder) {
Status status;
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
status = GetHolderNoLock(collection_id, holder);
return status;
}
Status
Snapshots::LoadHolder(Store& store, const ID_TYPE& collection_id, SnapshotHolderPtr& holder) {
Status status;
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
status = GetHolderNoLock(collection_id, holder);
if (status.ok() && holder)
return status;
if (!load)
return status;
}
status = LoadNoLock(collection_id, holder);
status = LoadNoLock(store, collection_id, holder);
if (!status.ok())
return status;
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
holders_[collection_id] = holder;
ScopedSnapshotT ss;
status = holder->GetSnapshot(ss);
status = holder->Load(store, ss);
if (!status.ok())
return status;
name_id_map_[ss->GetName()] = collection_id;
......@@ -184,8 +211,7 @@ void
Snapshots::SnapshotGCCallback(Snapshot::Ptr ss_ptr) {
/* to_release_.push_back(ss_ptr); */
ss_ptr->UnRef();
std::cout << &(*ss_ptr) << " Snapshot " << ss_ptr->GetID() << " RefCnt = " << ss_ptr->RefCnt() << " To be removed"
<< std::endl;
std::cout << "Snapshot " << ss_ptr->GetID() << " RefCnt = " << ss_ptr->RefCnt() << " To be removed" << std::endl;
}
} // namespace snapshot
......
......@@ -20,6 +20,7 @@
#include <thread>
#include <vector>
#include "db/snapshot/SnapshotHolder.h"
#include "db/snapshot/Store.h"
#include "utils/Status.h"
namespace milvus {
......@@ -34,16 +35,18 @@ class Snapshots {
return sss;
}
Status
GetHolder(ID_TYPE collection_id, SnapshotHolderPtr& holder, bool load = true);
GetHolder(const ID_TYPE& collection_id, SnapshotHolderPtr& holder);
Status
GetHolder(const std::string& name, SnapshotHolderPtr& holder);
Status
LoadHolder(Store& store, const ID_TYPE& collection_id, SnapshotHolderPtr& holder);
Status
GetSnapshot(ScopedSnapshotT& ss, ID_TYPE collection_id, ID_TYPE id = 0, bool scoped = true);
Status
GetSnapshot(ScopedSnapshotT& ss, const std::string& name, ID_TYPE id = 0, bool scoped = true);
Status
GetSnapshotNoLoad(ScopedSnapshotT& ss, ID_TYPE collection_id, bool scoped = true);
LoadSnapshot(Store& store, ScopedSnapshotT& ss, ID_TYPE collection_id, ID_TYPE id, bool scoped = true);
Status
GetCollectionIds(IDS_TYPE& ids) const;
......@@ -53,9 +56,15 @@ class Snapshots {
Status
DropCollection(ID_TYPE collection_id, const LSN_TYPE& lsn);
Status
DropPartition(const ID_TYPE& collection_id, const ID_TYPE& partition_id, const LSN_TYPE& lsn);
Status
Reset();
void
Init();
private:
void
SnapshotGCCallback(Snapshot::Ptr ss_ptr);
......@@ -64,11 +73,9 @@ class Snapshots {
}
Status
DoDropCollection(ScopedSnapshotT& ss, const LSN_TYPE& lsn);
void
Init();
Status
LoadNoLock(ID_TYPE collection_id, SnapshotHolderPtr& holder);
LoadNoLock(Store& store, ID_TYPE collection_id, SnapshotHolderPtr& holder);
Status
GetHolderNoLock(ID_TYPE collection_id, SnapshotHolderPtr& holder);
......
......@@ -24,8 +24,10 @@
#include <iostream>
#include <map>
#include <memory>
#include <shared_mutex>
#include <sstream>
#include <string>
#include <thread>
#include <tuple>
#include <typeindex>
#include <typeinfo>
......@@ -100,29 +102,39 @@ class Store {
template <typename ResourceT>
Status
GetResource(ID_TYPE id, typename ResourceT::Ptr& return_v) {
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
auto& resources = std::get<Index<typename ResourceT::MapT, MockResourcesT>::value>(resources_);
auto it = resources.find(id);
if (it == resources.end()) {
/* std::cout << "Can't find " << ResourceT::Name << " " << id << " in ("; */
/* for (auto& i : resources) { */
/* std::cout << i.first << ","; */
/* } */
/* std::cout << ")"; */
return Status(SS_NOT_FOUND_ERROR, "DB resource not found");
}
auto& c = it->second;
return_v = std::make_shared<ResourceT>(*c);
std::cout << "<<< [Load] " << ResourceT::Name << " " << id << " IsActive=" << return_v->IsActive() << std::endl;
/* std::cout << "<<< [Load] " << ResourceT::Name << " " << id
* << " IsActive=" << return_v->IsActive() << std::endl; */
return Status::OK();
}
Status
GetCollection(const std::string& name, CollectionPtr& return_v) {
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
auto it = name_ids_.find(name);
if (it == name_ids_.end()) {
return Status(SS_NOT_FOUND_ERROR, "DB resource not found");
}
auto& id = it->second;
lock.unlock();
return GetResource<Collection>(id, return_v);
}
Status
RemoveCollection(ID_TYPE id) {
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
auto& resources = std::get<Collection::MapT>(resources_);
auto it = resources.find(id);
if (it == resources.end()) {
......@@ -132,13 +144,14 @@ class Store {
auto name = it->second->GetName();
resources.erase(it);
name_ids_.erase(name);
std::cout << ">>> [Remove] Collection " << id << std::endl;
/* std::cout << ">>> [Remove] Collection " << id << std::endl; */
return Status::OK();
}
template <typename ResourceT>
Status
RemoveResource(ID_TYPE id) {
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
auto& resources = std::get<Index<typename ResourceT::MapT, MockResourcesT>::value>(resources_);
auto it = resources.find(id);
if (it == resources.end()) {
......@@ -146,12 +159,13 @@ class Store {
}
resources.erase(it);
std::cout << ">>> [Remove] " << ResourceT::Name << " " << id << std::endl;
/* std::cout << ">>> [Remove] " << ResourceT::Name << " " << id << std::endl; */
return Status::OK();
}
IDS_TYPE
AllActiveCollectionIds(bool reversed = true) const {
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
IDS_TYPE ids;
auto& resources = std::get<Collection::MapT>(resources_);
if (!reversed) {
......@@ -168,6 +182,7 @@ class Store {
IDS_TYPE
AllActiveCollectionCommitIds(ID_TYPE collection_id, bool reversed = true) const {
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
IDS_TYPE ids;
auto& resources = std::get<CollectionCommit::MapT>(resources_);
if (!reversed) {
......@@ -188,6 +203,7 @@ class Store {
Status
CreateCollection(Collection&& collection, CollectionPtr& return_v) {
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
auto& resources = std::get<Collection::MapT>(resources_);
if (!collection.HasAssigned() && (name_ids_.find(collection.GetName()) != name_ids_.end()) &&
(resources[name_ids_[collection.GetName()]]->IsActive()) && !collection.IsDeactive()) {
......@@ -199,6 +215,7 @@ class Store {
c->ResetCnt();
resources[c->GetID()] = c;
name_ids_[c->GetName()] = c->GetID();
lock.unlock();
GetResource<Collection>(c->GetID(), return_v);
return Status::OK();
}
......@@ -206,11 +223,13 @@ class Store {
template <typename ResourceT>
Status
UpdateResource(ResourceT&& resource, typename ResourceT::Ptr& return_v) {
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
auto& resources = std::get<typename ResourceT::MapT>(resources_);
auto res = std::make_shared<ResourceT>(resource);
auto& id = std::get<Index<typename ResourceT::MapT, MockResourcesT>::value>(ids_);
res->ResetCnt();
resources[res->GetID()] = res;
lock.unlock();
GetResource<ResourceT>(res->GetID(), return_v);
return Status::OK();
}
......@@ -221,13 +240,16 @@ class Store {
if (resource.HasAssigned()) {
return UpdateResource<ResourceT>(std::move(resource), return_v);
}
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
auto& resources = std::get<typename ResourceT::MapT>(resources_);
auto res = std::make_shared<ResourceT>(resource);
auto& id = std::get<Index<typename ResourceT::MapT, MockResourcesT>::value>(ids_);
res->SetID(++id);
res->ResetCnt();
resources[res->GetID()] = res;
GetResource<ResourceT>(res->GetID(), return_v);
lock.unlock();
auto status = GetResource<ResourceT>(res->GetID(), return_v);
/* std::cout << ">>> [Create] " << ResourceT::Name << " " << id << std::endl; */
return Status::OK();
}
......@@ -455,6 +477,7 @@ class Store {
MockIDST ids_;
std::map<std::string, ID_TYPE> name_ids_;
std::unordered_map<std::type_index, std::function<ID_TYPE(std::any const&)>> any_flush_vistors_;
mutable std::shared_timed_mutex mutex_;
};
} // namespace snapshot
......
此差异已折叠。
......@@ -372,8 +372,8 @@ SnapshotTest::SetUp() {
milvus::engine::snapshot::SegmentFilesHolder::GetInstance().Reset();
milvus::engine::snapshot::Snapshots::GetInstance().Reset();
milvus::engine::snapshot::Store::GetInstance().Mock();
milvus::engine::snapshot::Snapshots::GetInstance().Init();
}
void
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册