未验证 提交 17020945 编写于 作者: C Cai Yudong 提交者: GitHub

snapshot gc (#2662)

* add Event.h
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* add ResourceHelper and update event handler
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* update ResourceHelper
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* add CollectionId for segment
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* add CollectionId for SegmentFile
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* update ResourceHelper
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* re-order Resources
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* update ResourceHelper
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* fix clang-format
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* retry ci
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 887b7f7f
......@@ -53,8 +53,6 @@ Please mark all changes in change log and use the issue from GitHub
- \#2516 Improve unit test coverage
- \#2548 Upgrade mishards for milvus v0.10.0
## Task
# Milvus 0.9.1 (2020-05-29)
## Bug
......
......@@ -20,12 +20,10 @@ namespace milvus::engine::snapshot {
class BaseResource : public ReferenceProxy {
public:
[[nodiscard]] virtual std::string
virtual std::string
ToString() const {
return std::string();
}
~BaseResource() override = default;
};
using BaseResourcePtr = std::shared_ptr<BaseResource>;
......
......@@ -37,6 +37,7 @@ struct SegmentFileContext {
std::string field_element_name;
ID_TYPE segment_id;
ID_TYPE partition_id;
ID_TYPE collection_id;
};
struct LoadOperationContext {
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <boost/filesystem.hpp>
#include <memory>
#include <string>
#include <vector>
#include "db/snapshot/Operations.h"
#include "db/snapshot/ResourceHelper.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
namespace snapshot {
/**
 * Abstract interface for a unit of work that can be processed.
 *
 * Concrete events derive from this class and implement Process().
 */
class Event {
 public:
    /// Virtual so that destroying a derived event through an Event pointer or
    /// reference runs the derived destructor.
    virtual ~Event() = default;

    /// Performs the event's work and reports the outcome as a Status.
    virtual Status
    Process() = 0;
};
/**
 * Garbage-collection event for one resource of type ResourceT.
 *
 * Process() performs three steps in order:
 *   1. pushes a SoftDeleteOperation<ResourceT> built from the resource id,
 *   2. removes from the filesystem every path reported by GetResFiles<ResourceT>(),
 *   3. pushes a HardDeleteOperation<ResourceT> built from the resource id.
 * Each fallible step is wrapped in STATUS_CHECK (macro defined elsewhere; presumably it
 * returns early on a non-OK Status - confirm against its definition).
 *
 * @tparam ResourceT resource class exposing a `Ptr` type whose pointee provides GetID().
 */
template <class ResourceT>
class ResourceGCEvent : public Event {
 public:
    using Ptr = std::shared_ptr<ResourceGCEvent>;

    /// @param res resource to collect; dereferenced without a null check in Process().
    // `typename` (not `class`) is required here: ResourceT::Ptr is a dependent name and is
    // used as a plain type, which may be an alias rather than a class.
    explicit ResourceGCEvent(typename ResourceT::Ptr res) : res_(res) {
    }

    ~ResourceGCEvent() = default;

    /// Runs the soft-delete / file-removal / hard-delete sequence described on the class.
    /// The filesystem call is the throwing overload (no error_code argument is passed).
    Status
    Process() override {
        /* mark resource as 'deleted' in meta */
        auto sd_op = std::make_shared<SoftDeleteOperation<ResourceT>>(res_->GetID());
        STATUS_CHECK(sd_op->Push());

        /* physically clean resource */
        std::vector<std::string> res_file_list;
        STATUS_CHECK(GetResFiles<ResourceT>(res_file_list, res_));
        for (const auto& res_file : res_file_list) {
            // remove_all() handles regular files and directory trees alike and returns 0
            // for a path that does not exist, so no separate exists()/is_directory()
            // probing (and no window between the probe and the removal) is needed.
            boost::filesystem::remove_all(res_file);
        }

        /* remove resource from meta */
        auto hd_op = std::make_shared<HardDeleteOperation<ResourceT>>(res_->GetID());
        STATUS_CHECK(hd_op->Push());

        return Status::OK();
    }

 private:
    typename ResourceT::Ptr res_;
};
} // namespace snapshot
} // namespace engine
} // namespace milvus
......@@ -13,35 +13,15 @@
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include "ResourceTypes.h"
#include "db/snapshot/Event.h"
#include "utils/BlockingQueue.h"
namespace milvus {
namespace engine {
namespace snapshot {
enum class EventType {
EVENT_INVALID = 0,
EVENT_GC = 1,
};
struct EventContext {
ID_TYPE id;
std::string res_type;
};
struct Event {
EventType type;
EventContext context;
std::string
ToString() {
return context.res_type + "_" + std::to_string(context.id);
}
};
using EventPtr = std::shared_ptr<Event>;
using ThreadPtr = std::shared_ptr<std::thread>;
using EventQueue = BlockingQueue<EventPtr>;
......@@ -62,7 +42,7 @@ class EventExecutor {
}
Status
Submit(EventPtr& evt) {
Submit(const EventPtr& evt) {
if (evt == nullptr) {
return Status(SS_INVALID_ARGUMENT_ERROR, "Invalid Resource");
}
......@@ -95,21 +75,16 @@ class EventExecutor {
if (evt == nullptr) {
break;
}
std::cout << std::this_thread::get_id() << " Dequeue Event " << evt->ToString() << std::endl;
switch (evt->type) {
case EventType::EVENT_GC:
break;
default:
break;
}
std::cout << std::this_thread::get_id() << " Dequeue Event " << std::endl;
evt->Process();
}
}
void
Enqueue(EventPtr evt) {
Enqueue(const EventPtr& evt) {
queue_.Put(evt);
if (evt != nullptr) {
std::cout << std::this_thread::get_id() << " Enqueue Event " << evt->ToString() << std::endl;
std::cout << std::this_thread::get_id() << " Enqueue Event " << std::endl;
}
}
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <string>
#include <vector>
#include "db/snapshot/Resources.h"
#include "utils/Status.h"
namespace milvus::engine::snapshot {
/**
 * Generic fallback used for any resource type without an explicit specialization.
 *
 * Adds nothing to the file list and ignores the resource pointer.
 *
 * @tparam ResourceT resource class exposing a `Ptr` type.
 * @return always Status::OK().
 */
template <class ResourceT>
inline Status
GetResFiles(std::vector<std::string>& /*file_list*/, typename ResourceT::Ptr& /*res_ptr*/) {
    return Status::OK();
}
/**
 * Collection specialization: appends a single entry consisting of the collection id.
 *
 * The id is rendered with std::to_string, which needs only <string> and does not depend
 * on the stream locale. No root directory is prepended here.
 *
 * @param file_list receives the new entry (existing entries are kept).
 * @param res_ptr   collection to describe; dereferenced without a null check.
 * @return always Status::OK().
 */
template <>
inline Status
GetResFiles<Collection>(std::vector<std::string>& file_list, Collection::Ptr& res_ptr) {
    file_list.push_back(std::to_string(res_ptr->GetID()));
    return Status::OK();
}
/**
 * Partition specialization: appends a single entry "<collection_id>/<partition_id>".
 *
 * Ids are rendered with std::to_string, which needs only <string> and does not depend
 * on the stream locale. No root directory is prepended here.
 *
 * @param file_list receives the new entry (existing entries are kept).
 * @param res_ptr   partition to describe; dereferenced without a null check.
 * @return always Status::OK().
 */
template <>
inline Status
GetResFiles<Partition>(std::vector<std::string>& file_list, Partition::Ptr& res_ptr) {
    file_list.push_back(std::to_string(res_ptr->GetCollectionId()) + "/" + std::to_string(res_ptr->GetID()));
    return Status::OK();
}
/**
 * Segment specialization: appends a single entry
 * "<collection_id>/<partition_id>/<segment_id>".
 *
 * Ids are rendered with std::to_string, which needs only <string> and does not depend
 * on the stream locale. No root directory is prepended here.
 *
 * @param file_list receives the new entry (existing entries are kept).
 * @param res_ptr   segment to describe; dereferenced without a null check.
 * @return always Status::OK().
 */
template <>
inline Status
GetResFiles<Segment>(std::vector<std::string>& file_list, Segment::Ptr& res_ptr) {
    file_list.push_back(std::to_string(res_ptr->GetCollectionId()) + "/" +
                        std::to_string(res_ptr->GetPartitionId()) + "/" + std::to_string(res_ptr->GetID()));
    return Status::OK();
}
/**
 * SegmentFile specialization: appends a single entry
 * "<collection_id>/<partition_id>/<segment_id>/<segment_file_id>".
 *
 * Ids are rendered with std::to_string, which needs only <string> and does not depend
 * on the stream locale. No root directory is prepended here.
 *
 * @param file_list receives the new entry (existing entries are kept).
 * @param res_ptr   segment file to describe; dereferenced without a null check.
 * @return always Status::OK().
 */
template <>
inline Status
GetResFiles<SegmentFile>(std::vector<std::string>& file_list, SegmentFile::Ptr& res_ptr) {
    file_list.push_back(std::to_string(res_ptr->GetCollectionId()) + "/" +
                        std::to_string(res_ptr->GetPartitionId()) + "/" +
                        std::to_string(res_ptr->GetSegmentId()) + "/" + std::to_string(res_ptr->GetID()));
    return Status::OK();
}
} // namespace milvus::engine::snapshot
......@@ -151,7 +151,8 @@ SegmentOperation::DoExecute(Store& store) {
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
auto prev_num = GetStartedSS()->GetMaxSegmentNumByPartition(context_.prev_partition->GetID());
resource_ = std::make_shared<Segment>(context_.prev_partition->GetID(), prev_num + 1);
resource_ = std::make_shared<Segment>(context_.prev_partition->GetCollectionId(), context_.prev_partition->GetID(),
prev_num + 1);
AddStep(*resource_, false);
return Status::OK();
}
......@@ -196,7 +197,8 @@ SegmentFileOperation::SegmentFileOperation(const SegmentFileContext& sc, ScopedS
Status
SegmentFileOperation::DoExecute(Store& store) {
auto field_element_id = GetStartedSS()->GetFieldElementId(context_.field_name, context_.field_element_name);
resource_ = std::make_shared<SegmentFile>(context_.partition_id, context_.segment_id, field_element_id);
resource_ = std::make_shared<SegmentFile>(context_.collection_id, context_.partition_id, context_.segment_id,
field_element_id);
AddStep(*resource_, false);
return Status::OK();
}
......
......@@ -26,53 +26,6 @@ Collection::Collection(const std::string& name, ID_TYPE id, LSN_TYPE lsn, State
UpdatedOnField(updated_on) {
}
SchemaCommit::SchemaCommit(ID_TYPE collection_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn, State status,
TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
MappingsField(mappings),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
FieldCommit::FieldCommit(ID_TYPE collection_id, ID_TYPE field_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn,
State status, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
FieldIdField(field_id),
MappingsField(mappings),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
Field::Field(const std::string& name, NUM_TYPE num, ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on,
TS_TYPE updated_on)
: NameField(name),
NumField(num),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
FieldElement::FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype,
ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
FieldIdField(field_id),
NameField(name),
FtypeField(ftype),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
CollectionCommit::CollectionCommit(ID_TYPE collection_id, ID_TYPE schema_id, const MappingT& mappings, ID_TYPE id,
LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
......@@ -121,9 +74,10 @@ PartitionCommit::ToString() const {
return ss.str();
}
Segment::Segment(ID_TYPE partition_id, ID_TYPE num, ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on,
TS_TYPE updated_on)
: PartitionIdField(partition_id),
Segment::Segment(ID_TYPE collection_id, ID_TYPE partition_id, ID_TYPE num, ID_TYPE id, LSN_TYPE lsn, State status,
TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
PartitionIdField(partition_id),
NumField(num),
IdField(id),
LsnField(lsn),
......@@ -138,6 +92,7 @@ Segment::ToString() const {
ss << "Segment [" << this << "]: ";
ss << "id=" << GetID() << ", ";
ss << "partition_id=" << GetPartitionId() << ", ";
ss << "collection_id=" << GetCollectionId() << ", ";
ss << "num=" << (NUM_TYPE)GetNum() << ", ";
ss << "status=" << GetStatus() << ", ";
return ss.str();
......@@ -167,9 +122,10 @@ SegmentCommit::ToString() const {
return ss.str();
}
SegmentFile::SegmentFile(ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id, ID_TYPE id, LSN_TYPE lsn,
State status, TS_TYPE created_on, TS_TYPE updated_on)
: PartitionIdField(partition_id),
SegmentFile::SegmentFile(ID_TYPE collection_id, ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id,
ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
PartitionIdField(partition_id),
SegmentIdField(segment_id),
FieldElementIdField(field_element_id),
IdField(id),
......@@ -179,4 +135,51 @@ SegmentFile::SegmentFile(ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field
UpdatedOnField(updated_on) {
}
// Constructs a SchemaCommit by handing each argument to the matching *Field initializer
// (collection id, mappings, id, lsn, status, creation and update timestamps).
// The constructor body itself is empty.
// NOTE(review): the *Field names are presumably mixin base classes - confirm in Resources.h.
SchemaCommit::SchemaCommit(ID_TYPE collection_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn, State status,
TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
MappingsField(mappings),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
// Constructs a Field by handing each argument to the matching *Field initializer
// (name, num, id, lsn, status, creation and update timestamps).
// The constructor body itself is empty.
// NOTE(review): the *Field names are presumably mixin base classes - confirm in Resources.h.
Field::Field(const std::string& name, NUM_TYPE num, ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on,
TS_TYPE updated_on)
: NameField(name),
NumField(num),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
// Constructs a FieldCommit by handing each argument to the matching *Field initializer
// (collection id, field id, mappings, id, lsn, status, creation and update timestamps).
// The constructor body itself is empty.
// NOTE(review): the *Field names are presumably mixin base classes - confirm in Resources.h.
FieldCommit::FieldCommit(ID_TYPE collection_id, ID_TYPE field_id, const MappingT& mappings, ID_TYPE id, LSN_TYPE lsn,
State status, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
FieldIdField(field_id),
MappingsField(mappings),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
// Constructs a FieldElement by handing each argument to the matching *Field initializer
// (collection id, field id, name, ftype, id, lsn, status, creation and update timestamps).
// The constructor body itself is empty.
// NOTE(review): the *Field names are presumably mixin base classes - confirm in Resources.h.
FieldElement::FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype,
ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
FieldIdField(field_id),
NameField(name),
FtypeField(ftype),
IdField(id),
LsnField(lsn),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
} // namespace milvus::engine::snapshot
......@@ -56,7 +56,7 @@ class StatusField {
return status_;
}
[[nodiscard]] bool
bool
IsActive() const {
return status_ == ACTIVE;
}
......@@ -378,7 +378,7 @@ class PartitionCommit : public BaseResource,
LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
[[nodiscard]] std::string
std::string
ToString() const override;
};
......@@ -387,6 +387,7 @@ using PartitionCommitPtr = PartitionCommit::Ptr;
///////////////////////////////////////////////////////////////////////////////
class Segment : public BaseResource,
public CollectionIdField,
public PartitionIdField,
public NumField,
public IdField,
......@@ -401,10 +402,11 @@ class Segment : public BaseResource,
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Segment";
explicit Segment(ID_TYPE partition_id, ID_TYPE num = 0, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
explicit Segment(ID_TYPE collection_id, ID_TYPE partition_id, ID_TYPE num = 0, ID_TYPE id = 0, LSN_TYPE lsn = 0,
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
[[nodiscard]] std::string
std::string
ToString() const override;
};
......@@ -431,7 +433,7 @@ class SegmentCommit : public BaseResource,
ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
[[nodiscard]] std::string
std::string
ToString() const override;
};
......@@ -440,6 +442,7 @@ using SegmentCommitPtr = SegmentCommit::Ptr;
///////////////////////////////////////////////////////////////////////////////
class SegmentFile : public BaseResource,
public CollectionIdField,
public PartitionIdField,
public SegmentIdField,
public FieldElementIdField,
......@@ -455,8 +458,8 @@ class SegmentFile : public BaseResource,
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "SegmentFile";
SegmentFile(ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id, ID_TYPE id = 0, LSN_TYPE lsn = 0,
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
SegmentFile(ID_TYPE collection_id, ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id,
ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
......
......@@ -435,7 +435,7 @@ class Store {
MappingT p_c_m;
for (auto si = 1; si <= random_segments; ++si) {
SegmentPtr s;
CreateResource<Segment>(Segment(p->GetID(), si), s);
CreateResource<Segment>(Segment(c->GetID(), p->GetID(), si), s);
all_records.push_back(s);
auto& schema_m = schema->GetMappings();
MappingT s_c_m;
......@@ -444,7 +444,8 @@ class Store {
auto& f_c_m = field_commit->GetMappings();
for (auto field_element_id : f_c_m) {
SegmentFilePtr sf;
CreateResource<SegmentFile>(SegmentFile(p->GetID(), s->GetID(), field_commit_id), sf);
CreateResource<SegmentFile>(
SegmentFile(c->GetID(), p->GetID(), s->GetID(), field_commit_id), sf);
all_records.push_back(sf);
s_c_m.insert(sf->GetID());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册