From 944e41ad492a078f9338f6bf0ca1d5706f9adbc8 Mon Sep 17 00:00:00 2001 From: groot Date: Thu, 27 Aug 2020 16:12:28 +0800 Subject: [PATCH] refine segment reader/writer (#3484) Signed-off-by: groot --- core/src/codecs/BlockFormat.cpp | 55 ++++++--- core/src/codecs/BlockFormat.h | 9 +- core/src/codecs/DeletedDocsFormat.cpp | 47 ++++++-- core/src/codecs/DeletedDocsFormat.h | 7 +- core/src/codecs/IdBloomFilterFormat.cpp | 62 +++++++--- core/src/codecs/IdBloomFilterFormat.h | 7 +- core/src/codecs/StructuredIndexFormat.cpp | 62 ++++++---- core/src/codecs/StructuredIndexFormat.h | 5 +- core/src/codecs/VectorCompressFormat.cpp | 37 ++++-- core/src/codecs/VectorCompressFormat.h | 5 +- core/src/codecs/VectorIndexFormat.cpp | 87 ++++++++----- core/src/codecs/VectorIndexFormat.h | 15 +-- .../knowhere/index/vector_index/VecIndex.h | 1 - core/src/segment/SegmentReader.cpp | 23 ++-- core/src/segment/SegmentWriter.cpp | 114 ++++++------------ 15 files changed, 311 insertions(+), 225 deletions(-) diff --git a/core/src/codecs/BlockFormat.cpp b/core/src/codecs/BlockFormat.cpp index 7310828d..29d4fe7c 100644 --- a/core/src/codecs/BlockFormat.cpp +++ b/core/src/codecs/BlockFormat.cpp @@ -23,6 +23,7 @@ #include #include +#include "db/Utils.h" #include "storage/ExtraFileInfo.h" #include "utils/Exception.h" #include "utils/Log.h" @@ -30,12 +31,12 @@ namespace milvus { namespace codec { -void +Status BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::BinaryDataPtr& raw) { CHECK_MAGIC_VALID(fs_ptr, file_path); CHECK_SUM_VALID(fs_ptr, file_path); if (!fs_ptr->reader_ptr_->Open(file_path)) { - THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path); + return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path); } fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE); @@ -46,19 +47,21 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p raw->data_.resize(num_bytes); fs_ptr->reader_ptr_->Read(raw->data_.data(), num_bytes); fs_ptr->reader_ptr_->Close(); + + return Status::OK(); } -void +Status BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int64_t offset, int64_t num_bytes, engine::BinaryDataPtr& raw) { CHECK_MAGIC_VALID(fs_ptr, file_path); CHECK_SUM_VALID(fs_ptr, file_path); if (offset < 0 || num_bytes <= 0) { - THROW_ERROR(SERVER_INVALID_ARGUMENT, "Invalid input to read: " + file_path); + return Status(SERVER_INVALID_ARGUMENT, "Invalid input to read: " + file_path); } if (!fs_ptr->reader_ptr_->Open(file_path)) { - THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path); + return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path); } fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE); @@ -68,7 +71,7 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p offset += MAGIC_SIZE + HEADER_SIZE + sizeof(size_t); // Beginning of file is num_bytes if (offset + num_bytes > total_num_bytes) { - THROW_ERROR(SERVER_INVALID_ARGUMENT, "Invalid argument to read: " + file_path); + return Status(SERVER_INVALID_ARGUMENT, "Invalid argument to read: " + file_path); } raw = std::make_shared(); @@ -76,19 +79,21 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p fs_ptr->reader_ptr_->Seekg(offset); fs_ptr->reader_ptr_->Read(raw->data_.data(), num_bytes); fs_ptr->reader_ptr_->Close(); + + return Status::OK(); } -void +Status BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const ReadRanges& read_ranges, engine::BinaryDataPtr& raw) { CHECK_MAGIC_VALID(fs_ptr, file_path); CHECK_SUM_VALID(fs_ptr, file_path); if (read_ranges.empty()) { - return; + return Status::OK(); } if (!fs_ptr->reader_ptr_->Open(file_path)) { - THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path); + return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path); } fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE); @@ -98,7 +103,7 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p int64_t total_bytes = 0; for (auto& range : read_ranges) { if (range.offset_ > total_num_bytes) { - THROW_ERROR(SERVER_INVALID_ARGUMENT, "Invalid argument to read: " + file_path); + return Status(SERVER_INVALID_ARGUMENT, "Invalid argument to read: " + file_path); } total_bytes += range.num_bytes_; } @@ -113,13 +118,15 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p poz += range.num_bytes_; } fs_ptr->reader_ptr_->Close(); + + return Status::OK(); } -void +Status BlockFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const engine::BinaryDataPtr& raw) { if (raw == nullptr) { - return; + return Status::OK(); } // TODO: add extra info std::unordered_map maps; @@ -127,17 +134,27 @@ BlockFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_ WRITE_HEADER(fs_ptr, file_path, maps); if (!fs_ptr->writer_ptr_->InOpen(file_path)) { - THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to open file: " + file_path); + return Status(SERVER_CANNOT_CREATE_FILE, "Fail to open file: " + file_path); } - fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE); + try { + fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE); - size_t num_bytes = raw->data_.size(); - fs_ptr->writer_ptr_->Write(&num_bytes, sizeof(size_t)); - fs_ptr->writer_ptr_->Write(raw->data_.data(), num_bytes); - fs_ptr->writer_ptr_->Close(); + size_t num_bytes = raw->data_.size(); + fs_ptr->writer_ptr_->Write(&num_bytes, sizeof(size_t)); + fs_ptr->writer_ptr_->Write(raw->data_.data(), num_bytes); + fs_ptr->writer_ptr_->Close(); + + WRITE_SUM(fs_ptr, file_path); + } catch (std::exception& ex) { + std::string err_msg = "Failed to write block data: " + std::string(ex.what()); + LOG_ENGINE_ERROR_ << err_msg; + + engine::utils::SendExitSignal(); + return Status(SERVER_WRITE_ERROR, err_msg); + } - WRITE_SUM(fs_ptr, file_path); + return Status::OK(); } } // namespace codec diff --git a/core/src/codecs/BlockFormat.h b/core/src/codecs/BlockFormat.h index 21901eef..0ed20510 100644 --- a/core/src/codecs/BlockFormat.h +++ b/core/src/codecs/BlockFormat.h @@ -24,6 +24,7 @@ #include "db/Types.h" #include "knowhere/common/BinarySet.h" #include "storage/FSHandler.h" +#include "utils/Status.h" namespace milvus { namespace codec { @@ -41,18 +42,18 @@ class BlockFormat { public: BlockFormat() = default; - void + Status Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::BinaryDataPtr& raw); - void + Status Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int64_t offset, int64_t num_bytes, engine::BinaryDataPtr& raw); - void + Status Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const ReadRanges& read_ranges, engine::BinaryDataPtr& raw); - void + Status Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const engine::BinaryDataPtr& raw); // No copy and move diff --git a/core/src/codecs/DeletedDocsFormat.cpp b/core/src/codecs/DeletedDocsFormat.cpp index 47e9f63e..dd50222a 100644 --- a/core/src/codecs/DeletedDocsFormat.cpp +++ b/core/src/codecs/DeletedDocsFormat.cpp @@ -26,6 +26,7 @@ #include #include +#include "db/Utils.h" #include "storage/ExtraFileInfo.h" #include "utils/Exception.h" #include "utils/Log.h" @@ -41,7 +42,7 @@ DeletedDocsFormat::FilePostfix() { return str; } -void +Status DeletedDocsFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, segment::DeletedDocsPtr& deleted_docs) { const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX; @@ -49,7 +50,7 @@ DeletedDocsFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& CHECK_MAGIC_VALID(fs_ptr, full_file_path); CHECK_SUM_VALID(fs_ptr, full_file_path); if (!fs_ptr->reader_ptr_->Open(full_file_path)) { - THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open deleted docs file: " + full_file_path); + return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open deleted docs file: " + full_file_path); } fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE); @@ -64,9 +65,11 @@ DeletedDocsFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& fs_ptr->reader_ptr_->Close(); deleted_docs = std::make_shared(deleted_docs_list); + + return Status::OK(); } -void +Status DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const segment::DeletedDocsPtr& deleted_docs) { const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX; @@ -84,7 +87,7 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& std::vector delete_ids; if (exists) { if (!fs_ptr->reader_ptr_->Open(temp_path)) { - THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open tmp deleted docs file: " + temp_path); + return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open tmp deleted docs file: " + temp_path); } fs_ptr->reader_ptr_->Read(&old_num_bytes, sizeof(size_t)); delete_ids.resize(old_num_bytes / sizeof(engine::offset_t)); @@ -105,26 +108,42 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& WRITE_HEADER(fs_ptr, temp_path, maps); if (!fs_ptr->writer_ptr_->InOpen(temp_path)) { - THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to write file: " + temp_path); + return Status(SERVER_CANNOT_CREATE_FILE, "Fail to write file: " + temp_path); } - fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE); - fs_ptr->writer_ptr_->Write(&new_num_bytes, sizeof(size_t)); - fs_ptr->writer_ptr_->Write(delete_ids.data(), new_num_bytes); - fs_ptr->writer_ptr_->Close(); - WRITE_SUM(fs_ptr, temp_path); + try { + fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE); + fs_ptr->writer_ptr_->Write(&new_num_bytes, sizeof(size_t)); + fs_ptr->writer_ptr_->Write(delete_ids.data(), new_num_bytes); + fs_ptr->writer_ptr_->Close(); + WRITE_SUM(fs_ptr, temp_path); + } catch (std::exception& ex) { + std::string err_msg = "Failed to write delete doc: " + std::string(ex.what()); + LOG_ENGINE_ERROR_ << err_msg; + + engine::utils::SendExitSignal(); + return Status(SERVER_WRITE_ERROR, err_msg); + } // Move temp file to delete file - std::experimental::filesystem::rename(temp_path, full_file_path); + try { + std::experimental::filesystem::rename(temp_path, full_file_path); + } catch (std::exception& ex) { + std::string msg = "Failed to rename file [" + temp_path + "] to [" + full_file_path + "]"; + LOG_SERVER_ERROR_ << msg; + return Status(SERVER_UNEXPECTED_ERROR, msg); + } + + return Status::OK(); } -void +Status DeletedDocsFormat::ReadSize(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, size_t& size) { const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX; CHECK_MAGIC_VALID(fs_ptr, full_file_path); CHECK_SUM_VALID(fs_ptr, full_file_path); if (!fs_ptr->writer_ptr_->Open(full_file_path)) { - THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to open deleted docs file: " + full_file_path); + return Status(SERVER_CANNOT_CREATE_FILE, "Fail to open deleted docs file: " + full_file_path); } fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE); @@ -133,6 +152,8 @@ DeletedDocsFormat::ReadSize(const storage::FSHandlerPtr& fs_ptr, const std::stri size = num_bytes / sizeof(engine::offset_t); fs_ptr->reader_ptr_->Close(); + + return Status::OK(); } } // namespace codec diff --git a/core/src/codecs/DeletedDocsFormat.h b/core/src/codecs/DeletedDocsFormat.h index bf6bad4b..d94bd049 100644 --- a/core/src/codecs/DeletedDocsFormat.h +++ b/core/src/codecs/DeletedDocsFormat.h @@ -22,6 +22,7 @@ #include "segment/DeletedDocs.h" #include "storage/FSHandler.h" +#include "utils/Status.h" namespace milvus { namespace codec { @@ -33,14 +34,14 @@ class DeletedDocsFormat { static std::string FilePostfix(); - void + Status Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, segment::DeletedDocsPtr& deleted_docs); - void + Status Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const segment::DeletedDocsPtr& deleted_docs); - void + Status ReadSize(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, size_t& size); // No copy and move diff --git a/core/src/codecs/IdBloomFilterFormat.cpp b/core/src/codecs/IdBloomFilterFormat.cpp index 9e63ca85..e8764dfb 100644 --- a/core/src/codecs/IdBloomFilterFormat.cpp +++ b/core/src/codecs/IdBloomFilterFormat.cpp @@ -38,38 +38,62 @@ IdBloomFilterFormat::FilePostfix() { return str; } -void +Status IdBloomFilterFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, segment::IdBloomFilterPtr& id_bloom_filter_ptr) { - const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX; - scaling_bloom_t* bloom_filter = - new_scaling_bloom_from_file(BLOOM_FILTER_CAPACITY, BLOOM_FILTER_ERROR_RATE, full_file_path.c_str()); - fiu_do_on("bloom_filter_nullptr", bloom_filter = nullptr); - if (bloom_filter == nullptr) { - THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Fail to read bloom filter from file: " + full_file_path); + try { + const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX; + scaling_bloom_t* bloom_filter = + new_scaling_bloom_from_file(BLOOM_FILTER_CAPACITY, BLOOM_FILTER_ERROR_RATE, full_file_path.c_str()); + fiu_do_on("bloom_filter_nullptr", bloom_filter = nullptr); + if (bloom_filter == nullptr) { + return Status(SERVER_UNEXPECTED_ERROR, "Fail to read bloom filter from file: " + full_file_path); + } + id_bloom_filter_ptr = std::make_shared(bloom_filter); + } catch (std::exception& ex) { + std::string msg = "Failed to read bloom filter file, reason: " + std::string(ex.what()); + LOG_SERVER_ERROR_ << msg; + return Status(SERVER_UNEXPECTED_ERROR, msg); } - id_bloom_filter_ptr = std::make_shared(bloom_filter); + + return Status::OK(); } -void +Status IdBloomFilterFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) { - const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX; - if (scaling_bloom_flush(id_bloom_filter_ptr->GetBloomFilter()) == -1) { - THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Fail to write bloom filter to file: " + full_file_path); + try { + const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX; + if (scaling_bloom_flush(id_bloom_filter_ptr->GetBloomFilter()) == -1) { + return Status(SERVER_UNEXPECTED_ERROR, "Fail to write bloom filter to file: " + full_file_path); + } + } catch (std::exception& ex) { + std::string msg = "Failed to write bloom filter file, reason: " + std::string(ex.what()); + LOG_SERVER_ERROR_ << msg; + return Status(SERVER_UNEXPECTED_ERROR, msg); } + + return Status::OK(); } -void +Status IdBloomFilterFormat::Create(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, segment::IdBloomFilterPtr& id_bloom_filter_ptr) { - const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX; - scaling_bloom_t* bloom_filter = - new_scaling_bloom(BLOOM_FILTER_CAPACITY, BLOOM_FILTER_ERROR_RATE, full_file_path.c_str()); - if (bloom_filter == nullptr) { - THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Failed to read bloom filter from file: " + full_file_path); + try { + const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX; + scaling_bloom_t* bloom_filter = + new_scaling_bloom(BLOOM_FILTER_CAPACITY, BLOOM_FILTER_ERROR_RATE, full_file_path.c_str()); + if (bloom_filter == nullptr) { + return Status(SERVER_UNEXPECTED_ERROR, "Failed to read bloom filter from file: " + full_file_path); + } + id_bloom_filter_ptr = std::make_shared(bloom_filter); + } catch (std::exception& ex) { + std::string msg = "Failed to create bloom filter file, reason: " + std::string(ex.what()); + LOG_SERVER_ERROR_ << msg; + return Status(SERVER_UNEXPECTED_ERROR, msg); } - id_bloom_filter_ptr = std::make_shared(bloom_filter); + + return Status::OK(); } } // namespace codec diff --git a/core/src/codecs/IdBloomFilterFormat.h b/core/src/codecs/IdBloomFilterFormat.h index f424ee21..9c0f83e4 100644 --- a/core/src/codecs/IdBloomFilterFormat.h +++ b/core/src/codecs/IdBloomFilterFormat.h @@ -22,6 +22,7 @@ #include "segment/IdBloomFilter.h" #include "storage/FSHandler.h" +#include "utils/Status.h" namespace milvus { namespace codec { @@ -33,15 +34,15 @@ class IdBloomFilterFormat { static std::string FilePostfix(); - void + Status Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, segment::IdBloomFilterPtr& id_bloom_filter_ptr); - void + Status Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const segment::IdBloomFilterPtr& id_bloom_filter_ptr); - void + Status Create(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, segment::IdBloomFilterPtr& id_bloom_filter_ptr); diff --git a/core/src/codecs/StructuredIndexFormat.cpp b/core/src/codecs/StructuredIndexFormat.cpp index ce4748c8..53c15f9d 100644 --- a/core/src/codecs/StructuredIndexFormat.cpp +++ b/core/src/codecs/StructuredIndexFormat.cpp @@ -24,6 +24,7 @@ #include #include "db/Types.h" +#include "db/Utils.h" #include "knowhere/index/structured_index/StructuredIndexSort.h" #include "storage/ExtraFileInfo.h" @@ -78,7 +79,7 @@ StructuredIndexFormat::CreateStructuredIndex(const engine::DataType data_type) { return index; } -void +Status StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::IndexPtr& index) { milvus::TimeRecorder recorder("StructuredIndexFormat::Read"); @@ -88,11 +89,11 @@ StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const s CHECK_MAGIC_VALID(fs_ptr, full_file_path); CHECK_SUM_VALID(fs_ptr, full_file_path); if (!fs_ptr->reader_ptr_->Open(full_file_path)) { - THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path); + return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path); } int64_t length = fs_ptr->reader_ptr_->Length() - SUM_SIZE; if (length <= 0) { - THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid structured index length: " + full_file_path); + return Status(SERVER_UNEXPECTED_ERROR, "Invalid structured index length: " + full_file_path); } size_t rp = MAGIC_SIZE + HEADER_SIZE; @@ -138,9 +139,11 @@ StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const s auto attr_type = static_cast(data_type); index = CreateStructuredIndex(attr_type); index->Load(load_data_list); + + return Status::OK(); } -void +Status StructuredIndexFormat::Write(const milvus::storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::DataType data_type, const knowhere::IndexPtr& index) { milvus::TimeRecorder recorder("StructuredIndexFormat::Write"); @@ -154,29 +157,40 @@ StructuredIndexFormat::Write(const milvus::storage::FSHandlerPtr& fs_ptr, const auto binaryset = index->Serialize(knowhere::Config()); if (!fs_ptr->writer_ptr_->InOpen(full_file_path)) { - THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path); - } - fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE); - fs_ptr->writer_ptr_->Write(&data_type, sizeof(data_type)); - - for (auto& iter : binaryset.binary_map_) { - auto meta = iter.first.c_str(); - size_t meta_length = iter.first.length(); - fs_ptr->writer_ptr_->Write(&meta_length, sizeof(meta_length)); - fs_ptr->writer_ptr_->Write(meta, meta_length); - - auto binary = iter.second; - int64_t binary_length = binary->size; - fs_ptr->writer_ptr_->Write(&binary_length, sizeof(binary_length)); - fs_ptr->writer_ptr_->Write(binary->data.get(), binary_length); + return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path); } - fs_ptr->writer_ptr_->Close(); - WRITE_SUM(fs_ptr, full_file_path); + try { + fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE); + fs_ptr->writer_ptr_->Write(&data_type, sizeof(data_type)); - double span = recorder.RecordSection("End"); - double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024; - LOG_ENGINE_DEBUG_ << "StructuredIndexFormat::write(" << full_file_path << ") rate " << rate << "MB/s"; + for (auto& iter : binaryset.binary_map_) { + auto meta = iter.first.c_str(); + size_t meta_length = iter.first.length(); + fs_ptr->writer_ptr_->Write(&meta_length, sizeof(meta_length)); + fs_ptr->writer_ptr_->Write(meta, meta_length); + + auto binary = iter.second; + int64_t binary_length = binary->size; + fs_ptr->writer_ptr_->Write(&binary_length, sizeof(binary_length)); + fs_ptr->writer_ptr_->Write(binary->data.get(), binary_length); + } + + fs_ptr->writer_ptr_->Close(); + WRITE_SUM(fs_ptr, full_file_path); + + double span = recorder.RecordSection("End"); + double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024; + LOG_ENGINE_DEBUG_ << "StructuredIndexFormat::write(" << full_file_path << ") rate " << rate << "MB/s"; + } catch (std::exception& ex) { + std::string err_msg = "Failed to write structured index: " + std::string(ex.what()); + LOG_ENGINE_ERROR_ << err_msg; + + engine::utils::SendExitSignal(); + return Status(SERVER_WRITE_ERROR, err_msg); + } + + return Status::OK(); } } // namespace codec diff --git a/core/src/codecs/StructuredIndexFormat.h b/core/src/codecs/StructuredIndexFormat.h index 2764af82..d5d62f8a 100644 --- a/core/src/codecs/StructuredIndexFormat.h +++ b/core/src/codecs/StructuredIndexFormat.h @@ -24,6 +24,7 @@ #include "db/Types.h" #include "knowhere/index/Index.h" #include "storage/FSHandler.h" +#include "utils/Status.h" namespace milvus { namespace codec { @@ -35,10 +36,10 @@ class StructuredIndexFormat { static std::string FilePostfix(); - void + Status Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::IndexPtr& index); - void + Status Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::DataType data_type, const knowhere::IndexPtr& index); diff --git a/core/src/codecs/VectorCompressFormat.cpp b/core/src/codecs/VectorCompressFormat.cpp index 87f1309d..83dd8d5a 100644 --- a/core/src/codecs/VectorCompressFormat.cpp +++ b/core/src/codecs/VectorCompressFormat.cpp @@ -20,6 +20,7 @@ #include #include "codecs/VectorCompressFormat.h" +#include "db/Utils.h" #include "knowhere/common/BinarySet.h" #include "storage/ExtraFileInfo.h" #include "utils/Exception.h" @@ -37,7 +38,7 @@ VectorCompressFormat::FilePostfix() { return str; } -void +Status VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::BinaryPtr& compress) { milvus::TimeRecorder recorder("VectorCompressFormat::Read"); @@ -46,12 +47,12 @@ VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::strin CHECK_MAGIC_VALID(fs_ptr, full_file_path); CHECK_SUM_VALID(fs_ptr, full_file_path); if (!fs_ptr->reader_ptr_->Open(full_file_path)) { - THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress file: " + full_file_path); + return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress file: " + full_file_path); } int64_t length = fs_ptr->reader_ptr_->Length() - MAGIC_SIZE - HEADER_SIZE - SUM_SIZE; if (length <= 0) { - THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid vector compress length: " + full_file_path); + return Status(SERVER_UNEXPECTED_ERROR, "Invalid vector compress length: " + full_file_path); } compress->data = std::shared_ptr(new uint8_t[length]); @@ -64,9 +65,11 @@ VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::strin double span = recorder.RecordSection("End"); double rate = length * 1000000.0 / span / 1024 / 1024; LOG_ENGINE_DEBUG_ << "VectorCompressFormat::Read(" << full_file_path << ") rate " << rate << "MB/s"; + + return Status::OK(); } -void +Status VectorCompressFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const knowhere::BinaryPtr& compress) { milvus::TimeRecorder recorder("VectorCompressFormat::Write"); @@ -77,17 +80,27 @@ VectorCompressFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::stri WRITE_MAGIC(fs_ptr, full_file_path); WRITE_HEADER(fs_ptr, full_file_path, maps); if (!fs_ptr->writer_ptr_->InOpen(full_file_path)) { - THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress: " + full_file_path); + return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress: " + full_file_path); } - fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE); - fs_ptr->writer_ptr_->Write(compress->data.get(), compress->size); - fs_ptr->writer_ptr_->Close(); - WRITE_SUM(fs_ptr, full_file_path); + try { + fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE); + fs_ptr->writer_ptr_->Write(compress->data.get(), compress->size); + fs_ptr->writer_ptr_->Close(); + WRITE_SUM(fs_ptr, full_file_path); + + double span = recorder.RecordSection("End"); + double rate = compress->size * 1000000.0 / span / 1024 / 1024; + LOG_ENGINE_DEBUG_ << "SVectorCompressFormat::Write(" << full_file_path << ") rate " << rate << "MB/s"; + } catch (std::exception& ex) { + std::string err_msg = "Failed to write compress data: " + std::string(ex.what()); + LOG_ENGINE_ERROR_ << err_msg; + + engine::utils::SendExitSignal(); + return Status(SERVER_WRITE_ERROR, err_msg); + } - double span = recorder.RecordSection("End"); - double rate = compress->size * 1000000.0 / span / 1024 / 1024; - LOG_ENGINE_DEBUG_ << "SVectorCompressFormat::Write(" << full_file_path << ") rate " << rate << "MB/s"; + return Status::OK(); } } // namespace codec diff --git a/core/src/codecs/VectorCompressFormat.h b/core/src/codecs/VectorCompressFormat.h index 8f4a6c9d..0cc44d0a 100644 --- a/core/src/codecs/VectorCompressFormat.h +++ b/core/src/codecs/VectorCompressFormat.h @@ -22,6 +22,7 @@ #include "knowhere/common/BinarySet.h" #include "storage/FSHandler.h" +#include "utils/Status.h" namespace milvus { namespace codec { @@ -33,10 +34,10 @@ class VectorCompressFormat { static std::string FilePostfix(); - void + Status Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::BinaryPtr& compress); - void + Status Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const knowhere::BinaryPtr& compress); // No copy and move diff --git a/core/src/codecs/VectorIndexFormat.cpp b/core/src/codecs/VectorIndexFormat.cpp index 8b6e541b..99a9cde2 100644 --- a/core/src/codecs/VectorIndexFormat.cpp +++ b/core/src/codecs/VectorIndexFormat.cpp @@ -21,6 +21,7 @@ #include "codecs/Codec.h" #include "codecs/VectorIndexFormat.h" +#include "db/Utils.h" #include "knowhere/common/BinarySet.h" #include "knowhere/index/vector_index/VecIndex.h" #include "knowhere/index/vector_index/VecIndexFactory.h" @@ -40,7 +41,7 @@ VectorIndexFormat::FilePostfix() { return str; } -void +Status VectorIndexFormat::ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::BinaryPtr& data) { milvus::TimeRecorder recorder("VectorIndexFormat::ReadRaw"); @@ -48,7 +49,7 @@ VectorIndexFormat::ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::strin CHECK_SUM_VALID(fs_ptr, file_path); if (!fs_ptr->reader_ptr_->Open(file_path)) { - THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open raw file: " + file_path); + return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open raw file: " + file_path); } fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE); @@ -67,9 +68,11 @@ VectorIndexFormat::ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::strin double span = recorder.RecordSection("End"); double rate = num_bytes * 1000000.0 / span / 1024 / 1024; LOG_ENGINE_DEBUG_ << "VectorIndexFormat::ReadIndex(" << file_path << ") rate " << rate << "MB/s"; + + return Status::OK(); } -void +Status VectorIndexFormat::ReadIndex(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::BinarySet& data) { milvus::TimeRecorder recorder("VectorIndexFormat::ReadIndex"); @@ -78,12 +81,12 @@ VectorIndexFormat::ReadIndex(const storage::FSHandlerPtr& fs_ptr, const std::str CHECK_MAGIC_VALID(fs_ptr, full_file_path); CHECK_SUM_VALID(fs_ptr, full_file_path); if (!fs_ptr->reader_ptr_->Open(full_file_path)) { - THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path); + return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path); } int64_t length = fs_ptr->reader_ptr_->Length() - SUM_SIZE; if (length <= 0) { - THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid vector index length: " + full_file_path); + return Status(SERVER_UNEXPECTED_ERROR, "Invalid vector index length: " + full_file_path); } int64_t rp = MAGIC_SIZE + HEADER_SIZE; @@ -120,28 +123,32 @@ VectorIndexFormat::ReadIndex(const storage::FSHandlerPtr& fs_ptr, const std::str double span = recorder.RecordSection("End"); double rate = length * 1000000.0 / span / 1024 / 1024; LOG_ENGINE_DEBUG_ << "VectorIndexFormat::ReadIndex(" << full_file_path << ") rate " << rate << "MB/s"; + + return Status::OK(); } -void +Status VectorIndexFormat::ReadCompress(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::BinaryPtr& data) { auto& ss_codec = codec::Codec::instance(); - ss_codec.GetVectorCompressFormat()->Read(fs_ptr, file_path, data); + return ss_codec.GetVectorCompressFormat()->Read(fs_ptr, file_path, data); } -void +Status VectorIndexFormat::ConvertRaw(const engine::BinaryDataPtr& raw, knowhere::BinaryPtr& data) { data = std::make_shared(); if (raw == nullptr) { - return; + return Status::OK(); } data->size = raw->Size(); data->data = std::shared_ptr(new uint8_t[data->size], std::default_delete()); memcpy(data->data.get(), raw->data_.data(), data->size); + + return Status::OK(); } -void +Status VectorIndexFormat::ConstructIndex(const std::string& index_name, knowhere::BinarySet& index_data, knowhere::BinaryPtr& raw_data, knowhere::BinaryPtr& compress_data, knowhere::VecIndexPtr& index) { @@ -169,11 +176,13 @@ VectorIndexFormat::ConstructIndex(const std::string& index_name, knowhere::Binar index->UpdateIndexSize(); LOG_ENGINE_DEBUG_ << "index file size " << length << " index size " << index->IndexSize(); } else { - THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Fail to create vector index"); + return Status(SERVER_UNEXPECTED_ERROR, "Fail to create vector index"); } + + return Status::OK(); } -void +Status VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const knowhere::VecIndexPtr& index) { milvus::TimeRecorder recorder("SVectorIndexFormat::WriteIndex"); @@ -186,31 +195,45 @@ VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::st auto binaryset = index->Serialize(knowhere::Config()); if (!fs_ptr->writer_ptr_->InOpen(full_file_path)) { - THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path); + return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path); } - fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE); - for (auto& iter : binaryset.binary_map_) { - auto meta = iter.first.c_str(); - size_t meta_length = iter.first.length(); - fs_ptr->writer_ptr_->Write(&meta_length, sizeof(meta_length)); - fs_ptr->writer_ptr_->Write(meta, meta_length); - - auto binary = iter.second; - int64_t binary_length = binary->size; - fs_ptr->writer_ptr_->Write(&binary_length, sizeof(binary_length)); - fs_ptr->writer_ptr_->Write(binary->data.get(), binary_length); - } - fs_ptr->writer_ptr_->Close(); + try { + fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE); + for (auto& iter : binaryset.binary_map_) { + if (iter.first == RAW_DATA || iter.first == QUANTIZATION_DATA) { + continue; // the two kinds of data will be written into another file + } + + auto meta = iter.first.c_str(); + size_t meta_length = iter.first.length(); + fs_ptr->writer_ptr_->Write(&meta_length, sizeof(meta_length)); + fs_ptr->writer_ptr_->Write(meta, meta_length); + + auto binary = iter.second; + int64_t binary_length = binary->size; + fs_ptr->writer_ptr_->Write(&binary_length, sizeof(binary_length)); + fs_ptr->writer_ptr_->Write(binary->data.get(), binary_length); + } + fs_ptr->writer_ptr_->Close(); - WRITE_SUM(fs_ptr, full_file_path); + WRITE_SUM(fs_ptr, full_file_path); - double span = recorder.RecordSection("End"); - double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024; - LOG_ENGINE_DEBUG_ << "VectorIndexFormat::WriteIndex(" << full_file_path << ") rate " << rate << "MB/s"; + double span = recorder.RecordSection("End"); + double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024; + LOG_ENGINE_DEBUG_ << "VectorIndexFormat::WriteIndex(" << full_file_path << ") rate " << rate << "MB/s"; + } catch (std::exception& ex) { + std::string err_msg = "Failed to write vector index data: " + std::string(ex.what()); + LOG_ENGINE_ERROR_ << err_msg; + + engine::utils::SendExitSignal(); + return Status(SERVER_WRITE_ERROR, err_msg); + } + + return Status::OK(); } -void +Status VectorIndexFormat::WriteCompress(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const knowhere::VecIndexPtr& index) { milvus::TimeRecorder recorder("VectorIndexFormat::WriteCompress"); @@ -222,6 +245,8 @@ VectorIndexFormat::WriteCompress(const storage::FSHandlerPtr& fs_ptr, const std: auto& ss_codec = codec::Codec::instance(); ss_codec.GetVectorCompressFormat()->Write(fs_ptr, file_path, sq8_data); } + + return Status::OK(); } } // namespace codec diff --git a/core/src/codecs/VectorIndexFormat.h b/core/src/codecs/VectorIndexFormat.h index 317bddb0..4893a3c1 100644 --- a/core/src/codecs/VectorIndexFormat.h +++ b/core/src/codecs/VectorIndexFormat.h @@ -23,6 +23,7 @@ #include "knowhere/index/vector_index/VecIndex.h" #include "storage/FSHandler.h" +#include "utils/Status.h" namespace milvus { namespace codec { @@ -34,26 +35,26 @@ class VectorIndexFormat { static std::string FilePostfix(); - void + Status ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::BinaryPtr& data); - void + Status ReadIndex(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::BinarySet& data); - void + Status ReadCompress(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::BinaryPtr& data); - void + Status ConvertRaw(const engine::BinaryDataPtr& raw, knowhere::BinaryPtr& data); - void + Status ConstructIndex(const std::string& index_name, knowhere::BinarySet& index_data, knowhere::BinaryPtr& raw_data, knowhere::BinaryPtr& compress_data, knowhere::VecIndexPtr& index); - void + Status WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const knowhere::VecIndexPtr& index); - void + Status WriteCompress(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const knowhere::VecIndexPtr& index); diff --git a/core/src/index/knowhere/knowhere/index/vector_index/VecIndex.h b/core/src/index/knowhere/knowhere/index/vector_index/VecIndex.h index 8f48831f..4cde0ff0 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/VecIndex.h +++ b/core/src/index/knowhere/knowhere/index/vector_index/VecIndex.h @@ -25,7 +25,6 @@ namespace milvus { namespace knowhere { -#define INDEX_DATA "INDEX_DATA" #define RAW_DATA "RAW_DATA" #define QUANTIZATION_DATA "QUANTIZATION_DATA" diff --git a/core/src/segment/SegmentReader.cpp b/core/src/segment/SegmentReader.cpp index b24c9260..0d111d95 100644 --- a/core/src/segment/SegmentReader.cpp +++ b/core/src/segment/SegmentReader.cpp @@ -122,7 +122,7 @@ SegmentReader::LoadField(const std::string& field_name, engine::BinaryDataPtr& r auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(file_path); if (data_obj == nullptr) { auto& ss_codec = codec::Codec::instance(); - ss_codec.GetBlockFormat()->Read(fs_ptr_, file_path, raw); + STATUS_CHECK(ss_codec.GetBlockFormat()->Read(fs_ptr_, file_path, raw)); if (to_cache) { cache::CpuCacheMgr::GetInstance().InsertItem(file_path, raw); // put into cache @@ -180,7 +180,7 @@ SegmentReader::LoadEntities(const std::string& field_name, const std::vectorRead(fs_ptr_, file_path, ranges, raw); + STATUS_CHECK(ss_codec.GetBlockFormat()->Read(fs_ptr_, file_path, ranges, raw)); } catch (std::exception& e) { std::string err_msg = "Failed to load raw vectors: " + std::string(e.what()); LOG_ENGINE_ERROR_ << err_msg; @@ -338,7 +338,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex return Status::OK(); } - ss_codec.GetVectorIndexFormat()->ReadIndex(fs_ptr_, index_file_path, index_data); + STATUS_CHECK(ss_codec.GetVectorIndexFormat()->ReadIndex(fs_ptr_, index_file_path, index_data)); // for some kinds index(IVF), read raw file auto index_type = index_visitor->GetElement()->GetTypeName(); @@ -346,11 +346,11 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex engine::BinaryDataPtr fixed_data; auto status = segment_ptr_->GetFixedFieldData(field_name, fixed_data); if (status.ok()) { - ss_codec.GetVectorIndexFormat()->ConvertRaw(fixed_data, raw_data); + STATUS_CHECK(ss_codec.GetVectorIndexFormat()->ConvertRaw(fixed_data, raw_data)); } else if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW)) { auto file_path = engine::snapshot::GetResPath(dir_collections_, visitor->GetFile()); - ss_codec.GetVectorIndexFormat()->ReadRaw(fs_ptr_, file_path, raw_data); + STATUS_CHECK(ss_codec.GetVectorIndexFormat()->ReadRaw(fs_ptr_, file_path, raw_data)); } } @@ -359,11 +359,12 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_COMPRESS)) { auto file_path = engine::snapshot::GetResPath(dir_collections_, visitor->GetFile()); - ss_codec.GetVectorIndexFormat()->ReadCompress(fs_ptr_, file_path, compress_data); + STATUS_CHECK(ss_codec.GetVectorIndexFormat()->ReadCompress(fs_ptr_, file_path, compress_data)); } } - ss_codec.GetVectorIndexFormat()->ConstructIndex(index_type, index_data, raw_data, compress_data, index_ptr); + STATUS_CHECK(ss_codec.GetVectorIndexFormat()->ConstructIndex(index_type, index_data, raw_data, compress_data, + index_ptr)); index_ptr->SetUids(uids); index_ptr->SetBlacklist(concurrent_bitset_ptr); @@ -407,7 +408,7 @@ SegmentReader::LoadStructuredIndex(const std::string& field_name, knowhere::Inde // if the data is in cache, no need to read file auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(file_path); if (data_obj == nullptr) { - ss_codec.GetStructuredIndexFormat()->Read(fs_ptr_, file_path, index_ptr); + STATUS_CHECK(ss_codec.GetStructuredIndexFormat()->Read(fs_ptr_, file_path, index_ptr)); cache::CpuCacheMgr::GetInstance().InsertItem(file_path, index_ptr); // put into cache } else { index_ptr = std::static_pointer_cast(data_obj); @@ -470,7 +471,7 @@ SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) { auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(file_path); if (data_obj == nullptr) { auto& ss_codec = codec::Codec::instance(); - ss_codec.GetIdBloomFilterFormat()->Read(fs_ptr_, file_path, id_bloom_filter_ptr); + STATUS_CHECK(ss_codec.GetIdBloomFilterFormat()->Read(fs_ptr_, file_path, id_bloom_filter_ptr)); } else { id_bloom_filter_ptr = std::static_pointer_cast(data_obj); } @@ -508,7 +509,7 @@ SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) { auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(file_path); if (data_obj == nullptr) { auto& ss_codec = codec::Codec::instance(); - ss_codec.GetDeletedDocsFormat()->Read(fs_ptr_, file_path, deleted_docs_ptr); + STATUS_CHECK(ss_codec.GetDeletedDocsFormat()->Read(fs_ptr_, file_path, deleted_docs_ptr)); } else { deleted_docs_ptr = std::static_pointer_cast(data_obj); } @@ -544,7 +545,7 @@ SegmentReader::ReadDeletedDocsSize(size_t& size) { } auto& ss_codec = codec::Codec::instance(); - ss_codec.GetDeletedDocsFormat()->ReadSize(fs_ptr_, file_path, size); + STATUS_CHECK(ss_codec.GetDeletedDocsFormat()->ReadSize(fs_ptr_, file_path, size)); } catch (std::exception& e) { std::string err_msg = "Failed to read deleted docs size: " + std::string(e.what()); LOG_ENGINE_ERROR_ << err_msg; diff --git a/core/src/segment/SegmentWriter.cpp b/core/src/segment/SegmentWriter.cpp index 60c88482..1011a7ec 100644 --- a/core/src/segment/SegmentWriter.cpp +++ b/core/src/segment/SegmentWriter.cpp @@ -92,16 +92,9 @@ SegmentWriter::Serialize() { Status SegmentWriter::WriteField(const std::string& file_path, const engine::BinaryDataPtr& raw) { - try { - auto& ss_codec = codec::Codec::instance(); - ss_codec.GetBlockFormat()->Write(fs_ptr_, file_path, raw); - } catch (std::exception& e) { - std::string err_msg = "Failed to write field: " + std::string(e.what()); - LOG_ENGINE_ERROR_ << err_msg; + auto& ss_codec = codec::Codec::instance(); + STATUS_CHECK(ss_codec.GetBlockFormat()->Write(fs_ptr_, file_path, raw)); - engine::utils::SendExitSignal(); - return Status(SERVER_WRITE_ERROR, err_msg); - } return Status::OK(); } @@ -137,49 +130,41 @@ SegmentWriter::WriteFields() { Status SegmentWriter::WriteBloomFilter() { - try { - TimeRecorder recorder("SegmentWriter::WriteBloomFilter"); - - engine::BinaryDataPtr uid_data; - auto status = segment_ptr_->GetFixedFieldData(engine::FIELD_UID, uid_data); - if (!status.ok()) { - return status; - } + TimeRecorder recorder("SegmentWriter::WriteBloomFilter"); - auto& field_visitors_map = segment_visitor_->GetFieldVisitors(); - auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::FIELD_UID); - auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER); - if (uid_blf_visitor && uid_blf_visitor->GetFile()) { - auto segment_file = uid_blf_visitor->GetFile(); - std::string file_path = - engine::snapshot::GetResPath(dir_collections_, segment_file); + engine::BinaryDataPtr uid_data; + auto status = segment_ptr_->GetFixedFieldData(engine::FIELD_UID, uid_data); + if (!status.ok()) { + return status; + } - auto& ss_codec = codec::Codec::instance(); - segment::IdBloomFilterPtr bloom_filter_ptr; - ss_codec.GetIdBloomFilterFormat()->Create(fs_ptr_, file_path, bloom_filter_ptr); + auto& field_visitors_map = segment_visitor_->GetFieldVisitors(); + auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::FIELD_UID); + auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER); + if (uid_blf_visitor && uid_blf_visitor->GetFile()) { + auto segment_file = uid_blf_visitor->GetFile(); + std::string file_path = + engine::snapshot::GetResPath(dir_collections_, segment_file); - auto uids = reinterpret_cast(uid_data->data_.data()); - int64_t row_count = segment_ptr_->GetRowCount(); - for (int64_t i = 0; i < row_count; i++) { - bloom_filter_ptr->Add(uids[i]); - } - segment_ptr_->SetBloomFilter(bloom_filter_ptr); + auto& ss_codec = codec::Codec::instance(); + segment::IdBloomFilterPtr bloom_filter_ptr; + STATUS_CHECK(ss_codec.GetIdBloomFilterFormat()->Create(fs_ptr_, file_path, bloom_filter_ptr)); - recorder.RecordSection("Initialize bloom filter"); + auto uids = reinterpret_cast(uid_data->data_.data()); + int64_t row_count = segment_ptr_->GetRowCount(); + for (int64_t i = 0; i < row_count; i++) { + bloom_filter_ptr->Add(uids[i]); + } + segment_ptr_->SetBloomFilter(bloom_filter_ptr); - STATUS_CHECK(WriteBloomFilter(file_path, segment_ptr_->GetBloomFilter())); + recorder.RecordSection("Initialize bloom filter"); - auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::IdBloomFilterFormat::FilePostfix()); - segment_file->SetSize(file_size); - } else { - return Status(DB_ERROR, "Bloom filter element missed in snapshot"); - } - } catch (std::exception& e) { - std::string err_msg = "Failed to write vectors: " + std::string(e.what()); - LOG_ENGINE_ERROR_ << err_msg; + STATUS_CHECK(WriteBloomFilter(file_path, segment_ptr_->GetBloomFilter())); - engine::utils::SendExitSignal(); - return Status(SERVER_WRITE_ERROR, err_msg); + auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::IdBloomFilterFormat::FilePostfix()); + segment_file->SetSize(file_size); + } else { + return Status(DB_ERROR, "Bloom filter element missed in snapshot"); } return Status::OK(); @@ -188,11 +173,7 @@ SegmentWriter::WriteBloomFilter() { Status SegmentWriter::CreateBloomFilter(const std::string& file_path, IdBloomFilterPtr& bloom_filter_ptr) { auto& ss_codec = codec::Codec::instance(); - try { - ss_codec.GetIdBloomFilterFormat()->Create(fs_ptr_, file_path, bloom_filter_ptr); - } catch (std::exception& er) { - return Status(DB_ERROR, "Create a new bloom filter fail: " + std::string(er.what())); - } + STATUS_CHECK(ss_codec.GetIdBloomFilterFormat()->Create(fs_ptr_, file_path, bloom_filter_ptr)); return Status::OK(); } @@ -203,18 +184,10 @@ SegmentWriter::WriteBloomFilter(const std::string& file_path, const IdBloomFilte return Status(DB_ERROR, "WriteBloomFilter: null pointer"); } - try { - TimeRecorderAuto recorder("SegmentWriter::WriteBloomFilter: " + file_path); - - auto& ss_codec = codec::Codec::instance(); - ss_codec.GetIdBloomFilterFormat()->Write(fs_ptr_, file_path, id_bloom_filter_ptr); - } catch (std::exception& e) { - std::string err_msg = "Failed to write bloom filter: " + std::string(e.what()); - LOG_ENGINE_ERROR_ << err_msg; + TimeRecorderAuto recorder("SegmentWriter::WriteBloomFilter: " + file_path); - engine::utils::SendExitSignal(); - return Status(SERVER_WRITE_ERROR, err_msg); - } + auto& ss_codec = codec::Codec::instance(); + STATUS_CHECK(ss_codec.GetIdBloomFilterFormat()->Write(fs_ptr_, file_path, id_bloom_filter_ptr)); return Status::OK(); } @@ -246,18 +219,11 @@ SegmentWriter::WriteDeletedDocs(const std::string& file_path, const DeletedDocsP return Status::OK(); } - try { - TimeRecorderAuto recorder("SegmentWriter::WriteDeletedDocs: " + file_path); + TimeRecorderAuto recorder("SegmentWriter::WriteDeletedDocs: " + file_path); - auto& ss_codec = codec::Codec::instance(); - ss_codec.GetDeletedDocsFormat()->Write(fs_ptr_, file_path, deleted_docs); - } catch (std::exception& e) { - std::string err_msg = "Failed to write deleted docs: " + std::string(e.what()); - LOG_ENGINE_ERROR_ << err_msg; + auto& ss_codec = codec::Codec::instance(); + STATUS_CHECK(ss_codec.GetDeletedDocsFormat()->Write(fs_ptr_, file_path, deleted_docs)); - engine::utils::SendExitSignal(); - return Status(SERVER_WRITE_ERROR, err_msg); - } return Status::OK(); } @@ -392,7 +358,7 @@ SegmentWriter::WriteVectorIndex(const std::string& field_name) { auto segment_file = element_visitor->GetFile(); std::string file_path = engine::snapshot::GetResPath(dir_collections_, segment_file); - ss_codec.GetVectorIndexFormat()->WriteIndex(fs_ptr_, file_path, index); + STATUS_CHECK(ss_codec.GetVectorIndexFormat()->WriteIndex(fs_ptr_, file_path, index)); auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::VectorIndexFormat::FilePostfix()); segment_file->SetSize(file_size); @@ -406,7 +372,7 @@ SegmentWriter::WriteVectorIndex(const std::string& field_name) { auto segment_file = element_visitor->GetFile(); std::string file_path = engine::snapshot::GetResPath(dir_collections_, segment_file); - ss_codec.GetVectorIndexFormat()->WriteCompress(fs_ptr_, file_path, index); + STATUS_CHECK(ss_codec.GetVectorIndexFormat()->WriteCompress(fs_ptr_, file_path, index)); auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::VectorCompressFormat::FilePostfix()); @@ -454,7 +420,7 @@ SegmentWriter::WriteStructuredIndex(const std::string& field_name) { auto segment_file = element_visitor->GetFile(); std::string file_path = engine::snapshot::GetResPath(dir_collections_, segment_file); - ss_codec.GetStructuredIndexFormat()->Write(fs_ptr_, file_path, field_type, index); + STATUS_CHECK(ss_codec.GetStructuredIndexFormat()->Write(fs_ptr_, file_path, field_type, index)); auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::StructuredIndexFormat::FilePostfix()); segment_file->SetSize(file_size); -- GitLab