diff --git a/CHANGELOG.md b/CHANGELOG.md index 73be888349b047d8f02dfc4c586185114caf5b02..f0b41cb6dc2349f6a458848f912515c1d5c41469 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -108,6 +108,7 @@ Please mark all change in change log and use the issue from GitHub - \#1448 General proto api for NNS libraries - \#1480 Add return code for AVX512 selection - \#1524 Update config "preload_table" description +- \#1537 Optimize raw vector and uids read/write - \#1544 Update resources name in HTTP module - \#1567 Update yaml config description diff --git a/core/src/codecs/default/DefaultVectorsFormat.cpp b/core/src/codecs/default/DefaultVectorsFormat.cpp index 61b72e8795f5f37f1a403b39db6c222c1f37472d..f32f0ae5bafcb4c0af343c24b5cb471a7b4327ad 100644 --- a/core/src/codecs/default/DefaultVectorsFormat.cpp +++ b/core/src/codecs/default/DefaultVectorsFormat.cpp @@ -19,15 +19,88 @@ #include #include +#include #include #include "utils/Exception.h" #include "utils/Log.h" +#include "utils/TimeRecorder.h" namespace milvus { namespace codec { +void +DefaultVectorsFormat::read_vectors_internal(const std::string& file_path, off_t offset, size_t num, + std::vector& raw_vectors) { + int rv_fd = open(file_path.c_str(), O_RDONLY, 00664); + if (rv_fd == -1) { + std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); + } + + size_t num_bytes; + if (::read(rv_fd, &num_bytes, sizeof(size_t)) == -1) { + std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + + num = std::min(num, num_bytes - offset); + + offset += sizeof(size_t); // Beginning of file is num_bytes + int off = lseek(rv_fd, offset, SEEK_SET); + if (off == -1) { + std::string err_msg = "Failed to seek file: " + file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + + raw_vectors.resize(num / sizeof(uint8_t)); + if (::read(rv_fd, raw_vectors.data(), num) == -1) { + std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + + if (::close(rv_fd) == -1) { + std::string err_msg = "Failed to close file: " + file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } +} + +void +DefaultVectorsFormat::read_uids_internal(const std::string& file_path, std::vector& uids) { + int uid_fd = open(file_path.c_str(), O_RDONLY, 00664); + if (uid_fd == -1) { + std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); + } + + size_t num_bytes; + if (::read(uid_fd, &num_bytes, sizeof(size_t)) == -1) { + std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + + uids.resize(num_bytes / sizeof(segment::doc_id_t)); + if (::read(uid_fd, uids.data(), num_bytes) == -1) { + std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + + if (::close(uid_fd) == -1) { + std::string err_msg = "Failed to close file: " + file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } +} + void DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::VectorsPtr& vectors_read) { const std::lock_guard lock(mutex_); @@ -47,68 +120,15 @@ DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::Ve for (; it != it_end; ++it) { const auto& path = it->path(); if (path.extension().string() == raw_vector_extension_) { - int rv_fd = open(path.c_str(), O_RDONLY, 00664); - if (rv_fd == -1) { - std::string err_msg = "Failed to open file: " + path.string() + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); - } - - size_t num_bytes; - if (::read(rv_fd, &num_bytes, sizeof(size_t)) == -1) { - std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } - std::vector vector_list; - vector_list.resize(num_bytes); - if (::read(rv_fd, vector_list.data(), num_bytes) == -1) { - std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } - + read_vectors_internal(path.string(), 0, INT64_MAX, vector_list); vectors_read->AddData(vector_list); vectors_read->SetName(path.stem().string()); - - if (::close(rv_fd) == -1) { - std::string err_msg = "Failed to close file: " + path.string() + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } } if (path.extension().string() == user_id_extension_) { - int uid_fd = open(path.c_str(), O_RDONLY, 00664); - if (uid_fd == -1) { - std::string err_msg = "Failed to open file: " + path.string() + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); - } - - size_t num_bytes; - if (::read(uid_fd, &num_bytes, sizeof(size_t)) == -1) { - std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } - - auto count = num_bytes / sizeof(segment::doc_id_t); std::vector uids; - uids.resize(count); - if (::read(uid_fd, uids.data(), num_bytes) == -1) { - std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } - + read_uids_internal(path.string(), uids); vectors_read->AddUids(uids); - - if (::close(uid_fd) == -1) { - std::string err_msg = "Failed to close file: " + path.string() + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } } } } @@ -122,28 +142,7 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm const std::string rv_file_path = dir_path + "/" + vectors->GetName() + raw_vector_extension_; const std::string uid_file_path = dir_path + "/" + vectors->GetName() + user_id_extension_; - /* - FILE* rv_file = fopen(rv_file_path.c_str(), "wb"); - if (rv_file == nullptr) { - std::string err_msg = "Failed to open file: " + rv_file_path; - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); - } - - fwrite((void*)(it.second->GetData()), sizeof(char), it.second->GetNumBytes(), rv_file); - fclose(rv_file); - - - FILE* uid_file = fopen(uid_file_path.c_str(), "wb"); - if (uid_file == nullptr) { - std::string err_msg = "Failed to open file: " + uid_file_path; - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); - } - - fwrite((void*)(it.second->GetUids()), sizeof it.second->GetUids()[0], it.second->GetCount(), uid_file); - fclose(rv_file); - */ + TimeRecorder rc("write vectors"); int rv_fd = open(rv_file_path.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 00664); if (rv_fd == -1) { @@ -151,14 +150,6 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); } - int uid_fd = open(uid_file_path.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 00664); - if (uid_fd == -1) { - std::string err_msg = "Failed to open file: " + uid_file_path + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); - } - - auto start = std::chrono::high_resolution_clock::now(); size_t rv_num_bytes = vectors->GetData().size() * sizeof(uint8_t); if (::write(rv_fd, &rv_num_bytes, sizeof(size_t)) == -1) { @@ -177,12 +168,14 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm throw Exception(SERVER_WRITE_ERROR, err_msg); } - auto end = std::chrono::high_resolution_clock::now(); - std::chrono::duration diff = end - start; - ENGINE_LOG_DEBUG << "Writing raw vectors took " << diff.count() << " s"; - - start = std::chrono::high_resolution_clock::now(); + rc.RecordSection("write rv done"); + int uid_fd = open(uid_file_path.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 00664); + if (uid_fd == -1) { + std::string err_msg = "Failed to open file: " + uid_file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); + } size_t uid_num_bytes = vectors->GetUids().size() * sizeof(segment::doc_id_t); if (::write(uid_fd, &uid_num_bytes, sizeof(size_t)) == -1) { std::string err_msg = "Failed to write to file" + rv_file_path + ", error: " + std::strerror(errno); @@ -199,9 +192,8 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_WRITE_ERROR, err_msg); } - end = std::chrono::high_resolution_clock::now(); - diff = end - start; - ENGINE_LOG_DEBUG << "Writing uids took " << diff.count() << " s"; + + rc.RecordSection("write uids done"); } void @@ -223,30 +215,7 @@ DefaultVectorsFormat::read_uids(const store::DirectoryPtr& directory_ptr, std::v for (; it != it_end; ++it) { const auto& path = it->path(); if (path.extension().string() == user_id_extension_) { - int uid_fd = open(path.c_str(), O_RDONLY, 00664); - if (uid_fd == -1) { - std::string err_msg = "Failed to open file: " + path.string() + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); - } - size_t num_bytes; - if (::read(uid_fd, &num_bytes, sizeof(size_t)) == -1) { - std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } - auto count = num_bytes / sizeof(segment::doc_id_t); - uids.resize(count); - if (::read(uid_fd, uids.data(), num_bytes) == -1) { - std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } - if (::close(uid_fd) == -1) { - std::string err_msg = "Failed to close file: " + path.string() + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } + read_uids_internal(path.string(), uids); } } } @@ -271,34 +240,7 @@ DefaultVectorsFormat::read_vectors(const store::DirectoryPtr& directory_ptr, off for (; it != it_end; ++it) { const auto& path = it->path(); if (path.extension().string() == raw_vector_extension_) { - int rv_fd = open(path.c_str(), O_RDONLY, 00664); - if (rv_fd == -1) { - std::string err_msg = "Failed to open file: " + path.string() + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); - } - - offset += sizeof(size_t); // Beginning of file is num_bytes - int off = lseek(rv_fd, offset, SEEK_SET); - if (off == -1) { - std::string err_msg = "Failed to seek file: " + path.string() + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } - - raw_vectors.resize(num_bytes); - - if (::read(rv_fd, raw_vectors.data(), num_bytes) == -1) { - std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } - - if (::close(rv_fd) == -1) { - std::string err_msg = "Failed to close file: " + path.string() + ", error: " + std::strerror(errno); - ENGINE_LOG_ERROR << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } + read_vectors_internal(path.string(), offset, num_bytes, raw_vectors); } } } diff --git a/core/src/codecs/default/DefaultVectorsFormat.h b/core/src/codecs/default/DefaultVectorsFormat.h index e7993c0e480d618eaba4036b81292d936645c4cb..54c9b5278db8acecadea2b9c7a65f22e83fdd3ab 100644 --- a/core/src/codecs/default/DefaultVectorsFormat.h +++ b/core/src/codecs/default/DefaultVectorsFormat.h @@ -32,17 +32,16 @@ class DefaultVectorsFormat : public VectorsFormat { DefaultVectorsFormat() = default; void - read(const store::DirectoryPtr& directory_ptr, segment::VectorsPtr& vectors_read) override; + read(const store::DirectoryPtr&, segment::VectorsPtr&) override; void - write(const store::DirectoryPtr& directory_ptr, const segment::VectorsPtr& vectors) override; + write(const store::DirectoryPtr&, const segment::VectorsPtr&) override; void - read_uids(const store::DirectoryPtr& directory_ptr, std::vector& uids) override; + read_vectors(const store::DirectoryPtr&, off_t, size_t, std::vector&) override; void - read_vectors(const store::DirectoryPtr& directory_ptr, off_t offset, size_t num_bytes, - std::vector& raw_vectors) override; + read_uids(const store::DirectoryPtr&, std::vector&) override; // No copy and move DefaultVectorsFormat(const DefaultVectorsFormat&) = delete; @@ -53,6 +52,13 @@ class DefaultVectorsFormat : public VectorsFormat { DefaultVectorsFormat& operator=(DefaultVectorsFormat&&) = delete; + private: + void + read_vectors_internal(const std::string&, off_t, size_t, std::vector&); + + void + read_uids_internal(const std::string&, std::vector&); + private: std::mutex mutex_;