diff --git a/core/src/codecs/BlockFormat.cpp b/core/src/codecs/BlockFormat.cpp index fa8949683897d411b821cae4d7d3aefbdd5fec11..ca2a492f663de32648da68eaf5da87598894ef8c 100644 --- a/core/src/codecs/BlockFormat.cpp +++ b/core/src/codecs/BlockFormat.cpp @@ -27,12 +27,14 @@ #include "db/Utils.h" #include "utils/Exception.h" #include "utils/Log.h" +#include "utils/TimeRecorder.h" namespace milvus { namespace codec { Status BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::BinaryDataPtr& raw) { + milvus::TimeRecorderAuto recorder("BlockFormat::Read:" + file_path); if (!fs_ptr->reader_ptr_->Open(file_path)) { return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path); } @@ -54,6 +56,7 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p Status BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int64_t offset, int64_t num_bytes, engine::BinaryDataPtr& raw) { + milvus::TimeRecorderAuto recorder("BlockFormat::Read:" + file_path); if (offset < 0 || num_bytes <= 0) { return Status(SERVER_INVALID_ARGUMENT, "Invalid input to read: " + file_path); } @@ -62,7 +65,9 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path); } CHECK_MAGIC_VALID(fs_ptr); - CHECK_SUM_VALID(fs_ptr); + + // skip checksum validation: verifying the checksum reads the whole file, which hurts performance + // CHECK_SUM_VALID(fs_ptr); HeaderMap map = ReadHeaderValues(fs_ptr); size_t total_num_bytes = stol(map.at("size")); @@ -84,6 +89,7 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p Status BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const ReadRanges& read_ranges, engine::BinaryDataPtr& raw) { + milvus::TimeRecorderAuto recorder("BlockFormat::Read:" + file_path); if (read_ranges.empty()) { return Status::OK(); } @@ -92,7 +98,9 @@ 
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path); } CHECK_MAGIC_VALID(fs_ptr); - CHECK_SUM_VALID(fs_ptr); + + // skip checksum validation: verifying the checksum reads the whole file, which hurts performance + // CHECK_SUM_VALID(fs_ptr); HeaderMap map = ReadHeaderValues(fs_ptr); size_t total_num_bytes = stol(map.at("size")); @@ -127,6 +135,8 @@ BlockFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_ return Status::OK(); } + milvus::TimeRecorderAuto recorder("BlockFormat::Write:" + file_path); + if (!fs_ptr->writer_ptr_->Open(file_path)) { return Status(SERVER_CANNOT_CREATE_FILE, "Fail to open file: " + file_path); } diff --git a/core/src/codecs/DeletedDocsFormat.cpp b/core/src/codecs/DeletedDocsFormat.cpp index eb8ecdcd355853273e61951d75748fda04af7c68..0db4db91b5d4cb4eea4dad9bb84e6a96ba8b621a 100644 --- a/core/src/codecs/DeletedDocsFormat.cpp +++ b/core/src/codecs/DeletedDocsFormat.cpp @@ -29,6 +29,7 @@ #include "db/Utils.h" #include "utils/Exception.h" #include "utils/Log.h" +#include "utils/TimeRecorder.h" namespace milvus { namespace codec { @@ -45,7 +46,7 @@ Status DeletedDocsFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, segment::DeletedDocsPtr& deleted_docs) { const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX; - + milvus::TimeRecorderAuto recorder("DeletedDocsFormat::Read:" + full_file_path); if (!fs_ptr->reader_ptr_->Open(full_file_path)) { return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open deleted docs file: " + full_file_path); } @@ -72,7 +73,7 @@ Status DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const segment::DeletedDocsPtr& deleted_docs) { const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX; - + milvus::TimeRecorderAuto recorder("DeletedDocsFormat::Write:" + full_file_path); auto deleted_docs_list = deleted_docs->GetDeletedDocs(); 
size_t num_bytes = sizeof(engine::offset_t) * deleted_docs->GetCount(); diff --git a/core/src/codecs/IdBloomFilterFormat.cpp b/core/src/codecs/IdBloomFilterFormat.cpp index 0feb9357dd392be92f938f1582c438688f6e4e37..062af6f4fb79ea2965bc40a7748578dee585f5a5 100644 --- a/core/src/codecs/IdBloomFilterFormat.cpp +++ b/core/src/codecs/IdBloomFilterFormat.cpp @@ -23,6 +23,7 @@ #include "utils/Exception.h" #include "utils/Log.h" +#include "utils/TimeRecorder.h" namespace milvus { namespace codec { @@ -40,6 +41,7 @@ IdBloomFilterFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string segment::IdBloomFilterPtr& id_bloom_filter_ptr) { try { const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX; + milvus::TimeRecorderAuto recorder("IdBloomFilterFormat::Read:" + full_file_path); if (!fs_ptr->reader_ptr_->Open(full_file_path)) { return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open bloom filter file: " + full_file_path); } @@ -66,6 +68,7 @@ IdBloomFilterFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::strin const segment::IdBloomFilterPtr& id_bloom_filter_ptr) { try { const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX; + milvus::TimeRecorderAuto recorder("IdBloomFilterFormat::Write:" + full_file_path); if (!fs_ptr->writer_ptr_->Open(full_file_path)) { return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open bloom filter file: " + full_file_path); } diff --git a/core/src/codecs/StructuredIndexFormat.cpp b/core/src/codecs/StructuredIndexFormat.cpp index 658094a700f419aaa4bff861ae4dbfd4d1d576a7..a3e055fb6c4478b06b7a04cf7baf7b1bd330156b 100644 --- a/core/src/codecs/StructuredIndexFormat.cpp +++ b/core/src/codecs/StructuredIndexFormat.cpp @@ -82,7 +82,7 @@ StructuredIndexFormat::CreateStructuredIndex(const engine::DataType data_type) { Status StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::IndexPtr& index) { - milvus::TimeRecorder 
recorder("StructuredIndexFormat::Read"); + milvus::TimeRecorderAuto recorder("StructuredIndexFormat::Read:" + file_path); knowhere::BinarySet load_data_list; std::string full_file_path = file_path + STRUCTURED_INDEX_POSTFIX; @@ -132,10 +132,6 @@ StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const s } fs_ptr->reader_ptr_->Close(); - double span = recorder.RecordSection("End"); - double rate = length * 1000000.0 / span / 1024 / 1024; - LOG_ENGINE_DEBUG_ << "StructuredIndexFormat::read(" << full_file_path << ") rate " << rate << "MB/s"; - auto attr_type = static_cast(data_type); index = CreateStructuredIndex(attr_type); index->Load(load_data_list); @@ -146,7 +142,7 @@ StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const s Status StructuredIndexFormat::Write(const milvus::storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::DataType data_type, const knowhere::IndexPtr& index) { - milvus::TimeRecorder recorder("StructuredIndexFormat::Write"); + milvus::TimeRecorderAuto recorder("StructuredIndexFormat::Write:" + file_path); std::string full_file_path = file_path + STRUCTURED_INDEX_POSTFIX; @@ -184,10 +180,6 @@ StructuredIndexFormat::Write(const milvus::storage::FSHandlerPtr& fs_ptr, const fs_ptr->writer_ptr_->Write(data.data(), data.size()); WRITE_SUM(fs_ptr, header, reinterpret_cast(data.data()), data.size()); fs_ptr->writer_ptr_->Close(); - - double span = recorder.RecordSection("End"); - double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024; - LOG_ENGINE_DEBUG_ << "StructuredIndexFormat::write(" << full_file_path << ") rate " << rate << "MB/s"; } catch (std::exception& ex) { std::string err_msg = "Failed to write structured index: " + std::string(ex.what()); LOG_ENGINE_ERROR_ << err_msg; diff --git a/core/src/codecs/VectorCompressFormat.cpp b/core/src/codecs/VectorCompressFormat.cpp index 5048cd64810a6702136c606d1a71a7bfde7d31ad..d7e40784112ffcd87533f8309f3170bf39da1822 100644 
--- a/core/src/codecs/VectorCompressFormat.cpp +++ b/core/src/codecs/VectorCompressFormat.cpp @@ -41,7 +41,7 @@ VectorCompressFormat::FilePostfix() { Status VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::BinaryPtr& compress) { - milvus::TimeRecorder recorder("VectorCompressFormat::Read"); + milvus::TimeRecorderAuto recorder("VectorCompressFormat::Read:" + file_path); const std::string full_file_path = file_path + VECTOR_COMPRESS_POSTFIX; if (!fs_ptr->reader_ptr_->Open(full_file_path)) { @@ -63,17 +63,13 @@ VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::strin fs_ptr->reader_ptr_->Read(compress->data.get(), length); fs_ptr->reader_ptr_->Close(); - double span = recorder.RecordSection("End"); - double rate = length * 1000000.0 / span / 1024 / 1024; - LOG_ENGINE_DEBUG_ << "VectorCompressFormat::Read(" << full_file_path << ") rate " << rate << "MB/s"; - return Status::OK(); } Status VectorCompressFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const knowhere::BinaryPtr& compress) { - milvus::TimeRecorder recorder("VectorCompressFormat::Write"); + milvus::TimeRecorderAuto recorder("VectorCompressFormat::Write:" + file_path); const std::string full_file_path = file_path + VECTOR_COMPRESS_POSTFIX; @@ -93,10 +89,6 @@ VectorCompressFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::stri WRITE_SUM(fs_ptr, header, reinterpret_cast(compress->data.get()), compress->size); fs_ptr->writer_ptr_->Close(); - - double span = recorder.RecordSection("End"); - double rate = compress->size * 1000000.0 / span / 1024 / 1024; - LOG_ENGINE_DEBUG_ << "SVectorCompressFormat::Write(" << full_file_path << ") rate " << rate << "MB/s"; } catch (std::exception& ex) { std::string err_msg = "Failed to write compress data: " + std::string(ex.what()); LOG_ENGINE_ERROR_ << err_msg; diff --git a/core/src/codecs/VectorIndexFormat.cpp b/core/src/codecs/VectorIndexFormat.cpp index 
8b329afc033172a7a84f7dbef97b74d7e88af667..206ef9d7009749235bf713d3957e38dbdd1afb52 100644 --- a/core/src/codecs/VectorIndexFormat.cpp +++ b/core/src/codecs/VectorIndexFormat.cpp @@ -182,7 +182,7 @@ VectorIndexFormat::ConstructIndex(const std::string& index_name, knowhere::Binar Status VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const knowhere::VecIndexPtr& index) { - milvus::TimeRecorder recorder("SVectorIndexFormat::WriteIndex"); + milvus::TimeRecorderAuto recorder("SVectorIndexFormat::WriteIndex:" + file_path); std::string full_file_path = file_path + VECTOR_INDEX_POSTFIX; @@ -226,10 +226,6 @@ VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::st WRITE_SUM(fs_ptr, header, reinterpret_cast(data.data()), data.size()); fs_ptr->writer_ptr_->Close(); - - double span = recorder.RecordSection("End"); - double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024; - LOG_ENGINE_DEBUG_ << "VectorIndexFormat::WriteIndex(" << full_file_path << ") rate " << rate << "MB/s"; } catch (std::exception& ex) { std::string err_msg = "Failed to write vector index data: " + std::string(ex.what()); LOG_ENGINE_ERROR_ << err_msg; @@ -244,7 +240,7 @@ VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::st Status VectorIndexFormat::WriteCompress(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const knowhere::VecIndexPtr& index) { - milvus::TimeRecorder recorder("VectorIndexFormat::WriteCompress"); + milvus::TimeRecorderAuto recorder("VectorIndexFormat::WriteCompress:" + file_path); auto binaryset = index->Serialize(knowhere::Config()); diff --git a/core/src/segment/SegmentReader.cpp b/core/src/segment/SegmentReader.cpp index af706e6000f7685da5f93b01dc1b5133829b6935..457e87f8ac784a5d340d917af9ee79c7ca6351b0 100644 --- a/core/src/segment/SegmentReader.cpp +++ b/core/src/segment/SegmentReader.cpp @@ -26,6 +26,8 @@ #include "db/Types.h" #include 
"db/Utils.h" #include "db/snapshot/ResourceHelper.h" +#include "knowhere/index/vector_index/IndexBinaryIDMAP.h" +#include "knowhere/index/vector_index/IndexIDMAP.h" #include "knowhere/index/vector_index/VecIndex.h" #include "knowhere/index/vector_index/VecIndexFactory.h" #include "knowhere/index/vector_index/adapter/VectorAdapter.h" @@ -106,7 +108,7 @@ SegmentReader::Load() { Status SegmentReader::LoadField(const std::string& field_name, engine::BinaryDataPtr& raw, bool to_cache) { try { - TimeRecorder recorder("SegmentReader::LoadField: " + field_name); + TimeRecorderAuto recorder("SegmentReader::LoadField: " + field_name); segment_ptr_->GetFixedFieldData(field_name, raw); if (raw != nullptr) { @@ -136,8 +138,6 @@ SegmentReader::LoadField(const std::string& field_name, engine::BinaryDataPtr& r } segment_ptr_->SetFixedFieldData(field_name, raw); - - recorder.RecordSection("read " + file_path); } catch (std::exception& e) { std::string err_msg = "Failed to load raw vectors: " + std::string(e.what()); LOG_ENGINE_ERROR_ << err_msg; @@ -169,24 +169,84 @@ SegmentReader::LoadEntities(const std::string& field_name, const std::vectorGetFieldVisitor(field_name); - if (field_visitor == nullptr) { - return Status(DB_ERROR, "Invalid field_name"); - } - auto raw_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW); - std::string file_path = - engine::snapshot::GetResPath(dir_collections_, raw_visitor->GetFile()); - int64_t field_width = 0; STATUS_CHECK(segment_ptr_->GetFixedFieldWidth(field_name, field_width)); if (field_width <= 0) { return Status(DB_ERROR, "Invalid field width"); } + auto field_visitor = segment_visitor_->GetFieldVisitor(field_name); + if (field_visitor == nullptr) { + return Status(DB_ERROR, "Invalid field name"); + } + + // copy from cache function + auto copy_data = [&](uint8_t* src_data, int64_t src_data_size, const std::vector& offsets, + int64_t field_width, engine::BinaryDataPtr& raw) -> Status { + if (src_data == nullptr) { + 
return Status(DB_ERROR, "src_data is null pointer"); + } + int64_t total_bytes = offsets.size() * field_width; + raw = std::make_shared(); + raw->data_.resize(total_bytes); + + // copy from cache + int64_t target_poz = 0; + for (auto offset : offsets) { + int64_t src_poz = offset * field_width; + if (offset < 0 || src_poz + field_width > src_data_size) { + return Status(DB_ERROR, "Invalid entity offset"); + } + + memcpy(raw->data_.data() + target_poz, src_data + src_poz, field_width); + target_poz += field_width; + } + + return Status::OK(); + }; + + // if raw data is already in cache, copy from cache + const engine::snapshot::FieldPtr& field = field_visitor->GetField(); + engine::BinaryDataPtr field_data; + segment_ptr_->GetFixedFieldData(field_name, field_data); + if (field_data != nullptr) { + return copy_data(field_data->data_.data(), field_data->data_.size(), offsets, field_width, raw); + } + + // for vector field, the LoadVectorIndex() could create an IDMAP index in cache, copy from the index + if (engine::IsVectorField(field)) { + std::string temp_index_path; + GetTempIndexPath(field->GetName(), temp_index_path); + + uint8_t* src_data = nullptr; + int64_t src_data_size = 0; + if (auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(temp_index_path)) { + auto index_ptr = std::static_pointer_cast(data_obj); + if (index_ptr->index_type() == knowhere::IndexEnum::INDEX_FAISS_IDMAP) { + auto idmap_index = std::static_pointer_cast(index_ptr); + src_data = (uint8_t*)idmap_index->GetRawVectors(); + src_data_size = idmap_index->Count() * field_width; + } else if (index_ptr->index_type() == knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP) { + auto idmap_index = std::static_pointer_cast(index_ptr); + src_data = (uint8_t*)idmap_index->GetRawVectors(); + src_data_size = idmap_index->Count() * field_width; + } + } + + if (src_data) { + return copy_data(src_data, src_data_size, offsets, field_width, raw); + } + } + + // read from storage codec::ReadRanges ranges; for (auto 
offset : offsets) { ranges.push_back(codec::ReadRange(offset * field_width, field_width)); } + + auto raw_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW); + std::string file_path = + engine::snapshot::GetResPath(dir_collections_, raw_visitor->GetFile()); auto& ss_codec = codec::Codec::instance(); STATUS_CHECK(ss_codec.GetBlockFormat()->Read(fs_ptr_, file_path, ranges, raw)); } catch (std::exception& e) { diff --git a/core/src/segment/SegmentWriter.cpp b/core/src/segment/SegmentWriter.cpp index 3436ac15b7ef8df4c4c207ee39a9fcc12d0c6798..9bb51500ea5f49828ed2b8e2c0952355fa5be23a 100644 --- a/core/src/segment/SegmentWriter.cpp +++ b/core/src/segment/SegmentWriter.cpp @@ -91,8 +91,6 @@ SegmentWriter::Serialize() { Status SegmentWriter::WriteField(const std::string& file_path, const engine::BinaryDataPtr& raw) { - TimeRecorderAuto recorder("SegmentWriter::WriteField: " + file_path); - auto& ss_codec = codec::Codec::instance(); STATUS_CHECK(ss_codec.GetBlockFormat()->Write(fs_ptr_, file_path, raw)); @@ -101,7 +99,7 @@ SegmentWriter::WriteField(const std::string& file_path, const engine::BinaryData Status SegmentWriter::WriteFields() { - TimeRecorder recorder("SegmentWriter::WriteFields"); + TimeRecorderAuto recorder("SegmentWriter::WriteFields"); auto& field_visitors_map = segment_visitor_->GetFieldVisitors(); for (auto& iter : field_visitors_map) { @@ -176,8 +174,6 @@ SegmentWriter::WriteBloomFilter(const std::string& file_path, const IdBloomFilte return Status(DB_ERROR, "WriteBloomFilter: null pointer"); } - TimeRecorderAuto recorder("SegmentWriter::WriteBloomFilter: " + file_path); - auto& ss_codec = codec::Codec::instance(); STATUS_CHECK(ss_codec.GetIdBloomFilterFormat()->Write(fs_ptr_, file_path, id_bloom_filter_ptr)); @@ -186,8 +182,6 @@ SegmentWriter::WriteBloomFilter(const std::string& file_path, const IdBloomFilte Status SegmentWriter::WriteDeletedDocs() { - TimeRecorder recorder("SegmentWriter::WriteDeletedDocs"); - auto& 
field_visitors_map = segment_visitor_->GetFieldVisitors(); auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::FIELD_UID); auto del_doc_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS); @@ -215,8 +209,6 @@ SegmentWriter::WriteDeletedDocs(const std::string& file_path, const DeletedDocsP return Status::OK(); } - TimeRecorderAuto recorder("SegmentWriter::WriteDeletedDocs: " + file_path); - auto& ss_codec = codec::Codec::instance(); STATUS_CHECK(ss_codec.GetDeletedDocsFormat()->Write(fs_ptr_, file_path, deleted_docs)); diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp index 1f855e6ac39595de31f2f7668e22f0982aea11bb..1d7c0120113dcae6fc2459c8946679d0395232da 100644 --- a/core/unittest/db/test_db.cpp +++ b/core/unittest/db/test_db.cpp @@ -851,6 +851,10 @@ TEST_F(DBTest, GetEntityTest) { } std::cout << "Post GetEntityByID2" << std::endl; + // load collection data into cache, let the GetEntityByID() get entity from cache + std::vector fields; + db_->LoadCollection(dummy_context_, collection_name, fields); + { std::vector field_names; fill_field_names(field_mappings, field_names);