未验证 提交 5964adef 编写于 作者: X XuPeng-SH 提交者: GitHub

(db/snapshot): Implement operation rollback (#2760)

* (db/snapshot): refactor Operations
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): refactor Operations
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): Add rollback for operations
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>
上级 17a01fe5
......@@ -70,13 +70,17 @@ class EventExecutor {
private:
void
ThreadMain() {
Status status;
while (true) {
EventPtr evt = queue_.Take();
if (evt == nullptr) {
break;
}
/* std::cout << std::this_thread::get_id() << " Dequeue Event " << std::endl; */
evt->Process();
status = evt->Process();
if (!status.ok()) {
std::cout << "EventExecutor Handle Event Error: " << status.ToString() << std::endl;
}
}
}
......
......@@ -12,6 +12,8 @@
#include "db/snapshot/Operations.h"
#include <chrono>
#include <sstream>
#include "db/snapshot/Event.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/Snapshots.h"
......@@ -100,6 +102,13 @@ Operations::Done(Store& store) {
Snapshots::GetInstance().LoadSnapshot(store, context_.latest_ss,
context_.new_collection_commit->GetCollectionId(), ids_.back());
}
/* if (!context_.latest_ss && context_.new_collection_commit) { */
/* auto& holder = std::get<ConstPos(last_pos_)>(holders_); */
/* if (holder.size() > 0) */
/* Snapshots::GetInstance().LoadSnapshot(store, context_.latest_ss, */
/* context_.new_collection_commit->GetCollectionId(), holder.rbegin()->GetID()); */
/* } */
/* } */
std::cout << ToString() << std::endl;
}
finish_cond_.notify_all();
......@@ -230,24 +239,28 @@ Operations::DoExecute(Store& store) {
Status
Operations::PostExecute(Store& store) {
return store.DoCommitOperation(*this);
return store.ApplyOperation(*this);
}
Status
Operations::RollBack() {
// TODO: Implement here
// Spwarn a rollback operation or re-use this operation
return Status::OK();
template <typename ResourceT>
void
ApplyRollBack(std::set<std::shared_ptr<ResourceT>>& steps_set) {
for (auto& res : steps_set) {
auto evt_ptr = std::make_shared<ResourceGCEvent<ResourceT>>(res);
EventExecutor::GetInstance().Submit(evt_ptr);
std::cout << "Rollback " << typeid(ResourceT).name() << ": " << res->GetID() << std::endl;
}
}
Status
Operations::ApplyRollBack(Store& store) {
// TODO: Implement rollback to remove all resources in steps_
return Status::OK();
void
Operations::RollBack() {
std::apply([&](auto&... steps_set) { ((ApplyRollBack(steps_set)), ...); }, GetStepHolders());
}
Operations::~Operations() {
// TODO: Prefer to submit a rollback operation if status is not ok
if (!status_.ok()) {
RollBack();
}
}
} // namespace snapshot
......
......@@ -17,8 +17,10 @@
#include <condition_variable>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <tuple>
#include <vector>
#include "Context.h"
#include "db/snapshot/Snapshot.h"
......@@ -30,8 +32,10 @@ namespace milvus {
namespace engine {
namespace snapshot {
using StepsT = std::vector<std::any>;
using CheckStaleFunc = std::function<Status(ScopedSnapshotT&)>;
using StepsHolderT = std::tuple<CollectionCommit::SetT, Collection::SetT, SchemaCommit::SetT, FieldCommit::SetT,
Field::SetT, FieldElement::SetT, PartitionCommit::SetT, Partition::SetT,
SegmentCommit::SetT, Segment::SetT, SegmentFile::SetT>;
enum OperationsType { Invalid, W_Leaf, O_Leaf, W_Compound, O_Compound };
......@@ -71,9 +75,14 @@ class Operations : public std::enable_shared_from_this<Operations> {
ids_.push_back(id);
}
StepsT&
GetSteps() {
return steps_;
const size_t
GetPos() const {
return last_pos_;
}
StepsHolderT&
GetStepHolders() {
return holders_;
}
ID_TYPE
......@@ -132,9 +141,6 @@ class Operations : public std::enable_shared_from_this<Operations> {
virtual std::string
ToString() const;
Status
RollBack();
virtual Status
OnSnapshotStale();
virtual Status
......@@ -158,12 +164,13 @@ class Operations : public std::enable_shared_from_this<Operations> {
Status
CheckPrevSnapshot() const;
Status
ApplyRollBack(Store&);
void
RollBack();
OperationContext context_;
ScopedSnapshotT prev_ss_;
StepsT steps_;
StepsHolderT holders_;
size_t last_pos_;
std::vector<ID_TYPE> ids_;
bool done_ = false;
Status status_;
......@@ -179,7 +186,10 @@ Operations::AddStep(const StepT& step, bool activate) {
auto s = std::make_shared<StepT>(step);
if (activate)
s->Activate();
steps_.push_back(s);
last_pos_ = Index<typename StepT::SetT, StepsHolderT>::value;
auto& holder = std::get<Index<typename StepT::SetT, StepsHolderT>::value>(holders_);
holder.insert(s);
}
template <typename StepT>
......@@ -189,7 +199,9 @@ Operations::AddStepWithLsn(const StepT& step, const LSN_TYPE& lsn, bool activate
if (activate)
s->Activate();
s->SetLsn(lsn);
steps_.push_back(s);
last_pos_ = Index<typename StepT::SetT, StepsHolderT>::value;
auto& holder = std::get<Index<typename StepT::SetT, StepsHolderT>::value>(holders_);
holder.insert(s);
}
template <typename ResourceT>
......
......@@ -15,6 +15,7 @@
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <utility>
......@@ -354,6 +355,7 @@ class Collection : public BaseResource,
public:
using Ptr = std::shared_ptr<Collection>;
using MapT = std::map<ID_TYPE, Ptr>;
using SetT = std::set<Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<Collection>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Collection";
......@@ -380,6 +382,7 @@ class CollectionCommit : public BaseResource,
static constexpr const char* Name = "CollectionCommit";
using Ptr = std::shared_ptr<CollectionCommit>;
using MapT = std::map<ID_TYPE, Ptr>;
using SetT = std::set<Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<CollectionCommit>>;
using VecT = std::vector<Ptr>;
CollectionCommit(ID_TYPE collection_id, ID_TYPE schema_id, const MappingT& mappings = {}, SIZE_TYPE row_cnt = 0,
......@@ -402,6 +405,7 @@ class Partition : public BaseResource,
public:
using Ptr = std::shared_ptr<Partition>;
using MapT = std::map<ID_TYPE, Ptr>;
using SetT = std::set<Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<Partition>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Partition";
......@@ -426,6 +430,7 @@ class PartitionCommit : public BaseResource,
public:
using Ptr = std::shared_ptr<PartitionCommit>;
using MapT = std::map<ID_TYPE, Ptr>;
using SetT = std::set<Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<PartitionCommit>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "PartitionCommit";
......@@ -454,6 +459,7 @@ class Segment : public BaseResource,
public:
using Ptr = std::shared_ptr<Segment>;
using MapT = std::map<ID_TYPE, Ptr>;
using SetT = std::set<Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<Segment>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Segment";
......@@ -483,6 +489,7 @@ class SegmentCommit : public BaseResource,
public:
using Ptr = std::shared_ptr<SegmentCommit>;
using MapT = std::map<ID_TYPE, Ptr>;
using SetT = std::set<Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<SegmentCommit>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "SegmentCommit";
......@@ -514,6 +521,7 @@ class SegmentFile : public BaseResource,
public:
using Ptr = std::shared_ptr<SegmentFile>;
using MapT = std::map<ID_TYPE, Ptr>;
using SetT = std::set<Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<SegmentFile>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "SegmentFile";
......@@ -538,6 +546,7 @@ class SchemaCommit : public BaseResource,
public:
using Ptr = std::shared_ptr<SchemaCommit>;
using MapT = std::map<ID_TYPE, Ptr>;
using SetT = std::set<Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<SchemaCommit>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "SchemaCommit";
......@@ -564,6 +573,7 @@ class Field : public BaseResource,
public:
using Ptr = std::shared_ptr<Field>;
using MapT = std::map<ID_TYPE, Ptr>;
using SetT = std::set<Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<Field>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Field";
......@@ -587,6 +597,7 @@ class FieldCommit : public BaseResource,
public:
using Ptr = std::shared_ptr<FieldCommit>;
using MapT = std::map<ID_TYPE, Ptr>;
using SetT = std::set<Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<FieldCommit>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "FieldCommit";
......@@ -614,6 +625,7 @@ class FieldElement : public BaseResource,
public:
using Ptr = std::shared_ptr<FieldElement>;
using MapT = std::map<ID_TYPE, Ptr>;
using SetT = std::set<Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<FieldElement>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "FieldElement";
......
......@@ -24,6 +24,7 @@
#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <shared_mutex>
#include <sstream>
#include <string>
......@@ -53,52 +54,35 @@ class Store {
return store;
}
template <typename... ResourceT>
bool
DoCommit(ResourceT&&... resources) {
auto t = std::make_tuple(std::forward<ResourceT>(resources)...);
auto& t_size = std::tuple_size<decltype(t)>::value;
if (t_size == 0) {
return false;
}
StartTransaction();
std::apply([this](auto&&... resource) { ((std::cout << CommitResource(resource) << "\n"), ...); }, t);
FinishTransaction();
return true;
}
template <typename OpT>
Status
DoCommitOperation(OpT& op) {
for (auto& step_v : op.GetSteps()) {
auto id = ProcessOperationStep(step_v);
op.SetStepResult(id);
}
ApplyOperation(OpT& op) {
std::apply(
[&](auto&... steps_set) {
std::size_t n{0};
((ApplyOpStep(op, n++, steps_set)), ...);
},
op.GetStepHolders());
return Status::OK();
}
template <typename OpT>
template <typename T, typename OpT>
void
Apply(OpT& op) {
op.ApplyToStore(*this);
}
void
StartTransaction() {
ApplyOpStep(OpT& op, size_t pos, std::set<std::shared_ptr<T>>& steps_set) {
typename T::Ptr ret;
for (auto& res : steps_set) {
CreateResource<T>(T(*res), ret);
res->SetID(ret->GetID());
}
if (ret && (pos == op.GetPos())) {
op.SetStepResult(ret->GetID());
}
}
template <typename OpT>
void
FinishTransaction() {
}
template <typename ResourceT>
bool
CommitResource(ResourceT&& resource) {
std::cout << "Commit " << resource.Name << " " << resource.GetID() << std::endl;
auto res = CreateResource<typename std::remove_reference<ResourceT>::type>(std::move(resource));
if (!res)
return false;
return true;
Apply(OpT& op) {
op.ApplyToStore(*this);
}
template <typename ResourceT>
......@@ -280,110 +264,7 @@ class Store {
}
private:
ID_TYPE
ProcessOperationStep(const std::any& step_v) {
if (const auto it = any_flush_vistors_.find(std::type_index(step_v.type())); it != any_flush_vistors_.cend()) {
return it->second(step_v);
} else {
std::cerr << "Unregisted step type " << std::quoted(step_v.type().name());
return 0;
}
}
template <class T, class F>
inline std::pair<const std::type_index, std::function<ID_TYPE(std::any const&)>>
to_any_visitor(F const& f) {
return {std::type_index(typeid(T)), [g = f](std::any const& a) -> ID_TYPE {
if constexpr (std::is_void_v<T>)
return g();
else
return g(std::any_cast<T const&>(a));
}};
}
template <class T, class F>
inline void
register_any_visitor(F const& f) {
/* std::cout << "Register visitor for type " << std::quoted(typeid(T).name()) << '\n'; */
any_flush_vistors_.insert(to_any_visitor<T>(f));
}
Store() {
register_any_visitor<Collection::Ptr>([this](auto c) {
CollectionPtr n;
CreateResource<Collection>(Collection(*c), n);
return n->GetID();
});
register_any_visitor<CollectionCommit::Ptr>([this](auto c) {
using T = CollectionCommit;
using PtrT = typename T::Ptr;
PtrT n;
CreateResource<T>(T(*c), n);
return n->GetID();
});
register_any_visitor<SchemaCommit::Ptr>([this](auto c) {
using T = SchemaCommit;
using PtrT = typename T::Ptr;
PtrT n;
CreateResource<T>(T(*c), n);
return n->GetID();
});
register_any_visitor<FieldCommit::Ptr>([this](auto c) {
using T = FieldCommit;
using PtrT = typename T::Ptr;
PtrT n;
CreateResource<T>(T(*c), n);
return n->GetID();
});
register_any_visitor<Field::Ptr>([this](auto c) {
using T = Field;
using PtrT = typename T::Ptr;
PtrT n;
CreateResource<T>(T(*c), n);
return n->GetID();
});
register_any_visitor<FieldElement::Ptr>([this](auto c) {
using T = FieldElement;
using PtrT = typename T::Ptr;
PtrT n;
CreateResource<T>(T(*c), n);
return n->GetID();
});
register_any_visitor<PartitionCommit::Ptr>([this](auto c) {
using T = PartitionCommit;
using PtrT = typename T::Ptr;
PtrT n;
CreateResource<T>(T(*c), n);
return n->GetID();
});
register_any_visitor<Partition::Ptr>([this](auto c) {
using T = Partition;
using PtrT = typename T::Ptr;
PtrT n;
CreateResource<T>(T(*c), n);
return n->GetID();
});
register_any_visitor<Segment::Ptr>([this](auto c) {
using T = Segment;
using PtrT = typename T::Ptr;
PtrT n;
CreateResource<T>(T(*c), n);
return n->GetID();
});
register_any_visitor<SegmentCommit::Ptr>([this](auto c) {
using T = SegmentCommit;
using PtrT = typename T::Ptr;
PtrT n;
CreateResource<T>(T(*c), n);
return n->GetID();
});
register_any_visitor<SegmentFile::Ptr>([this](auto c) {
using T = SegmentFile;
using PtrT = typename T::Ptr;
PtrT n;
CreateResource<T>(T(*c), n);
return n->GetID();
});
}
void
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册