未验证 提交 ad0f26f9 编写于 作者: J Jin Hai 提交者: GitHub

Merge pull request #1538 from cydrain/caiyd_codec_opt

Caiyd codec opt
......@@ -108,6 +108,7 @@ Please mark all change in change log and use the issue from GitHub
- \#1448 General proto api for NNS libraries
- \#1480 Add return code for AVX512 selection
- \#1524 Update config "preload_table" description
- \#1537 Optimize raw vector and uids read/write
- \#1544 Update resources name in HTTP module
- \#1567 Update yaml config description
......
......@@ -19,15 +19,88 @@
#include <fcntl.h>
#include <unistd.h>
#include <algorithm>
#include <boost/filesystem.hpp>
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
namespace milvus {
namespace codec {
void
DefaultVectorsFormat::read_vectors_internal(const std::string& file_path, off_t offset, size_t num,
std::vector<uint8_t>& raw_vectors) {
int rv_fd = open(file_path.c_str(), O_RDONLY, 00664);
if (rv_fd == -1) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
size_t num_bytes;
if (::read(rv_fd, &num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
num = std::min(num, num_bytes - offset);
offset += sizeof(size_t); // Beginning of file is num_bytes
int off = lseek(rv_fd, offset, SEEK_SET);
if (off == -1) {
std::string err_msg = "Failed to seek file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
raw_vectors.resize(num / sizeof(uint8_t));
if (::read(rv_fd, raw_vectors.data(), num) == -1) {
std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::close(rv_fd) == -1) {
std::string err_msg = "Failed to close file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
}
void
DefaultVectorsFormat::read_uids_internal(const std::string& file_path, std::vector<segment::doc_id_t>& uids) {
int uid_fd = open(file_path.c_str(), O_RDONLY, 00664);
if (uid_fd == -1) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
size_t num_bytes;
if (::read(uid_fd, &num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
uids.resize(num_bytes / sizeof(segment::doc_id_t));
if (::read(uid_fd, uids.data(), num_bytes) == -1) {
std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::close(uid_fd) == -1) {
std::string err_msg = "Failed to close file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
}
void
DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::VectorsPtr& vectors_read) {
const std::lock_guard<std::mutex> lock(mutex_);
......@@ -47,68 +120,15 @@ DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::Ve
for (; it != it_end; ++it) {
const auto& path = it->path();
if (path.extension().string() == raw_vector_extension_) {
int rv_fd = open(path.c_str(), O_RDONLY, 00664);
if (rv_fd == -1) {
std::string err_msg = "Failed to open file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
size_t num_bytes;
if (::read(rv_fd, &num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
std::vector<uint8_t> vector_list;
vector_list.resize(num_bytes);
if (::read(rv_fd, vector_list.data(), num_bytes) == -1) {
std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
read_vectors_internal(path.string(), 0, INT64_MAX, vector_list);
vectors_read->AddData(vector_list);
vectors_read->SetName(path.stem().string());
if (::close(rv_fd) == -1) {
std::string err_msg = "Failed to close file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
}
if (path.extension().string() == user_id_extension_) {
int uid_fd = open(path.c_str(), O_RDONLY, 00664);
if (uid_fd == -1) {
std::string err_msg = "Failed to open file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
size_t num_bytes;
if (::read(uid_fd, &num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
auto count = num_bytes / sizeof(segment::doc_id_t);
std::vector<segment::doc_id_t> uids;
uids.resize(count);
if (::read(uid_fd, uids.data(), num_bytes) == -1) {
std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
read_uids_internal(path.string(), uids);
vectors_read->AddUids(uids);
if (::close(uid_fd) == -1) {
std::string err_msg = "Failed to close file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
}
}
}
......@@ -122,28 +142,7 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm
const std::string rv_file_path = dir_path + "/" + vectors->GetName() + raw_vector_extension_;
const std::string uid_file_path = dir_path + "/" + vectors->GetName() + user_id_extension_;
/*
FILE* rv_file = fopen(rv_file_path.c_str(), "wb");
if (rv_file == nullptr) {
std::string err_msg = "Failed to open file: " + rv_file_path;
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
fwrite((void*)(it.second->GetData()), sizeof(char), it.second->GetNumBytes(), rv_file);
fclose(rv_file);
FILE* uid_file = fopen(uid_file_path.c_str(), "wb");
if (uid_file == nullptr) {
std::string err_msg = "Failed to open file: " + uid_file_path;
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
fwrite((void*)(it.second->GetUids()), sizeof it.second->GetUids()[0], it.second->GetCount(), uid_file);
fclose(rv_file);
*/
TimeRecorder rc("write vectors");
int rv_fd = open(rv_file_path.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 00664);
if (rv_fd == -1) {
......@@ -151,14 +150,6 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
int uid_fd = open(uid_file_path.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 00664);
if (uid_fd == -1) {
std::string err_msg = "Failed to open file: " + uid_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
auto start = std::chrono::high_resolution_clock::now();
size_t rv_num_bytes = vectors->GetData().size() * sizeof(uint8_t);
if (::write(rv_fd, &rv_num_bytes, sizeof(size_t)) == -1) {
......@@ -177,12 +168,14 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> diff = end - start;
ENGINE_LOG_DEBUG << "Writing raw vectors took " << diff.count() << " s";
start = std::chrono::high_resolution_clock::now();
rc.RecordSection("write rv done");
int uid_fd = open(uid_file_path.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 00664);
if (uid_fd == -1) {
std::string err_msg = "Failed to open file: " + uid_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
size_t uid_num_bytes = vectors->GetUids().size() * sizeof(segment::doc_id_t);
if (::write(uid_fd, &uid_num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to write to file" + rv_file_path + ", error: " + std::strerror(errno);
......@@ -199,9 +192,8 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
end = std::chrono::high_resolution_clock::now();
diff = end - start;
ENGINE_LOG_DEBUG << "Writing uids took " << diff.count() << " s";
rc.RecordSection("write uids done");
}
void
......@@ -223,30 +215,7 @@ DefaultVectorsFormat::read_uids(const store::DirectoryPtr& directory_ptr, std::v
for (; it != it_end; ++it) {
const auto& path = it->path();
if (path.extension().string() == user_id_extension_) {
int uid_fd = open(path.c_str(), O_RDONLY, 00664);
if (uid_fd == -1) {
std::string err_msg = "Failed to open file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
size_t num_bytes;
if (::read(uid_fd, &num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
auto count = num_bytes / sizeof(segment::doc_id_t);
uids.resize(count);
if (::read(uid_fd, uids.data(), num_bytes) == -1) {
std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::close(uid_fd) == -1) {
std::string err_msg = "Failed to close file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
read_uids_internal(path.string(), uids);
}
}
}
......@@ -271,34 +240,7 @@ DefaultVectorsFormat::read_vectors(const store::DirectoryPtr& directory_ptr, off
for (; it != it_end; ++it) {
const auto& path = it->path();
if (path.extension().string() == raw_vector_extension_) {
int rv_fd = open(path.c_str(), O_RDONLY, 00664);
if (rv_fd == -1) {
std::string err_msg = "Failed to open file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
offset += sizeof(size_t); // Beginning of file is num_bytes
int off = lseek(rv_fd, offset, SEEK_SET);
if (off == -1) {
std::string err_msg = "Failed to seek file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
raw_vectors.resize(num_bytes);
if (::read(rv_fd, raw_vectors.data(), num_bytes) == -1) {
std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::close(rv_fd) == -1) {
std::string err_msg = "Failed to close file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
read_vectors_internal(path.string(), offset, num_bytes, raw_vectors);
}
}
}
......
......@@ -32,17 +32,16 @@ class DefaultVectorsFormat : public VectorsFormat {
DefaultVectorsFormat() = default;
void
read(const store::DirectoryPtr& directory_ptr, segment::VectorsPtr& vectors_read) override;
read(const store::DirectoryPtr&, segment::VectorsPtr&) override;
void
write(const store::DirectoryPtr& directory_ptr, const segment::VectorsPtr& vectors) override;
write(const store::DirectoryPtr&, const segment::VectorsPtr&) override;
void
read_uids(const store::DirectoryPtr& directory_ptr, std::vector<segment::doc_id_t>& uids) override;
read_vectors(const store::DirectoryPtr&, off_t, size_t, std::vector<uint8_t>&) override;
void
read_vectors(const store::DirectoryPtr& directory_ptr, off_t offset, size_t num_bytes,
std::vector<uint8_t>& raw_vectors) override;
read_uids(const store::DirectoryPtr&, std::vector<segment::doc_id_t>&) override;
// No copy and move
DefaultVectorsFormat(const DefaultVectorsFormat&) = delete;
......@@ -53,6 +52,13 @@ class DefaultVectorsFormat : public VectorsFormat {
DefaultVectorsFormat&
operator=(DefaultVectorsFormat&&) = delete;
private:
void
read_vectors_internal(const std::string&, off_t, size_t, std::vector<uint8_t>&);
void
read_uids_internal(const std::string&, std::vector<segment::doc_id_t>&);
private:
std::mutex mutex_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册