未验证 提交 944e41ad 编写于 作者: G groot 提交者: GitHub

refine segment reader/writer (#3484)

Signed-off-by: groot <yihua.mo@zilliz.com>
上级 6b4a2afa
......@@ -23,6 +23,7 @@
#include <memory>
#include <unordered_map>
#include "db/Utils.h"
#include "storage/ExtraFileInfo.h"
#include "utils/Exception.h"
#include "utils/Log.h"
......@@ -30,12 +31,12 @@
namespace milvus {
namespace codec {
void
Status
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::BinaryDataPtr& raw) {
CHECK_MAGIC_VALID(fs_ptr, file_path);
CHECK_SUM_VALID(fs_ptr, file_path);
if (!fs_ptr->reader_ptr_->Open(file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
}
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
......@@ -46,19 +47,21 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
raw->data_.resize(num_bytes);
fs_ptr->reader_ptr_->Read(raw->data_.data(), num_bytes);
fs_ptr->reader_ptr_->Close();
return Status::OK();
}
void
Status
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int64_t offset, int64_t num_bytes,
engine::BinaryDataPtr& raw) {
CHECK_MAGIC_VALID(fs_ptr, file_path);
CHECK_SUM_VALID(fs_ptr, file_path);
if (offset < 0 || num_bytes <= 0) {
THROW_ERROR(SERVER_INVALID_ARGUMENT, "Invalid input to read: " + file_path);
return Status(SERVER_INVALID_ARGUMENT, "Invalid input to read: " + file_path);
}
if (!fs_ptr->reader_ptr_->Open(file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
}
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
......@@ -68,7 +71,7 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
offset += MAGIC_SIZE + HEADER_SIZE + sizeof(size_t); // Beginning of file is num_bytes
if (offset + num_bytes > total_num_bytes) {
THROW_ERROR(SERVER_INVALID_ARGUMENT, "Invalid argument to read: " + file_path);
return Status(SERVER_INVALID_ARGUMENT, "Invalid argument to read: " + file_path);
}
raw = std::make_shared<engine::BinaryData>();
......@@ -76,19 +79,21 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
fs_ptr->reader_ptr_->Seekg(offset);
fs_ptr->reader_ptr_->Read(raw->data_.data(), num_bytes);
fs_ptr->reader_ptr_->Close();
return Status::OK();
}
void
Status
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const ReadRanges& read_ranges,
engine::BinaryDataPtr& raw) {
CHECK_MAGIC_VALID(fs_ptr, file_path);
CHECK_SUM_VALID(fs_ptr, file_path);
if (read_ranges.empty()) {
return;
return Status::OK();
}
if (!fs_ptr->reader_ptr_->Open(file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
}
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
......@@ -98,7 +103,7 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
int64_t total_bytes = 0;
for (auto& range : read_ranges) {
if (range.offset_ > total_num_bytes) {
THROW_ERROR(SERVER_INVALID_ARGUMENT, "Invalid argument to read: " + file_path);
return Status(SERVER_INVALID_ARGUMENT, "Invalid argument to read: " + file_path);
}
total_bytes += range.num_bytes_;
}
......@@ -113,13 +118,15 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
poz += range.num_bytes_;
}
fs_ptr->reader_ptr_->Close();
return Status::OK();
}
void
Status
BlockFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const engine::BinaryDataPtr& raw) {
if (raw == nullptr) {
return;
return Status::OK();
}
// TODO: add extra info
std::unordered_map<std::string, std::string> maps;
......@@ -127,17 +134,27 @@ BlockFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_
WRITE_HEADER(fs_ptr, file_path, maps);
if (!fs_ptr->writer_ptr_->InOpen(file_path)) {
THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to open file: " + file_path);
return Status(SERVER_CANNOT_CREATE_FILE, "Fail to open file: " + file_path);
}
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
try {
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
size_t num_bytes = raw->data_.size();
fs_ptr->writer_ptr_->Write(&num_bytes, sizeof(size_t));
fs_ptr->writer_ptr_->Write(raw->data_.data(), num_bytes);
fs_ptr->writer_ptr_->Close();
size_t num_bytes = raw->data_.size();
fs_ptr->writer_ptr_->Write(&num_bytes, sizeof(size_t));
fs_ptr->writer_ptr_->Write(raw->data_.data(), num_bytes);
fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, file_path);
} catch (std::exception& ex) {
std::string err_msg = "Failed to write block data: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << err_msg;
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
WRITE_SUM(fs_ptr, file_path);
return Status::OK();
}
} // namespace codec
......
......@@ -24,6 +24,7 @@
#include "db/Types.h"
#include "knowhere/common/BinarySet.h"
#include "storage/FSHandler.h"
#include "utils/Status.h"
namespace milvus {
namespace codec {
......@@ -41,18 +42,18 @@ class BlockFormat {
public:
BlockFormat() = default;
void
Status
Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::BinaryDataPtr& raw);
void
Status
Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int64_t offset, int64_t num_bytes,
engine::BinaryDataPtr& raw);
void
Status
Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const ReadRanges& read_ranges,
engine::BinaryDataPtr& raw);
void
Status
Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const engine::BinaryDataPtr& raw);
// No copy and move
......
......@@ -26,6 +26,7 @@
#include <unordered_map>
#include <vector>
#include "db/Utils.h"
#include "storage/ExtraFileInfo.h"
#include "utils/Exception.h"
#include "utils/Log.h"
......@@ -41,7 +42,7 @@ DeletedDocsFormat::FilePostfix() {
return str;
}
void
Status
DeletedDocsFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
segment::DeletedDocsPtr& deleted_docs) {
const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX;
......@@ -49,7 +50,7 @@ DeletedDocsFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string&
CHECK_MAGIC_VALID(fs_ptr, full_file_path);
CHECK_SUM_VALID(fs_ptr, full_file_path);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open deleted docs file: " + full_file_path);
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open deleted docs file: " + full_file_path);
}
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
......@@ -64,9 +65,11 @@ DeletedDocsFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string&
fs_ptr->reader_ptr_->Close();
deleted_docs = std::make_shared<segment::DeletedDocs>(deleted_docs_list);
return Status::OK();
}
void
Status
DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const segment::DeletedDocsPtr& deleted_docs) {
const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX;
......@@ -84,7 +87,7 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string&
std::vector<engine::offset_t> delete_ids;
if (exists) {
if (!fs_ptr->reader_ptr_->Open(temp_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open tmp deleted docs file: " + temp_path);
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open tmp deleted docs file: " + temp_path);
}
fs_ptr->reader_ptr_->Read(&old_num_bytes, sizeof(size_t));
delete_ids.resize(old_num_bytes / sizeof(engine::offset_t));
......@@ -105,26 +108,42 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string&
WRITE_HEADER(fs_ptr, temp_path, maps);
if (!fs_ptr->writer_ptr_->InOpen(temp_path)) {
THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to write file: " + temp_path);
return Status(SERVER_CANNOT_CREATE_FILE, "Fail to write file: " + temp_path);
}
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
fs_ptr->writer_ptr_->Write(&new_num_bytes, sizeof(size_t));
fs_ptr->writer_ptr_->Write(delete_ids.data(), new_num_bytes);
fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, temp_path);
try {
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
fs_ptr->writer_ptr_->Write(&new_num_bytes, sizeof(size_t));
fs_ptr->writer_ptr_->Write(delete_ids.data(), new_num_bytes);
fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, temp_path);
} catch (std::exception& ex) {
std::string err_msg = "Failed to write delete doc: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << err_msg;
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
// Move temp file to delete file
std::experimental::filesystem::rename(temp_path, full_file_path);
try {
std::experimental::filesystem::rename(temp_path, full_file_path);
} catch (std::exception& ex) {
std::string msg = "Failed to rename file [" + temp_path + "] to [" + full_file_path + "]";
LOG_SERVER_ERROR_ << msg;
return Status(SERVER_UNEXPECTED_ERROR, msg);
}
return Status::OK();
}
void
Status
DeletedDocsFormat::ReadSize(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, size_t& size) {
const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX;
CHECK_MAGIC_VALID(fs_ptr, full_file_path);
CHECK_SUM_VALID(fs_ptr, full_file_path);
if (!fs_ptr->writer_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to open deleted docs file: " + full_file_path);
return Status(SERVER_CANNOT_CREATE_FILE, "Fail to open deleted docs file: " + full_file_path);
}
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
......@@ -133,6 +152,8 @@ DeletedDocsFormat::ReadSize(const storage::FSHandlerPtr& fs_ptr, const std::stri
size = num_bytes / sizeof(engine::offset_t);
fs_ptr->reader_ptr_->Close();
return Status::OK();
}
} // namespace codec
......
......@@ -22,6 +22,7 @@
#include "segment/DeletedDocs.h"
#include "storage/FSHandler.h"
#include "utils/Status.h"
namespace milvus {
namespace codec {
......@@ -33,14 +34,14 @@ class DeletedDocsFormat {
static std::string
FilePostfix();
void
Status
Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, segment::DeletedDocsPtr& deleted_docs);
void
Status
Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const segment::DeletedDocsPtr& deleted_docs);
void
Status
ReadSize(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, size_t& size);
// No copy and move
......
......@@ -38,38 +38,62 @@ IdBloomFilterFormat::FilePostfix() {
return str;
}
void
Status
IdBloomFilterFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX;
scaling_bloom_t* bloom_filter =
new_scaling_bloom_from_file(BLOOM_FILTER_CAPACITY, BLOOM_FILTER_ERROR_RATE, full_file_path.c_str());
fiu_do_on("bloom_filter_nullptr", bloom_filter = nullptr);
if (bloom_filter == nullptr) {
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Fail to read bloom filter from file: " + full_file_path);
try {
const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX;
scaling_bloom_t* bloom_filter =
new_scaling_bloom_from_file(BLOOM_FILTER_CAPACITY, BLOOM_FILTER_ERROR_RATE, full_file_path.c_str());
fiu_do_on("bloom_filter_nullptr", bloom_filter = nullptr);
if (bloom_filter == nullptr) {
return Status(SERVER_UNEXPECTED_ERROR, "Fail to read bloom filter from file: " + full_file_path);
}
id_bloom_filter_ptr = std::make_shared<segment::IdBloomFilter>(bloom_filter);
} catch (std::exception& ex) {
std::string msg = "Failed to read bloom filter file, reason: " + std::string(ex.what());
LOG_SERVER_ERROR_ << msg;
return Status(SERVER_UNEXPECTED_ERROR, msg);
}
id_bloom_filter_ptr = std::make_shared<segment::IdBloomFilter>(bloom_filter);
return Status::OK();
}
void
Status
IdBloomFilterFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX;
if (scaling_bloom_flush(id_bloom_filter_ptr->GetBloomFilter()) == -1) {
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Fail to write bloom filter to file: " + full_file_path);
try {
const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX;
if (scaling_bloom_flush(id_bloom_filter_ptr->GetBloomFilter()) == -1) {
return Status(SERVER_UNEXPECTED_ERROR, "Fail to write bloom filter to file: " + full_file_path);
}
} catch (std::exception& ex) {
std::string msg = "Failed to write bloom filter file, reason: " + std::string(ex.what());
LOG_SERVER_ERROR_ << msg;
return Status(SERVER_UNEXPECTED_ERROR, msg);
}
return Status::OK();
}
void
Status
IdBloomFilterFormat::Create(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX;
scaling_bloom_t* bloom_filter =
new_scaling_bloom(BLOOM_FILTER_CAPACITY, BLOOM_FILTER_ERROR_RATE, full_file_path.c_str());
if (bloom_filter == nullptr) {
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Failed to read bloom filter from file: " + full_file_path);
try {
const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX;
scaling_bloom_t* bloom_filter =
new_scaling_bloom(BLOOM_FILTER_CAPACITY, BLOOM_FILTER_ERROR_RATE, full_file_path.c_str());
if (bloom_filter == nullptr) {
return Status(SERVER_UNEXPECTED_ERROR, "Failed to read bloom filter from file: " + full_file_path);
}
id_bloom_filter_ptr = std::make_shared<segment::IdBloomFilter>(bloom_filter);
} catch (std::exception& ex) {
std::string msg = "Failed to create bloom filter file, reason: " + std::string(ex.what());
LOG_SERVER_ERROR_ << msg;
return Status(SERVER_UNEXPECTED_ERROR, msg);
}
id_bloom_filter_ptr = std::make_shared<segment::IdBloomFilter>(bloom_filter);
return Status::OK();
}
} // namespace codec
......
......@@ -22,6 +22,7 @@
#include "segment/IdBloomFilter.h"
#include "storage/FSHandler.h"
#include "utils/Status.h"
namespace milvus {
namespace codec {
......@@ -33,15 +34,15 @@ class IdBloomFilterFormat {
static std::string
FilePostfix();
void
Status
Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
segment::IdBloomFilterPtr& id_bloom_filter_ptr);
void
Status
Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const segment::IdBloomFilterPtr& id_bloom_filter_ptr);
void
Status
Create(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
segment::IdBloomFilterPtr& id_bloom_filter_ptr);
......
......@@ -24,6 +24,7 @@
#include <unordered_map>
#include "db/Types.h"
#include "db/Utils.h"
#include "knowhere/index/structured_index/StructuredIndexSort.h"
#include "storage/ExtraFileInfo.h"
......@@ -78,7 +79,7 @@ StructuredIndexFormat::CreateStructuredIndex(const engine::DataType data_type) {
return index;
}
void
Status
StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
knowhere::IndexPtr& index) {
milvus::TimeRecorder recorder("StructuredIndexFormat::Read");
......@@ -88,11 +89,11 @@ StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const s
CHECK_MAGIC_VALID(fs_ptr, full_file_path);
CHECK_SUM_VALID(fs_ptr, full_file_path);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path);
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path);
}
int64_t length = fs_ptr->reader_ptr_->Length() - SUM_SIZE;
if (length <= 0) {
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid structured index length: " + full_file_path);
return Status(SERVER_UNEXPECTED_ERROR, "Invalid structured index length: " + full_file_path);
}
size_t rp = MAGIC_SIZE + HEADER_SIZE;
......@@ -138,9 +139,11 @@ StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const s
auto attr_type = static_cast<engine::DataType>(data_type);
index = CreateStructuredIndex(attr_type);
index->Load(load_data_list);
return Status::OK();
}
void
Status
StructuredIndexFormat::Write(const milvus::storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
engine::DataType data_type, const knowhere::IndexPtr& index) {
milvus::TimeRecorder recorder("StructuredIndexFormat::Write");
......@@ -154,29 +157,40 @@ StructuredIndexFormat::Write(const milvus::storage::FSHandlerPtr& fs_ptr, const
auto binaryset = index->Serialize(knowhere::Config());
if (!fs_ptr->writer_ptr_->InOpen(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path);
}
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
fs_ptr->writer_ptr_->Write(&data_type, sizeof(data_type));
for (auto& iter : binaryset.binary_map_) {
auto meta = iter.first.c_str();
size_t meta_length = iter.first.length();
fs_ptr->writer_ptr_->Write(&meta_length, sizeof(meta_length));
fs_ptr->writer_ptr_->Write(meta, meta_length);
auto binary = iter.second;
int64_t binary_length = binary->size;
fs_ptr->writer_ptr_->Write(&binary_length, sizeof(binary_length));
fs_ptr->writer_ptr_->Write(binary->data.get(), binary_length);
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path);
}
fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, full_file_path);
try {
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
fs_ptr->writer_ptr_->Write(&data_type, sizeof(data_type));
double span = recorder.RecordSection("End");
double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "StructuredIndexFormat::write(" << full_file_path << ") rate " << rate << "MB/s";
for (auto& iter : binaryset.binary_map_) {
auto meta = iter.first.c_str();
size_t meta_length = iter.first.length();
fs_ptr->writer_ptr_->Write(&meta_length, sizeof(meta_length));
fs_ptr->writer_ptr_->Write(meta, meta_length);
auto binary = iter.second;
int64_t binary_length = binary->size;
fs_ptr->writer_ptr_->Write(&binary_length, sizeof(binary_length));
fs_ptr->writer_ptr_->Write(binary->data.get(), binary_length);
}
fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, full_file_path);
double span = recorder.RecordSection("End");
double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "StructuredIndexFormat::write(" << full_file_path << ") rate " << rate << "MB/s";
} catch (std::exception& ex) {
std::string err_msg = "Failed to write structured index: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << err_msg;
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
}
} // namespace codec
......
......@@ -24,6 +24,7 @@
#include "db/Types.h"
#include "knowhere/index/Index.h"
#include "storage/FSHandler.h"
#include "utils/Status.h"
namespace milvus {
namespace codec {
......@@ -35,10 +36,10 @@ class StructuredIndexFormat {
static std::string
FilePostfix();
void
Status
Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::IndexPtr& index);
void
Status
Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::DataType data_type,
const knowhere::IndexPtr& index);
......
......@@ -20,6 +20,7 @@
#include <unordered_map>
#include "codecs/VectorCompressFormat.h"
#include "db/Utils.h"
#include "knowhere/common/BinarySet.h"
#include "storage/ExtraFileInfo.h"
#include "utils/Exception.h"
......@@ -37,7 +38,7 @@ VectorCompressFormat::FilePostfix() {
return str;
}
void
Status
VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
knowhere::BinaryPtr& compress) {
milvus::TimeRecorder recorder("VectorCompressFormat::Read");
......@@ -46,12 +47,12 @@ VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::strin
CHECK_MAGIC_VALID(fs_ptr, full_file_path);
CHECK_SUM_VALID(fs_ptr, full_file_path);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress file: " + full_file_path);
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress file: " + full_file_path);
}
int64_t length = fs_ptr->reader_ptr_->Length() - MAGIC_SIZE - HEADER_SIZE - SUM_SIZE;
if (length <= 0) {
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid vector compress length: " + full_file_path);
return Status(SERVER_UNEXPECTED_ERROR, "Invalid vector compress length: " + full_file_path);
}
compress->data = std::shared_ptr<uint8_t[]>(new uint8_t[length]);
......@@ -64,9 +65,11 @@ VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::strin
double span = recorder.RecordSection("End");
double rate = length * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "VectorCompressFormat::Read(" << full_file_path << ") rate " << rate << "MB/s";
return Status::OK();
}
void
Status
VectorCompressFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const knowhere::BinaryPtr& compress) {
milvus::TimeRecorder recorder("VectorCompressFormat::Write");
......@@ -77,17 +80,27 @@ VectorCompressFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::stri
WRITE_MAGIC(fs_ptr, full_file_path);
WRITE_HEADER(fs_ptr, full_file_path, maps);
if (!fs_ptr->writer_ptr_->InOpen(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress: " + full_file_path);
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress: " + full_file_path);
}
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
fs_ptr->writer_ptr_->Write(compress->data.get(), compress->size);
fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, full_file_path);
try {
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
fs_ptr->writer_ptr_->Write(compress->data.get(), compress->size);
fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, full_file_path);
double span = recorder.RecordSection("End");
double rate = compress->size * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "SVectorCompressFormat::Write(" << full_file_path << ") rate " << rate << "MB/s";
} catch (std::exception& ex) {
std::string err_msg = "Failed to write compress data: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << err_msg;
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
double span = recorder.RecordSection("End");
double rate = compress->size * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "SVectorCompressFormat::Write(" << full_file_path << ") rate " << rate << "MB/s";
return Status::OK();
}
} // namespace codec
......
......@@ -22,6 +22,7 @@
#include "knowhere/common/BinarySet.h"
#include "storage/FSHandler.h"
#include "utils/Status.h"
namespace milvus {
namespace codec {
......@@ -33,10 +34,10 @@ class VectorCompressFormat {
static std::string
FilePostfix();
void
Status
Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::BinaryPtr& compress);
void
Status
Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const knowhere::BinaryPtr& compress);
// No copy and move
......
......@@ -21,6 +21,7 @@
#include "codecs/Codec.h"
#include "codecs/VectorIndexFormat.h"
#include "db/Utils.h"
#include "knowhere/common/BinarySet.h"
#include "knowhere/index/vector_index/VecIndex.h"
#include "knowhere/index/vector_index/VecIndexFactory.h"
......@@ -40,7 +41,7 @@ VectorIndexFormat::FilePostfix() {
return str;
}
void
Status
VectorIndexFormat::ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
knowhere::BinaryPtr& data) {
milvus::TimeRecorder recorder("VectorIndexFormat::ReadRaw");
......@@ -48,7 +49,7 @@ VectorIndexFormat::ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::strin
CHECK_SUM_VALID(fs_ptr, file_path);
if (!fs_ptr->reader_ptr_->Open(file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open raw file: " + file_path);
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open raw file: " + file_path);
}
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
......@@ -67,9 +68,11 @@ VectorIndexFormat::ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::strin
double span = recorder.RecordSection("End");
double rate = num_bytes * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "VectorIndexFormat::ReadIndex(" << file_path << ") rate " << rate << "MB/s";
return Status::OK();
}
void
Status
VectorIndexFormat::ReadIndex(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
knowhere::BinarySet& data) {
milvus::TimeRecorder recorder("VectorIndexFormat::ReadIndex");
......@@ -78,12 +81,12 @@ VectorIndexFormat::ReadIndex(const storage::FSHandlerPtr& fs_ptr, const std::str
CHECK_MAGIC_VALID(fs_ptr, full_file_path);
CHECK_SUM_VALID(fs_ptr, full_file_path);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path);
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path);
}
int64_t length = fs_ptr->reader_ptr_->Length() - SUM_SIZE;
if (length <= 0) {
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid vector index length: " + full_file_path);
return Status(SERVER_UNEXPECTED_ERROR, "Invalid vector index length: " + full_file_path);
}
int64_t rp = MAGIC_SIZE + HEADER_SIZE;
......@@ -120,28 +123,32 @@ VectorIndexFormat::ReadIndex(const storage::FSHandlerPtr& fs_ptr, const std::str
double span = recorder.RecordSection("End");
double rate = length * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "VectorIndexFormat::ReadIndex(" << full_file_path << ") rate " << rate << "MB/s";
return Status::OK();
}
void
Status
VectorIndexFormat::ReadCompress(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
knowhere::BinaryPtr& data) {
auto& ss_codec = codec::Codec::instance();
ss_codec.GetVectorCompressFormat()->Read(fs_ptr, file_path, data);
return ss_codec.GetVectorCompressFormat()->Read(fs_ptr, file_path, data);
}
void
Status
VectorIndexFormat::ConvertRaw(const engine::BinaryDataPtr& raw, knowhere::BinaryPtr& data) {
data = std::make_shared<knowhere::Binary>();
if (raw == nullptr) {
return;
return Status::OK();
}
data->size = raw->Size();
data->data = std::shared_ptr<uint8_t[]>(new uint8_t[data->size], std::default_delete<uint8_t[]>());
memcpy(data->data.get(), raw->data_.data(), data->size);
return Status::OK();
}
void
Status
VectorIndexFormat::ConstructIndex(const std::string& index_name, knowhere::BinarySet& index_data,
knowhere::BinaryPtr& raw_data, knowhere::BinaryPtr& compress_data,
knowhere::VecIndexPtr& index) {
......@@ -169,11 +176,13 @@ VectorIndexFormat::ConstructIndex(const std::string& index_name, knowhere::Binar
index->UpdateIndexSize();
LOG_ENGINE_DEBUG_ << "index file size " << length << " index size " << index->IndexSize();
} else {
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Fail to create vector index");
return Status(SERVER_UNEXPECTED_ERROR, "Fail to create vector index");
}
return Status::OK();
}
void
Status
VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const knowhere::VecIndexPtr& index) {
milvus::TimeRecorder recorder("SVectorIndexFormat::WriteIndex");
......@@ -186,31 +195,45 @@ VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::st
auto binaryset = index->Serialize(knowhere::Config());
if (!fs_ptr->writer_ptr_->InOpen(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path);
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path);
}
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
for (auto& iter : binaryset.binary_map_) {
auto meta = iter.first.c_str();
size_t meta_length = iter.first.length();
fs_ptr->writer_ptr_->Write(&meta_length, sizeof(meta_length));
fs_ptr->writer_ptr_->Write(meta, meta_length);
auto binary = iter.second;
int64_t binary_length = binary->size;
fs_ptr->writer_ptr_->Write(&binary_length, sizeof(binary_length));
fs_ptr->writer_ptr_->Write(binary->data.get(), binary_length);
}
fs_ptr->writer_ptr_->Close();
try {
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
for (auto& iter : binaryset.binary_map_) {
if (iter.first == RAW_DATA || iter.first == QUANTIZATION_DATA) {
continue; // the two kinds of data will be written into another file
}
auto meta = iter.first.c_str();
size_t meta_length = iter.first.length();
fs_ptr->writer_ptr_->Write(&meta_length, sizeof(meta_length));
fs_ptr->writer_ptr_->Write(meta, meta_length);
auto binary = iter.second;
int64_t binary_length = binary->size;
fs_ptr->writer_ptr_->Write(&binary_length, sizeof(binary_length));
fs_ptr->writer_ptr_->Write(binary->data.get(), binary_length);
}
fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, full_file_path);
WRITE_SUM(fs_ptr, full_file_path);
double span = recorder.RecordSection("End");
double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "VectorIndexFormat::WriteIndex(" << full_file_path << ") rate " << rate << "MB/s";
double span = recorder.RecordSection("End");
double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "VectorIndexFormat::WriteIndex(" << full_file_path << ") rate " << rate << "MB/s";
} catch (std::exception& ex) {
std::string err_msg = "Failed to write vector index data: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << err_msg;
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
}
void
Status
VectorIndexFormat::WriteCompress(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const knowhere::VecIndexPtr& index) {
milvus::TimeRecorder recorder("VectorIndexFormat::WriteCompress");
......@@ -222,6 +245,8 @@ VectorIndexFormat::WriteCompress(const storage::FSHandlerPtr& fs_ptr, const std:
auto& ss_codec = codec::Codec::instance();
ss_codec.GetVectorCompressFormat()->Write(fs_ptr, file_path, sq8_data);
}
return Status::OK();
}
} // namespace codec
......
......@@ -23,6 +23,7 @@
#include "knowhere/index/vector_index/VecIndex.h"
#include "storage/FSHandler.h"
#include "utils/Status.h"
namespace milvus {
namespace codec {
......@@ -34,26 +35,26 @@ class VectorIndexFormat {
static std::string
FilePostfix();
void
Status
ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::BinaryPtr& data);
void
Status
ReadIndex(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::BinarySet& data);
void
Status
ReadCompress(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::BinaryPtr& data);
void
Status
ConvertRaw(const engine::BinaryDataPtr& raw, knowhere::BinaryPtr& data);
void
Status
ConstructIndex(const std::string& index_name, knowhere::BinarySet& index_data, knowhere::BinaryPtr& raw_data,
knowhere::BinaryPtr& compress_data, knowhere::VecIndexPtr& index);
void
Status
WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const knowhere::VecIndexPtr& index);
void
Status
WriteCompress(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const knowhere::VecIndexPtr& index);
......
......@@ -25,7 +25,6 @@
namespace milvus {
namespace knowhere {
#define INDEX_DATA "INDEX_DATA"
#define RAW_DATA "RAW_DATA"
#define QUANTIZATION_DATA "QUANTIZATION_DATA"
......
......@@ -122,7 +122,7 @@ SegmentReader::LoadField(const std::string& field_name, engine::BinaryDataPtr& r
auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(file_path);
if (data_obj == nullptr) {
auto& ss_codec = codec::Codec::instance();
ss_codec.GetBlockFormat()->Read(fs_ptr_, file_path, raw);
STATUS_CHECK(ss_codec.GetBlockFormat()->Read(fs_ptr_, file_path, raw));
if (to_cache) {
cache::CpuCacheMgr::GetInstance().InsertItem(file_path, raw); // put into cache
......@@ -180,7 +180,7 @@ SegmentReader::LoadEntities(const std::string& field_name, const std::vector<int
ranges.push_back(codec::ReadRange(offset * field_width, field_width));
}
auto& ss_codec = codec::Codec::instance();
ss_codec.GetBlockFormat()->Read(fs_ptr_, file_path, ranges, raw);
STATUS_CHECK(ss_codec.GetBlockFormat()->Read(fs_ptr_, file_path, ranges, raw));
} catch (std::exception& e) {
std::string err_msg = "Failed to load raw vectors: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
......@@ -338,7 +338,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
return Status::OK();
}
ss_codec.GetVectorIndexFormat()->ReadIndex(fs_ptr_, index_file_path, index_data);
STATUS_CHECK(ss_codec.GetVectorIndexFormat()->ReadIndex(fs_ptr_, index_file_path, index_data));
// for some kinds index(IVF), read raw file
auto index_type = index_visitor->GetElement()->GetTypeName();
......@@ -346,11 +346,11 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
engine::BinaryDataPtr fixed_data;
auto status = segment_ptr_->GetFixedFieldData(field_name, fixed_data);
if (status.ok()) {
ss_codec.GetVectorIndexFormat()->ConvertRaw(fixed_data, raw_data);
STATUS_CHECK(ss_codec.GetVectorIndexFormat()->ConvertRaw(fixed_data, raw_data));
} else if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW)) {
auto file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
ss_codec.GetVectorIndexFormat()->ReadRaw(fs_ptr_, file_path, raw_data);
STATUS_CHECK(ss_codec.GetVectorIndexFormat()->ReadRaw(fs_ptr_, file_path, raw_data));
}
}
......@@ -359,11 +359,12 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_COMPRESS)) {
auto file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
ss_codec.GetVectorIndexFormat()->ReadCompress(fs_ptr_, file_path, compress_data);
STATUS_CHECK(ss_codec.GetVectorIndexFormat()->ReadCompress(fs_ptr_, file_path, compress_data));
}
}
ss_codec.GetVectorIndexFormat()->ConstructIndex(index_type, index_data, raw_data, compress_data, index_ptr);
STATUS_CHECK(ss_codec.GetVectorIndexFormat()->ConstructIndex(index_type, index_data, raw_data, compress_data,
index_ptr));
index_ptr->SetUids(uids);
index_ptr->SetBlacklist(concurrent_bitset_ptr);
......@@ -407,7 +408,7 @@ SegmentReader::LoadStructuredIndex(const std::string& field_name, knowhere::Inde
// if the data is in cache, no need to read file
auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(file_path);
if (data_obj == nullptr) {
ss_codec.GetStructuredIndexFormat()->Read(fs_ptr_, file_path, index_ptr);
STATUS_CHECK(ss_codec.GetStructuredIndexFormat()->Read(fs_ptr_, file_path, index_ptr));
cache::CpuCacheMgr::GetInstance().InsertItem(file_path, index_ptr); // put into cache
} else {
index_ptr = std::static_pointer_cast<knowhere::Index>(data_obj);
......@@ -470,7 +471,7 @@ SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(file_path);
if (data_obj == nullptr) {
auto& ss_codec = codec::Codec::instance();
ss_codec.GetIdBloomFilterFormat()->Read(fs_ptr_, file_path, id_bloom_filter_ptr);
STATUS_CHECK(ss_codec.GetIdBloomFilterFormat()->Read(fs_ptr_, file_path, id_bloom_filter_ptr));
} else {
id_bloom_filter_ptr = std::static_pointer_cast<segment::IdBloomFilter>(data_obj);
}
......@@ -508,7 +509,7 @@ SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) {
auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(file_path);
if (data_obj == nullptr) {
auto& ss_codec = codec::Codec::instance();
ss_codec.GetDeletedDocsFormat()->Read(fs_ptr_, file_path, deleted_docs_ptr);
STATUS_CHECK(ss_codec.GetDeletedDocsFormat()->Read(fs_ptr_, file_path, deleted_docs_ptr));
} else {
deleted_docs_ptr = std::static_pointer_cast<segment::DeletedDocs>(data_obj);
}
......@@ -544,7 +545,7 @@ SegmentReader::ReadDeletedDocsSize(size_t& size) {
}
auto& ss_codec = codec::Codec::instance();
ss_codec.GetDeletedDocsFormat()->ReadSize(fs_ptr_, file_path, size);
STATUS_CHECK(ss_codec.GetDeletedDocsFormat()->ReadSize(fs_ptr_, file_path, size));
} catch (std::exception& e) {
std::string err_msg = "Failed to read deleted docs size: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
......
......@@ -92,16 +92,9 @@ SegmentWriter::Serialize() {
Status
SegmentWriter::WriteField(const std::string& file_path, const engine::BinaryDataPtr& raw) {
try {
auto& ss_codec = codec::Codec::instance();
ss_codec.GetBlockFormat()->Write(fs_ptr_, file_path, raw);
} catch (std::exception& e) {
std::string err_msg = "Failed to write field: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
auto& ss_codec = codec::Codec::instance();
STATUS_CHECK(ss_codec.GetBlockFormat()->Write(fs_ptr_, file_path, raw));
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
}
......@@ -137,49 +130,41 @@ SegmentWriter::WriteFields() {
Status
SegmentWriter::WriteBloomFilter() {
try {
TimeRecorder recorder("SegmentWriter::WriteBloomFilter");
engine::BinaryDataPtr uid_data;
auto status = segment_ptr_->GetFixedFieldData(engine::FIELD_UID, uid_data);
if (!status.ok()) {
return status;
}
TimeRecorder recorder("SegmentWriter::WriteBloomFilter");
auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::FIELD_UID);
auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER);
if (uid_blf_visitor && uid_blf_visitor->GetFile()) {
auto segment_file = uid_blf_visitor->GetFile();
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, segment_file);
engine::BinaryDataPtr uid_data;
auto status = segment_ptr_->GetFixedFieldData(engine::FIELD_UID, uid_data);
if (!status.ok()) {
return status;
}
auto& ss_codec = codec::Codec::instance();
segment::IdBloomFilterPtr bloom_filter_ptr;
ss_codec.GetIdBloomFilterFormat()->Create(fs_ptr_, file_path, bloom_filter_ptr);
auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::FIELD_UID);
auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER);
if (uid_blf_visitor && uid_blf_visitor->GetFile()) {
auto segment_file = uid_blf_visitor->GetFile();
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, segment_file);
auto uids = reinterpret_cast<int64_t*>(uid_data->data_.data());
int64_t row_count = segment_ptr_->GetRowCount();
for (int64_t i = 0; i < row_count; i++) {
bloom_filter_ptr->Add(uids[i]);
}
segment_ptr_->SetBloomFilter(bloom_filter_ptr);
auto& ss_codec = codec::Codec::instance();
segment::IdBloomFilterPtr bloom_filter_ptr;
STATUS_CHECK(ss_codec.GetIdBloomFilterFormat()->Create(fs_ptr_, file_path, bloom_filter_ptr));
recorder.RecordSection("Initialize bloom filter");
auto uids = reinterpret_cast<int64_t*>(uid_data->data_.data());
int64_t row_count = segment_ptr_->GetRowCount();
for (int64_t i = 0; i < row_count; i++) {
bloom_filter_ptr->Add(uids[i]);
}
segment_ptr_->SetBloomFilter(bloom_filter_ptr);
STATUS_CHECK(WriteBloomFilter(file_path, segment_ptr_->GetBloomFilter()));
recorder.RecordSection("Initialize bloom filter");
auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::IdBloomFilterFormat::FilePostfix());
segment_file->SetSize(file_size);
} else {
return Status(DB_ERROR, "Bloom filter element missed in snapshot");
}
} catch (std::exception& e) {
std::string err_msg = "Failed to write vectors: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
STATUS_CHECK(WriteBloomFilter(file_path, segment_ptr_->GetBloomFilter()));
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::IdBloomFilterFormat::FilePostfix());
segment_file->SetSize(file_size);
} else {
return Status(DB_ERROR, "Bloom filter element missed in snapshot");
}
return Status::OK();
......@@ -188,11 +173,7 @@ SegmentWriter::WriteBloomFilter() {
Status
SegmentWriter::CreateBloomFilter(const std::string& file_path, IdBloomFilterPtr& bloom_filter_ptr) {
auto& ss_codec = codec::Codec::instance();
try {
ss_codec.GetIdBloomFilterFormat()->Create(fs_ptr_, file_path, bloom_filter_ptr);
} catch (std::exception& er) {
return Status(DB_ERROR, "Create a new bloom filter fail: " + std::string(er.what()));
}
STATUS_CHECK(ss_codec.GetIdBloomFilterFormat()->Create(fs_ptr_, file_path, bloom_filter_ptr));
return Status::OK();
}
......@@ -203,18 +184,10 @@ SegmentWriter::WriteBloomFilter(const std::string& file_path, const IdBloomFilte
return Status(DB_ERROR, "WriteBloomFilter: null pointer");
}
try {
TimeRecorderAuto recorder("SegmentWriter::WriteBloomFilter: " + file_path);
auto& ss_codec = codec::Codec::instance();
ss_codec.GetIdBloomFilterFormat()->Write(fs_ptr_, file_path, id_bloom_filter_ptr);
} catch (std::exception& e) {
std::string err_msg = "Failed to write bloom filter: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
TimeRecorderAuto recorder("SegmentWriter::WriteBloomFilter: " + file_path);
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
auto& ss_codec = codec::Codec::instance();
STATUS_CHECK(ss_codec.GetIdBloomFilterFormat()->Write(fs_ptr_, file_path, id_bloom_filter_ptr));
return Status::OK();
}
......@@ -246,18 +219,11 @@ SegmentWriter::WriteDeletedDocs(const std::string& file_path, const DeletedDocsP
return Status::OK();
}
try {
TimeRecorderAuto recorder("SegmentWriter::WriteDeletedDocs: " + file_path);
TimeRecorderAuto recorder("SegmentWriter::WriteDeletedDocs: " + file_path);
auto& ss_codec = codec::Codec::instance();
ss_codec.GetDeletedDocsFormat()->Write(fs_ptr_, file_path, deleted_docs);
} catch (std::exception& e) {
std::string err_msg = "Failed to write deleted docs: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
auto& ss_codec = codec::Codec::instance();
STATUS_CHECK(ss_codec.GetDeletedDocsFormat()->Write(fs_ptr_, file_path, deleted_docs));
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
}
......@@ -392,7 +358,7 @@ SegmentWriter::WriteVectorIndex(const std::string& field_name) {
auto segment_file = element_visitor->GetFile();
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, segment_file);
ss_codec.GetVectorIndexFormat()->WriteIndex(fs_ptr_, file_path, index);
STATUS_CHECK(ss_codec.GetVectorIndexFormat()->WriteIndex(fs_ptr_, file_path, index));
auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::VectorIndexFormat::FilePostfix());
segment_file->SetSize(file_size);
......@@ -406,7 +372,7 @@ SegmentWriter::WriteVectorIndex(const std::string& field_name) {
auto segment_file = element_visitor->GetFile();
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, segment_file);
ss_codec.GetVectorIndexFormat()->WriteCompress(fs_ptr_, file_path, index);
STATUS_CHECK(ss_codec.GetVectorIndexFormat()->WriteCompress(fs_ptr_, file_path, index));
auto file_size =
milvus::CommonUtil::GetFileSize(file_path + codec::VectorCompressFormat::FilePostfix());
......@@ -454,7 +420,7 @@ SegmentWriter::WriteStructuredIndex(const std::string& field_name) {
auto segment_file = element_visitor->GetFile();
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, segment_file);
ss_codec.GetStructuredIndexFormat()->Write(fs_ptr_, file_path, field_type, index);
STATUS_CHECK(ss_codec.GetStructuredIndexFormat()->Write(fs_ptr_, file_path, field_type, index));
auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::StructuredIndexFormat::FilePostfix());
segment_file->SetSize(file_size);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册