Unverified commit cf7ce289, authored by XuPeng-SH, committed by GitHub

fix readonly mode crash issue (#4396) (#4400)

Signed-off-by: XuPeng-SH <xupeng3112@163.com>
Parent 176ffdb0
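
The diff below adds a null-resource guard (CHECK_NOT_NULL_AND_RETURN) to the Snapshot constructor, an invalid_ flag with an IsValid() accessor, and an IsValid() check in SnapshotHolder::Add, so a snapshot whose resources cannot be loaded in read-only mode is rejected instead of crashing the server. The following is a minimal, self-contained sketch of that pattern using simplified stand-in types; it is not the actual Milvus code.

// Minimal sketch of the guard pattern introduced by this commit.
// Resource/Snapshot here are simplified stand-ins, not the real Milvus classes.
#include <iostream>
#include <memory>

struct Resource {
    int id = 0;
};
using ResourcePtr = std::shared_ptr<Resource>;

// Early-return guard mirroring CHECK_NOT_NULL_AND_RETURN in Snapshot.cpp.
#define CHECK_NOT_NULL_AND_RETURN(target) \
    if (not (target)) {                   \
        return;                           \
    }

class Snapshot {
 public:
    explicit Snapshot(ResourcePtr collection_commit) {
        // If a required resource is missing, stop building and stay invalid
        // instead of dereferencing a null pointer (the readonly-mode crash).
        CHECK_NOT_NULL_AND_RETURN(collection_commit);
        // ... load the remaining resources ...
        invalid_ = false;  // mark valid only after everything loaded
    }

    bool
    IsValid() const {
        return !invalid_;
    }

 private:
    bool invalid_ = true;
};

int
main() {
    // A caller (like SnapshotHolder::Add in the diff) must check IsValid()
    // before accepting the snapshot.
    auto ss = std::make_shared<Snapshot>(nullptr);
    if (!ss->IsValid()) {
        std::cout << "invalid snapshot, refuse to add it" << std::endl;
        return 1;
    }
    return 0;
}
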
@@ -18,13 +18,22 @@ namespace milvus {
namespace engine {
namespace snapshot {
#define CHECK_NOT_NULL_AND_RETURN(target) \
if (not target) { \
return; \
}
void
Snapshot::RefAll() {
assert(invalid_ != true);
std::apply([this](auto&... resource) { ((DoRef(resource)), ...); }, resources_);
}
void
Snapshot::UnRefAll() {
if (invalid_) {
return;
}
std::apply([this](auto&... resource) { ((DoUnRef(resource)), ...); }, resources_);
}
@@ -42,14 +51,17 @@ Snapshot::Snapshot(StorePtr store, ID_TYPE ss_id) {
auto& segment_files_holder = SegmentFilesHolder::GetInstance();
auto collection_commit = collection_commits_holder.GetResource(store, ss_id, false);
CHECK_NOT_NULL_AND_RETURN(collection_commit);
AddResource<CollectionCommit>(collection_commit);
max_lsn_ = collection_commit->GetLsn();
auto schema_commit = schema_commits_holder.GetResource(store, collection_commit->GetSchemaId(), false);
CHECK_NOT_NULL_AND_RETURN(schema_commit);
AddResource<SchemaCommit>(schema_commit);
current_schema_id_ = schema_commit->GetID();
auto collection = collections_holder.GetResource(store, collection_commit->GetCollectionId(), false);
CHECK_NOT_NULL_AND_RETURN(collection);
AddResource<Collection>(collection);
auto base_path = GetResPath<Collection>(store->GetRootPath(), std::make_shared<Collection>(*collection));
@@ -60,11 +72,13 @@ Snapshot::Snapshot(StorePtr store, ID_TYPE ss_id) {
auto partition_id = partition_commit->GetPartitionId();
auto partition = partitions_holder.GetResource(store, partition_id, false);
auto partition_name = partition->GetName();
CHECK_NOT_NULL_AND_RETURN(partition_commit);
AddResource<PartitionCommit>(partition_commit);
base_path = GetResPath<Partition>(store->GetRootPath(), std::make_shared<Partition>(*partition));
partition_commit->LoadIds(base_path);
p_pc_map_[partition_id] = partition_commit->GetID();
CHECK_NOT_NULL_AND_RETURN(partition);
AddResource<Partition>(partition);
partition_names_map_[partition_name] = partition_id;
p_max_seg_num_[partition_id] = 0;
@@ -76,10 +90,13 @@ Snapshot::Snapshot(StorePtr store, ID_TYPE ss_id) {
auto& partition_commit_mappings = partition_commit->GetMappings();
for (auto s_c_id : partition_commit_mappings) {
auto segment_commit = segment_commits_holder.GetResource(store, s_c_id, false);
CHECK_NOT_NULL_AND_RETURN(segment_commit);
auto segment_id = segment_commit->GetSegmentId();
auto segment = segments_holder.GetResource(store, segment_id, false);
CHECK_NOT_NULL_AND_RETURN(segment);
auto segment_schema_id = segment_commit->GetSchemaId();
auto segment_schema = schema_commits_holder.GetResource(store, segment_schema_id, false);
CHECK_NOT_NULL_AND_RETURN(segment_schema);
auto segment_partition_id = segment->GetPartitionId();
AddResource<SchemaCommit>(segment_schema);
AddResource<SegmentCommit>(segment_commit);
@@ -92,9 +109,11 @@ Snapshot::Snapshot(StorePtr store, ID_TYPE ss_id) {
auto& segment_commit_mappings = segment_commit->GetMappings();
for (auto s_f_id : segment_commit_mappings) {
auto segment_file = segment_files_holder.GetResource(store, s_f_id, false);
CHECK_NOT_NULL_AND_RETURN(segment_file);
auto segment_file_id = segment_file->GetID();
auto field_element_id = segment_file->GetFieldElementId();
auto field_element = field_elements_holder.GetResource(store, field_element_id, false);
CHECK_NOT_NULL_AND_RETURN(field_element);
AddResource<FieldElement>(field_element);
AddResource<SegmentFile>(segment_file);
element_segfiles_map_[field_element_id][segment_id] = segment_file_id;
@@ -112,10 +131,12 @@ Snapshot::Snapshot(StorePtr store, ID_TYPE ss_id) {
auto& schema_commit = kv.second;
for (auto field_commit_id : schema_commit_mappings) {
auto field_commit = field_commits_holder.GetResource(store, field_commit_id, false);
CHECK_NOT_NULL_AND_RETURN(field_commit);
AddResource<FieldCommit>(field_commit);
auto field_id = field_commit->GetFieldId();
auto field = fields_holder.GetResource(store, field_id, false);
CHECK_NOT_NULL_AND_RETURN(field);
auto field_name = field->GetName();
AddResource<Field>(field);
@@ -123,6 +144,7 @@ Snapshot::Snapshot(StorePtr store, ID_TYPE ss_id) {
auto& field_commit_mappings = field_commit->GetMappings();
for (auto field_element_id : field_commit_mappings) {
auto field_element = field_elements_holder.GetResource(store, field_element_id, false);
CHECK_NOT_NULL_AND_RETURN(field_element);
AddResource<FieldElement>(field_element);
auto field_element_name = field_element->GetName();
field_element_names_map_[field_name][field_element_name] = field_element_id;
@@ -130,6 +152,7 @@ Snapshot::Snapshot(StorePtr store, ID_TYPE ss_id) {
}
}
invalid_ = false;
RefAll();
}
@@ -197,6 +220,9 @@ Snapshot::GetSegmentFile(ID_TYPE segment_id, ID_TYPE field_element_id) const {
const std::string
Snapshot::ToString() const {
if (invalid_) {
return "Invalid Snapshot";
}
auto to_matrix_string = [](const MappingT& mappings, int line_length, size_t ident = 0) -> std::string {
std::stringstream ss;
std::string l1_spaces;
......
@@ -335,6 +335,11 @@ class Snapshot : public ReferenceProxy {
resources[resource->GetID()] = resource;
}
bool
IsValid() const {
return !invalid_;
}
const std::string
ToString() const;
@@ -357,6 +362,7 @@ class Snapshot : public ReferenceProxy {
std::map<ID_TYPE, NUM_TYPE> p_max_seg_num_;
LSN_TYPE max_lsn_;
std::set<ID_TYPE> empty_set_;
bool invalid_ = true;
};
using GCHandler = std::function<void(Snapshot::Ptr)>;
......
@@ -13,6 +13,8 @@
#include "db/snapshot/Operations.h"
#include "db/snapshot/ResourceHolders.h"
#include <string>
namespace milvus {
namespace engine {
namespace snapshot {
@@ -166,6 +168,10 @@ SnapshotHolder::Add(StorePtr store, ID_TYPE id) {
Snapshot::Ptr oldest_ss;
{
auto ss = std::make_shared<Snapshot>(store, id);
if (!ss->IsValid()) {
std::string emsg = "SnapshotHolder::Add: Invalid SS " + std::to_string(id);
return Status(SS_NOT_ACTIVE_ERROR, emsg);
}
std::unique_lock<std::mutex> lock(mutex_);
if (!IsActive(ss)) {
......
@@ -21,9 +21,11 @@
#include "utils/CommonUtil.h"
#include "utils/TimerContext.h"
#include <utility>
namespace milvus::engine::snapshot {
static constexpr int DEFAULT_READER_TIMER_INTERVAL_US = 500 * 1000;
static constexpr int DEFAULT_READER_TIMER_INTERVAL_US = 60 * 1000;
static constexpr int DEFAULT_WRITER_TIMER_INTERVAL_US = 2000 * 1000;
Status
@@ -185,7 +187,7 @@ Snapshots::GetHolder(const std::string& name, SnapshotHolderPtr& holder) const {
emsg << "Snapshots::GetHolderNoLock: Specified snapshot holder for collection ";
emsg << "\"" << name << "\""
<< " not found";
LOG_ENGINE_DEBUG_ << emsg.str();
LOG_SERVER_DEBUG_ << emsg.str();
return Status(SS_NOT_FOUND_ERROR, "Collection " + name + " not found.");
}
@@ -236,36 +238,53 @@ Snapshots::OnReaderTimer(const boost::system::error_code& ec) {
auto op = std::make_shared<GetAllActiveSnapshotIDsOperation>();
auto status = (*op)(store_);
if (!status.ok()) {
LOG_SERVER_ERROR_ << "Snapshots::OnReaderTimer failed: " << status.message();
LOG_SERVER_ERROR_ << "Snapshots::OnReaderTimer::GetAllActiveSnapshotIDsOperation failed: " << status.message();
// TODO: Should be monitored
return;
}
auto ids = op->GetIDs();
ScopedSnapshotT ss;
std::set<ID_TYPE> alive_cids;
std::set<ID_TYPE> this_invalid_cids;
bool diff_found = false;
for (auto& [cid, ccid] : ids) {
/* std::cout << "cid: " << cid << " ccid: " << ccid << std::endl; */
auto status = LoadSnapshot(store_, ss, cid, ccid);
if (!status.ok()) {
LOG_SERVER_ERROR_ << "Snapshots::OnReaderTimer failed: " << status.message();
status = LoadSnapshot(store_, ss, cid, ccid);
if (status.code() == SS_NOT_ACTIVE_ERROR) {
auto found_it = invalid_ssid_.find(ccid);
this_invalid_cids.insert(ccid);
if (found_it == invalid_ssid_.end()) {
LOG_SERVER_ERROR_ << status.ToString();
diff_found = true;
}
continue;
} else if (!status.ok()) {
continue;
}
if (ss && ss->GetCollection()->IsActive()) {
alive_cids.insert(cid);
}
/* std::cout << ss->ToString() << std::endl; */
}
if (diff_found) {
LOG_SERVER_ERROR_ << "Total " << this_invalid_cids.size() << " invalid SS found!";
}
if (invalid_ssid_.size() != 0 && (this_invalid_cids.size() == 0)) {
LOG_SERVER_ERROR_ << "All invalid SS Cleared!";
// TODO: Should be monitored
}
invalid_ssid_ = std::move(this_invalid_cids);
auto op2 = std::make_shared<GetCollectionIDsOperation>();
status = (*op2)(store_);
if (!status.ok()) {
LOG_SERVER_ERROR_ << "Snapshots::OnReaderTimer failed: " << status.message();
LOG_SERVER_ERROR_ << "Snapshots::OnReaderTimer::GetCollectionIDsOperation failed: " << status.message();
// TODO: Should be monitored
return;
}
auto aids = op2->GetIDs();
std::set<ID_TYPE> diff;
/* std::set_difference(alive_cids_.begin(), alive_cids_.end(), alive_cids.begin(), alive_cids.end(), */
/* std::inserter(diff, diff.begin())); */
std::set_difference(alive_cids_.begin(), alive_cids_.end(), aids.begin(), aids.end(),
std::inserter(diff, diff.begin()));
for (auto& cid : diff) {
......
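
The OnReaderTimer change above also tracks the set of snapshot IDs that failed to load (invalid_ssid_), so an error is logged only when an ID first becomes invalid, and a single message is logged once the set clears. Below is a minimal sketch of that bookkeeping with simplified stand-in types; it is not the actual Milvus code.

// Minimal sketch of the invalid-snapshot bookkeeping added to
// Snapshots::OnReaderTimer: log only newly invalid IDs, and log once
// when all previously invalid IDs have cleared.
#include <cstdint>
#include <iostream>
#include <set>

using ID_TYPE = int64_t;

class InvalidTracker {
 public:
    void
    OnTick(const std::set<ID_TYPE>& currently_invalid) {
        bool diff_found = false;
        for (auto id : currently_invalid) {
            // Only IDs not already known to be invalid produce a new log line.
            if (invalid_ssid_.find(id) == invalid_ssid_.end()) {
                std::cout << "SS " << id << " newly invalid" << std::endl;
                diff_found = true;
            }
        }
        if (diff_found) {
            std::cout << "Total " << currently_invalid.size() << " invalid SS found!" << std::endl;
        }
        if (!invalid_ssid_.empty() && currently_invalid.empty()) {
            std::cout << "All invalid SS Cleared!" << std::endl;
        }
        invalid_ssid_ = currently_invalid;  // remember state for the next tick
    }

 private:
    std::set<ID_TYPE> invalid_ssid_;
};

int
main() {
    InvalidTracker tracker;
    tracker.OnTick({42});  // first tick: logs "newly invalid" plus the total
    tracker.OnTick({42});  // same ID again: no duplicate log
    tracker.OnTick({});    // cleared: logs "All invalid SS Cleared!"
    return 0;
}
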
@@ -103,6 +103,7 @@ class Snapshots {
std::map<std::string, ID_TYPE> name_id_map_;
mutable std::shared_timed_mutex inactive_mtx_;
std::map<ID_TYPE, SnapshotHolderPtr> inactive_holders_;
std::set<ID_TYPE> invalid_ssid_;
StorePtr store_;
};
......