未验证 提交 c7be9b96 编写于 作者: G groot 提交者: GitHub

improve GetEntityByID performance (#3932)

* improve GetEntityByID performance
Signed-off-by: groot <yihua.mo@zilliz.com>

* reduce check sum
Signed-off-by: groot <yihua.mo@zilliz.com>

* refine log
Signed-off-by: groot <yihua.mo@zilliz.com>
上级 fe8060f4
......@@ -27,12 +27,14 @@
#include "db/Utils.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
namespace milvus {
namespace codec {
Status
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::BinaryDataPtr& raw) {
milvus::TimeRecorderAuto recorder("BlockFormat::Read:" + file_path);
if (!fs_ptr->reader_ptr_->Open(file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
}
......@@ -54,6 +56,7 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
Status
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int64_t offset, int64_t num_bytes,
engine::BinaryDataPtr& raw) {
milvus::TimeRecorderAuto recorder("BlockFormat::Read:" + file_path);
if (offset < 0 || num_bytes <= 0) {
return Status(SERVER_INVALID_ARGUMENT, "Invalid input to read: " + file_path);
}
......@@ -62,7 +65,9 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
}
CHECK_MAGIC_VALID(fs_ptr);
CHECK_SUM_VALID(fs_ptr);
// no need to check sum, check sum read whole file data, poor performance
// CHECK_SUM_VALID(fs_ptr);
HeaderMap map = ReadHeaderValues(fs_ptr);
size_t total_num_bytes = stol(map.at("size"));
......@@ -84,6 +89,7 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
Status
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const ReadRanges& read_ranges,
engine::BinaryDataPtr& raw) {
milvus::TimeRecorderAuto recorder("BlockFormat::Read:" + file_path);
if (read_ranges.empty()) {
return Status::OK();
}
......@@ -92,7 +98,9 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
}
CHECK_MAGIC_VALID(fs_ptr);
CHECK_SUM_VALID(fs_ptr);
// no need to check sum, check sum read whole file data, poor performance
// CHECK_SUM_VALID(fs_ptr);
HeaderMap map = ReadHeaderValues(fs_ptr);
size_t total_num_bytes = stol(map.at("size"));
......@@ -127,6 +135,8 @@ BlockFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_
return Status::OK();
}
milvus::TimeRecorderAuto recorder("BlockFormat::Write:" + file_path);
if (!fs_ptr->writer_ptr_->Open(file_path)) {
return Status(SERVER_CANNOT_CREATE_FILE, "Fail to open file: " + file_path);
}
......
......@@ -29,6 +29,7 @@
#include "db/Utils.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
namespace milvus {
namespace codec {
......@@ -45,7 +46,7 @@ Status
DeletedDocsFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
segment::DeletedDocsPtr& deleted_docs) {
const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX;
milvus::TimeRecorderAuto recorder("DeletedDocsFormat::Read:" + full_file_path);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open deleted docs file: " + full_file_path);
}
......@@ -72,7 +73,7 @@ Status
DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const segment::DeletedDocsPtr& deleted_docs) {
const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX;
milvus::TimeRecorderAuto recorder("DeletedDocsFormat::Write:" + full_file_path);
auto deleted_docs_list = deleted_docs->GetDeletedDocs();
size_t num_bytes = sizeof(engine::offset_t) * deleted_docs->GetCount();
......
......@@ -23,6 +23,7 @@
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
namespace milvus {
namespace codec {
......@@ -40,6 +41,7 @@ IdBloomFilterFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string
segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
try {
const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX;
milvus::TimeRecorderAuto recorder("IdBloomFilterFormat::Read:" + full_file_path);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open bloom filter file: " + full_file_path);
}
......@@ -66,6 +68,7 @@ IdBloomFilterFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::strin
const segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
try {
const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX;
milvus::TimeRecorderAuto recorder("IdBloomFilterFormat::Write:" + full_file_path);
if (!fs_ptr->writer_ptr_->Open(full_file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open bloom filter file: " + full_file_path);
}
......
......@@ -82,7 +82,7 @@ StructuredIndexFormat::CreateStructuredIndex(const engine::DataType data_type) {
Status
StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
knowhere::IndexPtr& index) {
milvus::TimeRecorder recorder("StructuredIndexFormat::Read");
milvus::TimeRecorderAuto recorder("StructuredIndexFormat::Read:" + file_path);
knowhere::BinarySet load_data_list;
std::string full_file_path = file_path + STRUCTURED_INDEX_POSTFIX;
......@@ -132,10 +132,6 @@ StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const s
}
fs_ptr->reader_ptr_->Close();
double span = recorder.RecordSection("End");
double rate = length * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "StructuredIndexFormat::read(" << full_file_path << ") rate " << rate << "MB/s";
auto attr_type = static_cast<engine::DataType>(data_type);
index = CreateStructuredIndex(attr_type);
index->Load(load_data_list);
......@@ -146,7 +142,7 @@ StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const s
Status
StructuredIndexFormat::Write(const milvus::storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
engine::DataType data_type, const knowhere::IndexPtr& index) {
milvus::TimeRecorder recorder("StructuredIndexFormat::Write");
milvus::TimeRecorderAuto recorder("StructuredIndexFormat::Write:" + file_path);
std::string full_file_path = file_path + STRUCTURED_INDEX_POSTFIX;
......@@ -184,10 +180,6 @@ StructuredIndexFormat::Write(const milvus::storage::FSHandlerPtr& fs_ptr, const
fs_ptr->writer_ptr_->Write(data.data(), data.size());
WRITE_SUM(fs_ptr, header, reinterpret_cast<char*>(data.data()), data.size());
fs_ptr->writer_ptr_->Close();
double span = recorder.RecordSection("End");
double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "StructuredIndexFormat::write(" << full_file_path << ") rate " << rate << "MB/s";
} catch (std::exception& ex) {
std::string err_msg = "Failed to write structured index: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << err_msg;
......
......@@ -41,7 +41,7 @@ VectorCompressFormat::FilePostfix() {
Status
VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
knowhere::BinaryPtr& compress) {
milvus::TimeRecorder recorder("VectorCompressFormat::Read");
milvus::TimeRecorderAuto recorder("VectorCompressFormat::Read:" + file_path);
const std::string full_file_path = file_path + VECTOR_COMPRESS_POSTFIX;
if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
......@@ -63,17 +63,13 @@ VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::strin
fs_ptr->reader_ptr_->Read(compress->data.get(), length);
fs_ptr->reader_ptr_->Close();
double span = recorder.RecordSection("End");
double rate = length * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "VectorCompressFormat::Read(" << full_file_path << ") rate " << rate << "MB/s";
return Status::OK();
}
Status
VectorCompressFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const knowhere::BinaryPtr& compress) {
milvus::TimeRecorder recorder("VectorCompressFormat::Write");
milvus::TimeRecorderAuto recorder("VectorCompressFormat::Write:" + file_path);
const std::string full_file_path = file_path + VECTOR_COMPRESS_POSTFIX;
......@@ -93,10 +89,6 @@ VectorCompressFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::stri
WRITE_SUM(fs_ptr, header, reinterpret_cast<char*>(compress->data.get()), compress->size);
fs_ptr->writer_ptr_->Close();
double span = recorder.RecordSection("End");
double rate = compress->size * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "SVectorCompressFormat::Write(" << full_file_path << ") rate " << rate << "MB/s";
} catch (std::exception& ex) {
std::string err_msg = "Failed to write compress data: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << err_msg;
......
......@@ -182,7 +182,7 @@ VectorIndexFormat::ConstructIndex(const std::string& index_name, knowhere::Binar
Status
VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const knowhere::VecIndexPtr& index) {
milvus::TimeRecorder recorder("SVectorIndexFormat::WriteIndex");
milvus::TimeRecorderAuto recorder("SVectorIndexFormat::WriteIndex:" + file_path);
std::string full_file_path = file_path + VECTOR_INDEX_POSTFIX;
......@@ -226,10 +226,6 @@ VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::st
WRITE_SUM(fs_ptr, header, reinterpret_cast<char*>(data.data()), data.size());
fs_ptr->writer_ptr_->Close();
double span = recorder.RecordSection("End");
double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "VectorIndexFormat::WriteIndex(" << full_file_path << ") rate " << rate << "MB/s";
} catch (std::exception& ex) {
std::string err_msg = "Failed to write vector index data: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << err_msg;
......@@ -244,7 +240,7 @@ VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::st
Status
VectorIndexFormat::WriteCompress(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const knowhere::VecIndexPtr& index) {
milvus::TimeRecorder recorder("VectorIndexFormat::WriteCompress");
milvus::TimeRecorderAuto recorder("VectorIndexFormat::WriteCompress:" + file_path);
auto binaryset = index->Serialize(knowhere::Config());
......
......@@ -26,6 +26,8 @@
#include "db/Types.h"
#include "db/Utils.h"
#include "db/snapshot/ResourceHelper.h"
#include "knowhere/index/vector_index/IndexBinaryIDMAP.h"
#include "knowhere/index/vector_index/IndexIDMAP.h"
#include "knowhere/index/vector_index/VecIndex.h"
#include "knowhere/index/vector_index/VecIndexFactory.h"
#include "knowhere/index/vector_index/adapter/VectorAdapter.h"
......@@ -106,7 +108,7 @@ SegmentReader::Load() {
Status
SegmentReader::LoadField(const std::string& field_name, engine::BinaryDataPtr& raw, bool to_cache) {
try {
TimeRecorder recorder("SegmentReader::LoadField: " + field_name);
TimeRecorderAuto recorder("SegmentReader::LoadField: " + field_name);
segment_ptr_->GetFixedFieldData(field_name, raw);
if (raw != nullptr) {
......@@ -136,8 +138,6 @@ SegmentReader::LoadField(const std::string& field_name, engine::BinaryDataPtr& r
}
segment_ptr_->SetFixedFieldData(field_name, raw);
recorder.RecordSection("read " + file_path);
} catch (std::exception& e) {
std::string err_msg = "Failed to load raw vectors: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
......@@ -169,24 +169,84 @@ SegmentReader::LoadEntities(const std::string& field_name, const std::vector<int
try {
TimeRecorderAuto recorder("SegmentReader::LoadEntities: " + field_name);
auto field_visitor = segment_visitor_->GetFieldVisitor(field_name);
if (field_visitor == nullptr) {
return Status(DB_ERROR, "Invalid field_name");
}
auto raw_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, raw_visitor->GetFile());
int64_t field_width = 0;
STATUS_CHECK(segment_ptr_->GetFixedFieldWidth(field_name, field_width));
if (field_width <= 0) {
return Status(DB_ERROR, "Invalid field width");
}
auto field_visitor = segment_visitor_->GetFieldVisitor(field_name);
if (field_visitor == nullptr) {
return Status(DB_ERROR, "Invalid field name");
}
// copy from cache function
auto copy_data = [&](uint8_t* src_data, int64_t src_data_size, const std::vector<int64_t>& offsets,
int64_t field_width, engine::BinaryDataPtr& raw) -> Status {
if (src_data == nullptr) {
return Status(DB_ERROR, "src_data is null pointer");
}
int64_t total_bytes = offsets.size() * field_width;
raw = std::make_shared<engine::BinaryData>();
raw->data_.resize(total_bytes);
// copy from cache
int64_t target_poz = 0;
for (auto offset : offsets) {
int64_t src_poz = offset * field_width;
if (offset < 0 || src_poz + field_width > src_data_size) {
return Status(DB_ERROR, "Invalid entity offset");
}
memcpy(raw->data_.data() + target_poz, src_data + src_poz, field_width);
target_poz += field_width;
}
return Status::OK();
};
// if raw data is already in cache, copy from cache
const engine::snapshot::FieldPtr& field = field_visitor->GetField();
engine::BinaryDataPtr field_data;
segment_ptr_->GetFixedFieldData(field_name, field_data);
if (field_data != nullptr) {
return copy_data(field_data->data_.data(), field_data->data_.size(), offsets, field_width, raw);
}
// for vector field, the LoadVectorIndex() could create a IDMAP index in cache, copy from the index
if (engine::IsVectorField(field)) {
std::string temp_index_path;
GetTempIndexPath(field->GetName(), temp_index_path);
uint8_t* src_data = nullptr;
int64_t src_data_size = 0;
if (auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(temp_index_path)) {
auto index_ptr = std::static_pointer_cast<knowhere::VecIndex>(data_obj);
if (index_ptr->index_type() == knowhere::IndexEnum::INDEX_FAISS_IDMAP) {
auto idmap_index = std::static_pointer_cast<knowhere::IDMAP>(index_ptr);
src_data = (uint8_t*)idmap_index->GetRawVectors();
src_data_size = idmap_index->Count() * field_width;
} else if (index_ptr->index_type() == knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP) {
auto idmap_index = std::static_pointer_cast<knowhere::BinaryIDMAP>(index_ptr);
src_data = (uint8_t*)idmap_index->GetRawVectors();
src_data_size = idmap_index->Count() * field_width;
}
}
if (src_data) {
return copy_data(src_data, src_data_size, offsets, field_width, raw);
}
}
// read from storage
codec::ReadRanges ranges;
for (auto offset : offsets) {
ranges.push_back(codec::ReadRange(offset * field_width, field_width));
}
auto raw_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, raw_visitor->GetFile());
auto& ss_codec = codec::Codec::instance();
STATUS_CHECK(ss_codec.GetBlockFormat()->Read(fs_ptr_, file_path, ranges, raw));
} catch (std::exception& e) {
......
......@@ -91,8 +91,6 @@ SegmentWriter::Serialize() {
Status
SegmentWriter::WriteField(const std::string& file_path, const engine::BinaryDataPtr& raw) {
TimeRecorderAuto recorder("SegmentWriter::WriteField: " + file_path);
auto& ss_codec = codec::Codec::instance();
STATUS_CHECK(ss_codec.GetBlockFormat()->Write(fs_ptr_, file_path, raw));
......@@ -101,7 +99,7 @@ SegmentWriter::WriteField(const std::string& file_path, const engine::BinaryData
Status
SegmentWriter::WriteFields() {
TimeRecorder recorder("SegmentWriter::WriteFields");
TimeRecorderAuto recorder("SegmentWriter::WriteFields");
auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
for (auto& iter : field_visitors_map) {
......@@ -176,8 +174,6 @@ SegmentWriter::WriteBloomFilter(const std::string& file_path, const IdBloomFilte
return Status(DB_ERROR, "WriteBloomFilter: null pointer");
}
TimeRecorderAuto recorder("SegmentWriter::WriteBloomFilter: " + file_path);
auto& ss_codec = codec::Codec::instance();
STATUS_CHECK(ss_codec.GetIdBloomFilterFormat()->Write(fs_ptr_, file_path, id_bloom_filter_ptr));
......@@ -186,8 +182,6 @@ SegmentWriter::WriteBloomFilter(const std::string& file_path, const IdBloomFilte
Status
SegmentWriter::WriteDeletedDocs() {
TimeRecorder recorder("SegmentWriter::WriteDeletedDocs");
auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::FIELD_UID);
auto del_doc_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS);
......@@ -215,8 +209,6 @@ SegmentWriter::WriteDeletedDocs(const std::string& file_path, const DeletedDocsP
return Status::OK();
}
TimeRecorderAuto recorder("SegmentWriter::WriteDeletedDocs: " + file_path);
auto& ss_codec = codec::Codec::instance();
STATUS_CHECK(ss_codec.GetDeletedDocsFormat()->Write(fs_ptr_, file_path, deleted_docs));
......
......@@ -851,6 +851,10 @@ TEST_F(DBTest, GetEntityTest) {
}
std::cout << "Post GetEntityByID2" << std::endl;
// load collection data into cache, let the GetEntityByID() get entity from cache
std::vector<std::string> fields;
db_->LoadCollection(dummy_context_, collection_name, fields);
{
std::vector<std::string> field_names;
fill_field_names(field_mappings, field_names);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册