From 170209453f736d97df478a561ada00a6923aa760 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Wed, 24 Jun 2020 09:46:43 +0800 Subject: [PATCH] snapshot gc (#2662) * add Event.h Signed-off-by: yudong.cai * add ResourceHelper and update event handler Signed-off-by: yudong.cai * update ResourceHelper Signed-off-by: yudong.cai * add CollectionId for segment Signed-off-by: yudong.cai * add CollectionId for SegmentFile Signed-off-by: yudong.cai * update ResourceHelper Signed-off-by: yudong.cai * re-order Resources Signed-off-by: yudong.cai * update ResourceHelper Signed-off-by: yudong.cai * fix clang-format Signed-off-by: yudong.cai * retry ci Signed-off-by: yudong.cai --- CHANGELOG.md | 2 - core/src/db/snapshot/BaseResource.h | 4 +- core/src/db/snapshot/Context.h | 1 + core/src/db/snapshot/Event.h | 76 ++++++++++++++ core/src/db/snapshot/EventExecutor.h | 39 ++----- core/src/db/snapshot/ResourceHelper.h | 74 +++++++++++++ core/src/db/snapshot/ResourceOperations.cpp | 6 +- core/src/db/snapshot/Resources.cpp | 109 ++++++++++---------- core/src/db/snapshot/Resources.h | 19 ++-- core/src/db/snapshot/Store.h | 5 +- 10 files changed, 233 insertions(+), 102 deletions(-) create mode 100644 core/src/db/snapshot/Event.h create mode 100644 core/src/db/snapshot/ResourceHelper.h diff --git a/CHANGELOG.md b/CHANGELOG.md index c2556b9dd..939c33304 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,8 +53,6 @@ Please mark all changes in change log and use the issue from GitHub - \#2516 Improve unit test coverage - \#2548 Upgrade mishards for milvus v0.10.0 -## Task - # Milvus 0.9.1 (2020-05-29) ## Bug diff --git a/core/src/db/snapshot/BaseResource.h b/core/src/db/snapshot/BaseResource.h index 2eb4ffe6e..8931714e7 100644 --- a/core/src/db/snapshot/BaseResource.h +++ b/core/src/db/snapshot/BaseResource.h @@ -20,12 +20,10 @@ namespace milvus::engine::snapshot { class BaseResource : public ReferenceProxy { public: - [[nodiscard]] virtual std::string + virtual std::string ToString() const { return std::string(); } - - ~BaseResource() override = default; }; using BaseResourcePtr = std::shared_ptr; diff --git a/core/src/db/snapshot/Context.h b/core/src/db/snapshot/Context.h index ab4f7db9a..26428971f 100644 --- a/core/src/db/snapshot/Context.h +++ b/core/src/db/snapshot/Context.h @@ -37,6 +37,7 @@ struct SegmentFileContext { std::string field_element_name; ID_TYPE segment_id; ID_TYPE partition_id; + ID_TYPE collection_id; }; struct LoadOperationContext { diff --git a/core/src/db/snapshot/Event.h b/core/src/db/snapshot/Event.h new file mode 100644 index 000000000..73fb92242 --- /dev/null +++ b/core/src/db/snapshot/Event.h @@ -0,0 +1,76 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include +#include +#include + +#include "db/snapshot/Operations.h" +#include "db/snapshot/ResourceHelper.h" +#include "utils/Status.h" + +namespace milvus { +namespace engine { +namespace snapshot { + +class Event { + public: + virtual Status + Process() = 0; +}; + +template +class ResourceGCEvent : public Event { + public: + using Ptr = std::shared_ptr; + + explicit ResourceGCEvent(class ResourceT::Ptr res) : res_(res) { + } + + ~ResourceGCEvent() = default; + + Status + Process() override { + /* mark resource as 'deleted' in meta */ + auto sd_op = std::make_shared>(res_->GetID()); + STATUS_CHECK(sd_op->Push()); + + /* TODO: physically clean resource */ + std::vector res_file_list; + STATUS_CHECK(GetResFiles(res_file_list, res_)); + for (auto& res_file : res_file_list) { + if (!boost::filesystem::exists(res_file)) { + continue; + } + if (boost::filesystem::is_directory(res_file)) { + boost::filesystem::remove_all(res_file); + } else { + boost::filesystem::remove(res_file); + } + } + + /* remove resource from meta */ + auto hd_op = std::make_shared>(res_->GetID()); + STATUS_CHECK(hd_op->Push()); + + return Status::OK(); + } + + private: + class ResourceT::Ptr res_; +}; + +} // namespace snapshot +} // namespace engine +} // namespace milvus diff --git a/core/src/db/snapshot/EventExecutor.h b/core/src/db/snapshot/EventExecutor.h index 5af7a008d..cb93973fb 100644 --- a/core/src/db/snapshot/EventExecutor.h +++ b/core/src/db/snapshot/EventExecutor.h @@ -13,35 +13,15 @@ #include #include -#include #include -#include "ResourceTypes.h" + +#include "db/snapshot/Event.h" #include "utils/BlockingQueue.h" namespace milvus { namespace engine { namespace snapshot { -enum class EventType { - EVENT_INVALID = 0, - EVENT_GC = 1, -}; - -struct EventContext { - ID_TYPE id; - std::string res_type; -}; - -struct Event { - EventType type; - EventContext context; - - std::string - ToString() { - return context.res_type + "_" + std::to_string(context.id); - } -}; - using EventPtr = std::shared_ptr; using ThreadPtr = std::shared_ptr; using EventQueue = BlockingQueue; @@ -62,7 +42,7 @@ class EventExecutor { } Status - Submit(EventPtr& evt) { + Submit(const EventPtr& evt) { if (evt == nullptr) { return Status(SS_INVALID_ARGUMENT_ERROR, "Invalid Resource"); } @@ -95,21 +75,16 @@ class EventExecutor { if (evt == nullptr) { break; } - std::cout << std::this_thread::get_id() << " Dequeue Event " << evt->ToString() << std::endl; - switch (evt->type) { - case EventType::EVENT_GC: - break; - default: - break; - } + std::cout << std::this_thread::get_id() << " Dequeue Event " << std::endl; + evt->Process(); } } void - Enqueue(EventPtr evt) { + Enqueue(const EventPtr& evt) { queue_.Put(evt); if (evt != nullptr) { - std::cout << std::this_thread::get_id() << " Enqueue Event " << evt->ToString() << std::endl; + std::cout << std::this_thread::get_id() << " Enqueue Event " << std::endl; } } diff --git a/core/src/db/snapshot/ResourceHelper.h b/core/src/db/snapshot/ResourceHelper.h new file mode 100644 index 000000000..995d48a22 --- /dev/null +++ b/core/src/db/snapshot/ResourceHelper.h @@ -0,0 +1,74 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include + +#include "db/snapshot/Resources.h" +#include "utils/Status.h" + +namespace milvus::engine::snapshot { + +template +inline Status +GetResFiles(std::vector& file_list, typename ResourceT::Ptr& res_ptr) { + return Status::OK(); +} + +template <> +inline Status +GetResFiles(std::vector& file_list, Collection::Ptr& res_ptr) { + std::stringstream ss; + ss << res_ptr->GetID(); + + file_list.push_back(ss.str()); + return Status::OK(); +} + +template <> +inline Status +GetResFiles(std::vector& file_list, Partition::Ptr& res_ptr) { + std::stringstream ss; + ss << res_ptr->GetCollectionId() << "/"; + ss << res_ptr->GetID(); + + file_list.push_back(ss.str()); + return Status::OK(); +} + +template <> +inline Status +GetResFiles(std::vector& file_list, Segment::Ptr& res_ptr) { + std::stringstream ss; + ss << res_ptr->GetCollectionId() << "/"; + ss << res_ptr->GetPartitionId() << "/"; + ss << res_ptr->GetID(); + + file_list.push_back(ss.str()); + return Status::OK(); +} + +template <> +inline Status +GetResFiles(std::vector& file_list, SegmentFile::Ptr& res_ptr) { + std::stringstream ss; + ss << res_ptr->GetCollectionId() << "/"; + ss << res_ptr->GetPartitionId() << "/"; + ss << res_ptr->GetSegmentId() << "/"; + ss << res_ptr->GetID(); + + file_list.push_back(ss.str()); + return Status::OK(); +} + +} // namespace milvus::engine::snapshot diff --git a/core/src/db/snapshot/ResourceOperations.cpp b/core/src/db/snapshot/ResourceOperations.cpp index 1363ada3a..3fa41d729 100644 --- a/core/src/db/snapshot/ResourceOperations.cpp +++ b/core/src/db/snapshot/ResourceOperations.cpp @@ -151,7 +151,8 @@ SegmentOperation::DoExecute(Store& store) { return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); } auto prev_num = GetStartedSS()->GetMaxSegmentNumByPartition(context_.prev_partition->GetID()); - resource_ = std::make_shared(context_.prev_partition->GetID(), prev_num + 1); + resource_ = std::make_shared(context_.prev_partition->GetCollectionId(), context_.prev_partition->GetID(), + prev_num + 1); AddStep(*resource_, false); return Status::OK(); } @@ -196,7 +197,8 @@ SegmentFileOperation::SegmentFileOperation(const SegmentFileContext& sc, ScopedS Status SegmentFileOperation::DoExecute(Store& store) { auto field_element_id = GetStartedSS()->GetFieldElementId(context_.field_name, context_.field_element_name); - resource_ = std::make_shared(context_.partition_id, context_.segment_id, field_element_id); + resource_ = std::make_shared(context_.collection_id, context_.partition_id, context_.segment_id, + field_element_id); AddStep(*resource_, false); return Status::OK(); } diff --git a/core/src/db/snapshot/Resources.cpp b/core/src/db/snapshot/Resources.cpp index fb47c2b48..7475dbc30 100644 --- a/core/src/db/snapshot/Resources.cpp +++ b/core/src/db/snapshot/Resources.cpp @@ -26,53 +26,6 @@ Collection::Collection(const std::string& name, ID_TYPE id, LSN_TYPE lsn, State UpdatedOnField(updated_on) { } -SchemaCommit::SchemaCommit(ID_TYPE collection_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn, State status, - TS_TYPE created_on, TS_TYPE updated_on) - : CollectionIdField(collection_id), - MappingsField(mappings), - IdField(id), - LsnField(lsn), - StatusField(status), - CreatedOnField(created_on), - UpdatedOnField(updated_on) { -} - -FieldCommit::FieldCommit(ID_TYPE collection_id, ID_TYPE field_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn, - State status, TS_TYPE created_on, TS_TYPE updated_on) - : CollectionIdField(collection_id), - FieldIdField(field_id), - MappingsField(mappings), - IdField(id), - LsnField(lsn), - StatusField(status), - CreatedOnField(created_on), - UpdatedOnField(updated_on) { -} - -Field::Field(const std::string& name, NUM_TYPE num, ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, - TS_TYPE updated_on) - : NameField(name), - NumField(num), - IdField(id), - LsnField(lsn), - StatusField(status), - CreatedOnField(created_on), - UpdatedOnField(updated_on) { -} - -FieldElement::FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype, - ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on) - : CollectionIdField(collection_id), - FieldIdField(field_id), - NameField(name), - FtypeField(ftype), - IdField(id), - LsnField(lsn), - StatusField(status), - CreatedOnField(created_on), - UpdatedOnField(updated_on) { -} - CollectionCommit::CollectionCommit(ID_TYPE collection_id, ID_TYPE schema_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on) : CollectionIdField(collection_id), @@ -121,9 +74,10 @@ PartitionCommit::ToString() const { return ss.str(); } -Segment::Segment(ID_TYPE partition_id, ID_TYPE num, ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, - TS_TYPE updated_on) - : PartitionIdField(partition_id), +Segment::Segment(ID_TYPE collection_id, ID_TYPE partition_id, ID_TYPE num, ID_TYPE id, LSN_TYPE lsn, State status, + TS_TYPE created_on, TS_TYPE updated_on) + : CollectionIdField(collection_id), + PartitionIdField(partition_id), NumField(num), IdField(id), LsnField(lsn), @@ -138,6 +92,7 @@ Segment::ToString() const { ss << "Segment [" << this << "]: "; ss << "id=" << GetID() << ", "; ss << "partition_id=" << GetPartitionId() << ", "; + ss << "collection_id=" << GetCollectionId() << ", "; ss << "num=" << (NUM_TYPE)GetNum() << ", "; ss << "status=" << GetStatus() << ", "; return ss.str(); @@ -167,9 +122,10 @@ SegmentCommit::ToString() const { return ss.str(); } -SegmentFile::SegmentFile(ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id, ID_TYPE id, LSN_TYPE lsn, - State status, TS_TYPE created_on, TS_TYPE updated_on) - : PartitionIdField(partition_id), +SegmentFile::SegmentFile(ID_TYPE collection_id, ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id, + ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on) + : CollectionIdField(collection_id), + PartitionIdField(partition_id), SegmentIdField(segment_id), FieldElementIdField(field_element_id), IdField(id), @@ -179,4 +135,51 @@ SegmentFile::SegmentFile(ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field UpdatedOnField(updated_on) { } +SchemaCommit::SchemaCommit(ID_TYPE collection_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn, State status, + TS_TYPE created_on, TS_TYPE updated_on) + : CollectionIdField(collection_id), + MappingsField(mappings), + IdField(id), + LsnField(lsn), + StatusField(status), + CreatedOnField(created_on), + UpdatedOnField(updated_on) { +} + +Field::Field(const std::string& name, NUM_TYPE num, ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, + TS_TYPE updated_on) + : NameField(name), + NumField(num), + IdField(id), + LsnField(lsn), + StatusField(status), + CreatedOnField(created_on), + UpdatedOnField(updated_on) { +} + +FieldCommit::FieldCommit(ID_TYPE collection_id, ID_TYPE field_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn, + State status, TS_TYPE created_on, TS_TYPE updated_on) + : CollectionIdField(collection_id), + FieldIdField(field_id), + MappingsField(mappings), + IdField(id), + LsnField(lsn), + StatusField(status), + CreatedOnField(created_on), + UpdatedOnField(updated_on) { +} + +FieldElement::FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype, + ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on) + : CollectionIdField(collection_id), + FieldIdField(field_id), + NameField(name), + FtypeField(ftype), + IdField(id), + LsnField(lsn), + StatusField(status), + CreatedOnField(created_on), + UpdatedOnField(updated_on) { +} + } // namespace milvus::engine::snapshot diff --git a/core/src/db/snapshot/Resources.h b/core/src/db/snapshot/Resources.h index 1f3515023..2bf348f74 100644 --- a/core/src/db/snapshot/Resources.h +++ b/core/src/db/snapshot/Resources.h @@ -56,7 +56,7 @@ class StatusField { return status_; } - [[nodiscard]] bool + bool IsActive() const { return status_ == ACTIVE; } @@ -378,7 +378,7 @@ class PartitionCommit : public BaseResource, LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp()); - [[nodiscard]] std::string + std::string ToString() const override; }; @@ -387,6 +387,7 @@ using PartitionCommitPtr = PartitionCommit::Ptr; /////////////////////////////////////////////////////////////////////////////// class Segment : public BaseResource, + public CollectionIdField, public PartitionIdField, public NumField, public IdField, @@ -401,10 +402,11 @@ class Segment : public BaseResource, using VecT = std::vector; static constexpr const char* Name = "Segment"; - explicit Segment(ID_TYPE partition_id, ID_TYPE num = 0, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING, - TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp()); + explicit Segment(ID_TYPE collection_id, ID_TYPE partition_id, ID_TYPE num = 0, ID_TYPE id = 0, LSN_TYPE lsn = 0, + State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(), + TS_TYPE UpdatedOnField = GetMicroSecTimeStamp()); - [[nodiscard]] std::string + std::string ToString() const override; }; @@ -431,7 +433,7 @@ class SegmentCommit : public BaseResource, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp()); - [[nodiscard]] std::string + std::string ToString() const override; }; @@ -440,6 +442,7 @@ using SegmentCommitPtr = SegmentCommit::Ptr; /////////////////////////////////////////////////////////////////////////////// class SegmentFile : public BaseResource, + public CollectionIdField, public PartitionIdField, public SegmentIdField, public FieldElementIdField, @@ -455,8 +458,8 @@ class SegmentFile : public BaseResource, using VecT = std::vector; static constexpr const char* Name = "SegmentFile"; - SegmentFile(ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id, ID_TYPE id = 0, LSN_TYPE lsn = 0, - State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(), + SegmentFile(ID_TYPE collection_id, ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id, + ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp()); }; diff --git a/core/src/db/snapshot/Store.h b/core/src/db/snapshot/Store.h index f66dc397f..b6889da73 100644 --- a/core/src/db/snapshot/Store.h +++ b/core/src/db/snapshot/Store.h @@ -435,7 +435,7 @@ class Store { MappingT p_c_m; for (auto si = 1; si <= random_segments; ++si) { SegmentPtr s; - CreateResource(Segment(p->GetID(), si), s); + CreateResource(Segment(c->GetID(), p->GetID(), si), s); all_records.push_back(s); auto& schema_m = schema->GetMappings(); MappingT s_c_m; @@ -444,7 +444,8 @@ class Store { auto& f_c_m = field_commit->GetMappings(); for (auto field_element_id : f_c_m) { SegmentFilePtr sf; - CreateResource(SegmentFile(p->GetID(), s->GetID(), field_commit_id), sf); + CreateResource( + SegmentFile(c->GetID(), p->GetID(), s->GetID(), field_commit_id), sf); all_records.push_back(sf); s_c_m.insert(sf->GetID()); -- GitLab