From 5964adef4936dcd9849a64126e8e100707ff4c5c Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Tue, 7 Jul 2020 19:38:30 +0800 Subject: [PATCH] (db/snapshot): Implement operation rollback (#2760) * (db/snapshot): refactor Operations Signed-off-by: peng.xu * (db/snapshot): refactor Operations Signed-off-by: peng.xu * (db/snapshot): Add rollback for operations Signed-off-by: peng.xu --- core/src/db/snapshot/EventExecutor.h | 6 +- core/src/db/snapshot/Operations.cpp | 35 ++++-- core/src/db/snapshot/Operations.h | 36 ++++-- core/src/db/snapshot/Resources.h | 12 ++ core/src/db/snapshot/Store.h | 161 ++++----------------------- 5 files changed, 86 insertions(+), 164 deletions(-) diff --git a/core/src/db/snapshot/EventExecutor.h b/core/src/db/snapshot/EventExecutor.h index f831c68a..f0acfbc6 100644 --- a/core/src/db/snapshot/EventExecutor.h +++ b/core/src/db/snapshot/EventExecutor.h @@ -70,13 +70,17 @@ class EventExecutor { private: void ThreadMain() { + Status status; while (true) { EventPtr evt = queue_.Take(); if (evt == nullptr) { break; } /* std::cout << std::this_thread::get_id() << " Dequeue Event " << std::endl; */ - evt->Process(); + status = evt->Process(); + if (!status.ok()) { + std::cout << "EventExecutor Handle Event Error: " << status.ToString() << std::endl; + } } } diff --git a/core/src/db/snapshot/Operations.cpp b/core/src/db/snapshot/Operations.cpp index e47b3c07..27f104cf 100644 --- a/core/src/db/snapshot/Operations.cpp +++ b/core/src/db/snapshot/Operations.cpp @@ -12,6 +12,8 @@ #include "db/snapshot/Operations.h" #include #include +#include "db/snapshot/Event.h" +#include "db/snapshot/EventExecutor.h" #include "db/snapshot/OperationExecutor.h" #include "db/snapshot/Snapshots.h" @@ -100,6 +102,13 @@ Operations::Done(Store& store) { Snapshots::GetInstance().LoadSnapshot(store, context_.latest_ss, context_.new_collection_commit->GetCollectionId(), ids_.back()); } + /* if (!context_.latest_ss && context_.new_collection_commit) { */ + /* auto& holder = std::get(holders_); */ + /* if (holder.size() > 0) */ + /* Snapshots::GetInstance().LoadSnapshot(store, context_.latest_ss, */ + /* context_.new_collection_commit->GetCollectionId(), holder.rbegin()->GetID()); */ + /* } */ + /* } */ std::cout << ToString() << std::endl; } finish_cond_.notify_all(); @@ -230,24 +239,28 @@ Operations::DoExecute(Store& store) { Status Operations::PostExecute(Store& store) { - return store.DoCommitOperation(*this); + return store.ApplyOperation(*this); } -Status -Operations::RollBack() { - // TODO: Implement here - // Spwarn a rollback operation or re-use this operation - return Status::OK(); +template +void +ApplyRollBack(std::set>& steps_set) { + for (auto& res : steps_set) { + auto evt_ptr = std::make_shared>(res); + EventExecutor::GetInstance().Submit(evt_ptr); + std::cout << "Rollback " << typeid(ResourceT).name() << ": " << res->GetID() << std::endl; + } } -Status -Operations::ApplyRollBack(Store& store) { - // TODO: Implement rollback to remove all resources in steps_ - return Status::OK(); +void +Operations::RollBack() { + std::apply([&](auto&... steps_set) { ((ApplyRollBack(steps_set)), ...); }, GetStepHolders()); } Operations::~Operations() { - // TODO: Prefer to submit a rollback operation if status is not ok + if (!status_.ok()) { + RollBack(); + } } } // namespace snapshot diff --git a/core/src/db/snapshot/Operations.h b/core/src/db/snapshot/Operations.h index 18f65a1f..7ec39cf2 100644 --- a/core/src/db/snapshot/Operations.h +++ b/core/src/db/snapshot/Operations.h @@ -17,8 +17,10 @@ #include #include #include +#include #include #include +#include #include #include "Context.h" #include "db/snapshot/Snapshot.h" @@ -30,8 +32,10 @@ namespace milvus { namespace engine { namespace snapshot { -using StepsT = std::vector; using CheckStaleFunc = std::function; +using StepsHolderT = std::tuple; enum OperationsType { Invalid, W_Leaf, O_Leaf, W_Compound, O_Compound }; @@ -71,9 +75,14 @@ class Operations : public std::enable_shared_from_this { ids_.push_back(id); } - StepsT& - GetSteps() { - return steps_; + const size_t + GetPos() const { + return last_pos_; + } + + StepsHolderT& + GetStepHolders() { + return holders_; } ID_TYPE @@ -132,9 +141,6 @@ class Operations : public std::enable_shared_from_this { virtual std::string ToString() const; - Status - RollBack(); - virtual Status OnSnapshotStale(); virtual Status @@ -158,12 +164,13 @@ class Operations : public std::enable_shared_from_this { Status CheckPrevSnapshot() const; - Status - ApplyRollBack(Store&); + void + RollBack(); OperationContext context_; ScopedSnapshotT prev_ss_; - StepsT steps_; + StepsHolderT holders_; + size_t last_pos_; std::vector ids_; bool done_ = false; Status status_; @@ -179,7 +186,10 @@ Operations::AddStep(const StepT& step, bool activate) { auto s = std::make_shared(step); if (activate) s->Activate(); - steps_.push_back(s); + + last_pos_ = Index::value; + auto& holder = std::get::value>(holders_); + holder.insert(s); } template @@ -189,7 +199,9 @@ Operations::AddStepWithLsn(const StepT& step, const LSN_TYPE& lsn, bool activate if (activate) s->Activate(); s->SetLsn(lsn); - steps_.push_back(s); + last_pos_ = Index::value; + auto& holder = std::get::value>(holders_); + holder.insert(s); } template diff --git a/core/src/db/snapshot/Resources.h b/core/src/db/snapshot/Resources.h index 9ef67a42..7a97b867 100644 --- a/core/src/db/snapshot/Resources.h +++ b/core/src/db/snapshot/Resources.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -354,6 +355,7 @@ class Collection : public BaseResource, public: using Ptr = std::shared_ptr; using MapT = std::map; + using SetT = std::set; using ScopedMapT = std::map>; using VecT = std::vector; static constexpr const char* Name = "Collection"; @@ -380,6 +382,7 @@ class CollectionCommit : public BaseResource, static constexpr const char* Name = "CollectionCommit"; using Ptr = std::shared_ptr; using MapT = std::map; + using SetT = std::set; using ScopedMapT = std::map>; using VecT = std::vector; CollectionCommit(ID_TYPE collection_id, ID_TYPE schema_id, const MappingT& mappings = {}, SIZE_TYPE row_cnt = 0, @@ -402,6 +405,7 @@ class Partition : public BaseResource, public: using Ptr = std::shared_ptr; using MapT = std::map; + using SetT = std::set; using ScopedMapT = std::map>; using VecT = std::vector; static constexpr const char* Name = "Partition"; @@ -426,6 +430,7 @@ class PartitionCommit : public BaseResource, public: using Ptr = std::shared_ptr; using MapT = std::map; + using SetT = std::set; using ScopedMapT = std::map>; using VecT = std::vector; static constexpr const char* Name = "PartitionCommit"; @@ -454,6 +459,7 @@ class Segment : public BaseResource, public: using Ptr = std::shared_ptr; using MapT = std::map; + using SetT = std::set; using ScopedMapT = std::map>; using VecT = std::vector; static constexpr const char* Name = "Segment"; @@ -483,6 +489,7 @@ class SegmentCommit : public BaseResource, public: using Ptr = std::shared_ptr; using MapT = std::map; + using SetT = std::set; using ScopedMapT = std::map>; using VecT = std::vector; static constexpr const char* Name = "SegmentCommit"; @@ -514,6 +521,7 @@ class SegmentFile : public BaseResource, public: using Ptr = std::shared_ptr; using MapT = std::map; + using SetT = std::set; using ScopedMapT = std::map>; using VecT = std::vector; static constexpr const char* Name = "SegmentFile"; @@ -538,6 +546,7 @@ class SchemaCommit : public BaseResource, public: using Ptr = std::shared_ptr; using MapT = std::map; + using SetT = std::set; using ScopedMapT = std::map>; using VecT = std::vector; static constexpr const char* Name = "SchemaCommit"; @@ -564,6 +573,7 @@ class Field : public BaseResource, public: using Ptr = std::shared_ptr; using MapT = std::map; + using SetT = std::set; using ScopedMapT = std::map>; using VecT = std::vector; static constexpr const char* Name = "Field"; @@ -587,6 +597,7 @@ class FieldCommit : public BaseResource, public: using Ptr = std::shared_ptr; using MapT = std::map; + using SetT = std::set; using ScopedMapT = std::map>; using VecT = std::vector; static constexpr const char* Name = "FieldCommit"; @@ -614,6 +625,7 @@ class FieldElement : public BaseResource, public: using Ptr = std::shared_ptr; using MapT = std::map; + using SetT = std::set; using ScopedMapT = std::map>; using VecT = std::vector; static constexpr const char* Name = "FieldElement"; diff --git a/core/src/db/snapshot/Store.h b/core/src/db/snapshot/Store.h index bc40556f..d96915a8 100644 --- a/core/src/db/snapshot/Store.h +++ b/core/src/db/snapshot/Store.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -53,52 +54,35 @@ class Store { return store; } - template - bool - DoCommit(ResourceT&&... resources) { - auto t = std::make_tuple(std::forward(resources)...); - auto& t_size = std::tuple_size::value; - if (t_size == 0) { - return false; - } - StartTransaction(); - std::apply([this](auto&&... resource) { ((std::cout << CommitResource(resource) << "\n"), ...); }, t); - FinishTransaction(); - return true; - } - template Status - DoCommitOperation(OpT& op) { - for (auto& step_v : op.GetSteps()) { - auto id = ProcessOperationStep(step_v); - op.SetStepResult(id); - } + ApplyOperation(OpT& op) { + std::apply( + [&](auto&... steps_set) { + std::size_t n{0}; + ((ApplyOpStep(op, n++, steps_set)), ...); + }, + op.GetStepHolders()); return Status::OK(); } - template + template void - Apply(OpT& op) { - op.ApplyToStore(*this); - } - - void - StartTransaction() { + ApplyOpStep(OpT& op, size_t pos, std::set>& steps_set) { + typename T::Ptr ret; + for (auto& res : steps_set) { + CreateResource(T(*res), ret); + res->SetID(ret->GetID()); + } + if (ret && (pos == op.GetPos())) { + op.SetStepResult(ret->GetID()); + } } + template void - FinishTransaction() { - } - - template - bool - CommitResource(ResourceT&& resource) { - std::cout << "Commit " << resource.Name << " " << resource.GetID() << std::endl; - auto res = CreateResource::type>(std::move(resource)); - if (!res) - return false; - return true; + Apply(OpT& op) { + op.ApplyToStore(*this); } template @@ -280,110 +264,7 @@ class Store { } private: - ID_TYPE - ProcessOperationStep(const std::any& step_v) { - if (const auto it = any_flush_vistors_.find(std::type_index(step_v.type())); it != any_flush_vistors_.cend()) { - return it->second(step_v); - } else { - std::cerr << "Unregisted step type " << std::quoted(step_v.type().name()); - return 0; - } - } - - template - inline std::pair> - to_any_visitor(F const& f) { - return {std::type_index(typeid(T)), [g = f](std::any const& a) -> ID_TYPE { - if constexpr (std::is_void_v) - return g(); - else - return g(std::any_cast(a)); - }}; - } - - template - inline void - register_any_visitor(F const& f) { - /* std::cout << "Register visitor for type " << std::quoted(typeid(T).name()) << '\n'; */ - any_flush_vistors_.insert(to_any_visitor(f)); - } - Store() { - register_any_visitor([this](auto c) { - CollectionPtr n; - CreateResource(Collection(*c), n); - return n->GetID(); - }); - register_any_visitor([this](auto c) { - using T = CollectionCommit; - using PtrT = typename T::Ptr; - PtrT n; - CreateResource(T(*c), n); - return n->GetID(); - }); - register_any_visitor([this](auto c) { - using T = SchemaCommit; - using PtrT = typename T::Ptr; - PtrT n; - CreateResource(T(*c), n); - return n->GetID(); - }); - register_any_visitor([this](auto c) { - using T = FieldCommit; - using PtrT = typename T::Ptr; - PtrT n; - CreateResource(T(*c), n); - return n->GetID(); - }); - register_any_visitor([this](auto c) { - using T = Field; - using PtrT = typename T::Ptr; - PtrT n; - CreateResource(T(*c), n); - return n->GetID(); - }); - register_any_visitor([this](auto c) { - using T = FieldElement; - using PtrT = typename T::Ptr; - PtrT n; - CreateResource(T(*c), n); - return n->GetID(); - }); - register_any_visitor([this](auto c) { - using T = PartitionCommit; - using PtrT = typename T::Ptr; - PtrT n; - CreateResource(T(*c), n); - return n->GetID(); - }); - register_any_visitor([this](auto c) { - using T = Partition; - using PtrT = typename T::Ptr; - PtrT n; - CreateResource(T(*c), n); - return n->GetID(); - }); - register_any_visitor([this](auto c) { - using T = Segment; - using PtrT = typename T::Ptr; - PtrT n; - CreateResource(T(*c), n); - return n->GetID(); - }); - register_any_visitor([this](auto c) { - using T = SegmentCommit; - using PtrT = typename T::Ptr; - PtrT n; - CreateResource(T(*c), n); - return n->GetID(); - }); - register_any_visitor([this](auto c) { - using T = SegmentFile; - using PtrT = typename T::Ptr; - PtrT n; - CreateResource(T(*c), n); - return n->GetID(); - }); } void -- GitLab