未验证 提交 fc8a6e07 编写于 作者: Z Zhiru Zhu 提交者: GitHub

Fix #1491 and #1504 (#1506)

* update
Signed-off-by: NZhiru Zhu <zzhu@fandm.edu>

* update
Signed-off-by: NZhiru Zhu <zzhu@fandm.edu>

* update
Signed-off-by: NZhiru Zhu <zzhu@fandm.edu>

* fix centos compile error
Signed-off-by: NZhiru Zhu <zzhu@fandm.edu>
上级 600dac75
......@@ -29,6 +29,8 @@ Please mark all change in change log and use the issue from GitHub
- \#1359 Negative distance value returned when searching with HNSW index type
- \#1429 Server crashed when searching vectors using GPU
- \#1484 Index type changed to IDMAP after compacted
- \#1491 Server crashed during adding vectors
- \#1504 Avoid possible race condition between delete and search
## Feature
- \#216 Add CLI to get server info
......
......@@ -20,7 +20,9 @@
#include <fcntl.h>
#include <unistd.h>
#define BOOST_NO_CXX11_SCOPED_ENUMS
#include <boost/filesystem.hpp>
#undef BOOST_NO_CXX11_SCOPED_ENUMS
#include <memory>
#include <string>
#include <vector>
......@@ -46,12 +48,18 @@ DefaultDeletedDocsFormat::read(const store::DirectoryPtr& directory_ptr, segment
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
auto file_size = boost::filesystem::file_size(boost::filesystem::path(del_file_path));
auto deleted_docs_size = file_size / sizeof(segment::offset_t);
size_t num_bytes;
if (::read(del_fd, &num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to read from file: " + del_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
auto deleted_docs_size = num_bytes / sizeof(segment::offset_t);
std::vector<segment::offset_t> deleted_docs_list;
deleted_docs_list.resize(deleted_docs_size);
if (::read(del_fd, deleted_docs_list.data(), file_size) == -1) {
if (::read(del_fd, deleted_docs_list.data(), num_bytes) == -1) {
std::string err_msg = "Failed to read from file: " + del_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
......@@ -73,27 +81,69 @@ DefaultDeletedDocsFormat::write(const store::DirectoryPtr& directory_ptr, const
std::string dir_path = directory_ptr->GetDirPath();
const std::string del_file_path = dir_path + "/" + deleted_docs_filename_;
// TODO(zhiru): append mode
int del_fd = open(del_file_path.c_str(), O_WRONLY | O_APPEND | O_CREAT, 00664);
// Create a temporary file from the existing file
const std::string temp_path = dir_path + "/" + "temp_del";
bool exists = boost::filesystem::exists(del_file_path);
if (exists) {
boost::filesystem::copy_file(del_file_path, temp_path, boost::filesystem::copy_option::fail_if_exists);
}
// Write to the temp file, in order to avoid possible race condition with search (concurrent read and write)
int del_fd = open(temp_path.c_str(), O_RDWR | O_CREAT, 00664);
if (del_fd == -1) {
std::string err_msg = "Failed to open file: " + del_file_path;
std::string err_msg = "Failed to open file: " + temp_path;
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
size_t old_num_bytes;
if (exists) {
if (::read(del_fd, &old_num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to read from file: " + temp_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
} else {
old_num_bytes = 0;
}
auto deleted_docs_list = deleted_docs->GetDeletedDocs();
size_t new_num_bytes = old_num_bytes + sizeof(segment::offset_t) * deleted_docs->GetSize();
// rewind and overwrite with the new_num_bytes
int off = lseek(del_fd, 0, SEEK_SET);
if (off == -1) {
std::string err_msg = "Failed to seek file: " + temp_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::write(del_fd, &new_num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to write to file" + temp_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::write(del_fd, deleted_docs_list.data(), sizeof(segment::offset_t) * deleted_docs->GetSize()) == -1) {
std::string err_msg = "Failed to write to file" + del_file_path + ", error: " + std::strerror(errno);
// Move to the end of file and append
off = lseek(del_fd, 0, SEEK_END);
if (off == -1) {
std::string err_msg = "Failed to seek file: " + temp_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::write(del_fd, deleted_docs_list.data(), new_num_bytes) == -1) {
std::string err_msg = "Failed to write to file" + temp_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::close(del_fd) == -1) {
std::string err_msg = "Failed to close file: " + del_file_path + ", error: " + std::strerror(errno);
std::string err_msg = "Failed to close file: " + temp_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
// Move temp file to delete file
boost::filesystem::rename(temp_path, del_file_path);
}
} // namespace codec
......
......@@ -53,7 +53,14 @@ DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::Ve
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
size_t num_bytes = boost::filesystem::file_size(path);
size_t num_bytes;
if (::read(rv_fd, &num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
std::vector<uint8_t> vector_list;
vector_list.resize(num_bytes);
if (::read(rv_fd, vector_list.data(), num_bytes) == -1) {
......@@ -78,11 +85,18 @@ DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::Ve
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
auto file_size = boost::filesystem::file_size(path);
auto count = file_size / sizeof(segment::doc_id_t);
size_t num_bytes;
if (::read(uid_fd, &num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
auto count = num_bytes / sizeof(segment::doc_id_t);
std::vector<segment::doc_id_t> uids;
uids.resize(count);
if (::read(uid_fd, uids.data(), file_size) == -1) {
if (::read(uid_fd, uids.data(), num_bytes) == -1) {
std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
......@@ -146,8 +160,14 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm
auto start = std::chrono::high_resolution_clock::now();
if (::write(rv_fd, vectors->GetData().data(), vectors->GetData().size()) == -1) {
std::string err_msg = "Failed to write to file" + rv_file_path + ", error: " + std::strerror(errno);
size_t rv_num_bytes = vectors->GetData().size() * sizeof(uint8_t);
if (::write(rv_fd, &rv_num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to write to file: " + rv_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::write(rv_fd, vectors->GetData().data(), rv_num_bytes) == -1) {
std::string err_msg = "Failed to write to file: " + rv_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
......@@ -162,7 +182,14 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm
ENGINE_LOG_DEBUG << "Writing raw vectors took " << diff.count() << " s";
start = std::chrono::high_resolution_clock::now();
if (::write(uid_fd, vectors->GetUids().data(), sizeof(segment::doc_id_t) * vectors->GetCount()) == -1) {
size_t uid_num_bytes = vectors->GetUids().size() * sizeof(segment::doc_id_t);
if (::write(uid_fd, &uid_num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to write to file" + rv_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::write(uid_fd, vectors->GetUids().data(), uid_num_bytes) == -1) {
std::string err_msg = "Failed to write to file" + uid_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
......@@ -202,10 +229,15 @@ DefaultVectorsFormat::read_uids(const store::DirectoryPtr& directory_ptr, std::v
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
auto file_size = boost::filesystem::file_size(path);
auto count = file_size / sizeof(segment::doc_id_t);
size_t num_bytes;
if (::read(uid_fd, &num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
auto count = num_bytes / sizeof(segment::doc_id_t);
uids.resize(count);
if (::read(uid_fd, uids.data(), file_size) == -1) {
if (::read(uid_fd, uids.data(), num_bytes) == -1) {
std::string err_msg = "Failed to read from file: " + path.string() + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
......@@ -245,6 +277,8 @@ DefaultVectorsFormat::read_vectors(const store::DirectoryPtr& directory_ptr, off
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
offset += sizeof(size_t); // Beginning of file is num_bytes
int off = lseek(rv_fd, offset, SEEK_SET);
if (off == -1) {
std::string err_msg = "Failed to seek file: " + path.string() + ", error: " + std::strerror(errno);
......
......@@ -22,7 +22,6 @@
#include "Vectors.h"
#include "codecs/default/DefaultCodec.h"
#include "store/Directory.h"
#include "utils/Exception.h"
#include "utils/Log.h"
namespace milvus {
......@@ -47,8 +46,8 @@ SegmentReader::Load() {
directory_ptr_->Create();
default_codec.GetVectorsFormat()->read(directory_ptr_, segment_ptr_->vectors_ptr_);
default_codec.GetDeletedDocsFormat()->read(directory_ptr_, segment_ptr_->deleted_docs_ptr_);
} catch (Exception& e) {
return Status(e.code(), e.what());
} catch (std::exception& e) {
return Status(SERVER_WRITE_ERROR, e.what());
}
return Status::OK();
}
......@@ -59,10 +58,10 @@ SegmentReader::LoadVectors(off_t offset, size_t num_bytes, std::vector<uint8_t>&
try {
directory_ptr_->Create();
default_codec.GetVectorsFormat()->read_vectors(directory_ptr_, offset, num_bytes, raw_vectors);
} catch (Exception& e) {
} catch (std::exception& e) {
std::string err_msg = "Failed to load raw vectors. " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
return Status(e.code(), err_msg);
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
}
......@@ -73,10 +72,10 @@ SegmentReader::LoadUids(std::vector<doc_id_t>& uids) {
try {
directory_ptr_->Create();
default_codec.GetVectorsFormat()->read_uids(directory_ptr_, uids);
} catch (Exception& e) {
} catch (std::exception& e) {
std::string err_msg = "Failed to load uids. " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
return Status(e.code(), err_msg);
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
}
......@@ -93,10 +92,10 @@ SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
try {
directory_ptr_->Create();
default_codec.GetIdBloomFilterFormat()->read(directory_ptr_, id_bloom_filter_ptr);
} catch (Exception& e) {
} catch (std::exception& e) {
std::string err_msg = "Failed to load bloom filter. " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
return Status(e.code(), err_msg);
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
}
......@@ -107,10 +106,10 @@ SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) {
try {
directory_ptr_->Create();
default_codec.GetDeletedDocsFormat()->read(directory_ptr_, deleted_docs_ptr);
} catch (Exception& e) {
} catch (std::exception& e) {
std::string err_msg = "Failed to load deleted docs. " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
return Status(e.code(), err_msg);
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
}
......
......@@ -24,7 +24,6 @@
#include "Vectors.h"
#include "codecs/default/DefaultCodec.h"
#include "store/Directory.h"
#include "utils/Exception.h"
#include "utils/Log.h"
namespace milvus {
......@@ -87,10 +86,10 @@ SegmentWriter::WriteVectors() {
try {
directory_ptr_->Create();
default_codec.GetVectorsFormat()->write(directory_ptr_, segment_ptr_->vectors_ptr_);
} catch (Exception& e) {
std::string err_msg = "Failed to write vectors. " + std::string(e.what());
} catch (std::exception& e) {
std::string err_msg = "Failed to write vectors: " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
return Status(e.code(), err_msg);
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
}
......@@ -127,10 +126,10 @@ SegmentWriter::WriteBloomFilter() {
end = std::chrono::high_resolution_clock::now();
diff = end - start;
ENGINE_LOG_DEBUG << "Writing bloom filter took " << diff.count() << " s";
} catch (Exception& e) {
std::string err_msg = "Failed to write vectors. " + std::string(e.what());
} catch (std::exception& e) {
std::string err_msg = "Failed to write vectors: " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
return Status(e.code(), err_msg);
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
}
......@@ -142,10 +141,10 @@ SegmentWriter::WriteDeletedDocs() {
directory_ptr_->Create();
DeletedDocsPtr deleted_docs_ptr = std::make_shared<DeletedDocs>();
default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs_ptr);
} catch (Exception& e) {
std::string err_msg = "Failed to write deleted docs. " + std::string(e.what());
} catch (std::exception& e) {
std::string err_msg = "Failed to write deleted docs: " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
return Status(e.code(), err_msg);
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
}
......@@ -156,10 +155,10 @@ SegmentWriter::WriteDeletedDocs(const DeletedDocsPtr& deleted_docs) {
try {
directory_ptr_->Create();
default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs);
} catch (Exception& e) {
std::string err_msg = "Failed to write deleted docs. " + std::string(e.what());
} catch (std::exception& e) {
std::string err_msg = "Failed to write deleted docs: " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
return Status(e.code(), err_msg);
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
}
......@@ -170,10 +169,10 @@ SegmentWriter::WriteBloomFilter(const IdBloomFilterPtr& id_bloom_filter_ptr) {
try {
directory_ptr_->Create();
default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, id_bloom_filter_ptr);
} catch (Exception& e) {
std::string err_msg = "Failed to write bloom filter. " + std::string(e.what());
} catch (std::exception& e) {
std::string err_msg = "Failed to write bloom filter: " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
return Status(e.code(), err_msg);
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
}
......
......@@ -199,6 +199,68 @@ TEST_F(DeleteTest, delete_on_disk) {
}
}
TEST_F(DeleteTest, delete_multiple_times) {
milvus::engine::meta::TableSchema table_info = BuildTableSchema();
auto stat = db_->CreateTable(table_info);
milvus::engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = GetTableName();
stat = db_->DescribeTable(table_info_get);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
int64_t nb = 100000;
milvus::engine::VectorsData xb;
BuildVectors(nb, xb);
for (int64_t i = 0; i < nb; i++) {
xb.id_array_.push_back(i);
}
stat = db_->InsertVectors(GetTableName(), "", xb);
ASSERT_TRUE(stat.ok());
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<int64_t> dis(0, nb - 1);
int64_t num_query = 10;
std::map<int64_t, milvus::engine::VectorsData> search_vectors;
for (int64_t i = 0; i < num_query; ++i) {
int64_t index = dis(gen);
milvus::engine::VectorsData search;
search.vector_count_ = 1;
for (int64_t j = 0; j < TABLE_DIM; j++) {
search.float_data_.push_back(xb.float_data_[index * TABLE_DIM + j]);
}
search_vectors.insert(std::make_pair(xb.id_array_[index], search));
}
// std::this_thread::sleep_for(std::chrono::seconds(3)); // ensure raw data write to disk
stat = db_->Flush();
ASSERT_TRUE(stat.ok());
int topk = 10, nprobe = 10;
for (auto& pair : search_vectors) {
std::vector<int64_t> to_delete{pair.first};
stat = db_->DeleteVectors(GetTableName(), to_delete);
ASSERT_TRUE(stat.ok());
stat = db_->Flush();
ASSERT_TRUE(stat.ok());
auto& search = pair.second;
std::vector<std::string> tags;
milvus::engine::ResultIds result_ids;
milvus::engine::ResultDistances result_distances;
stat = db_->Query(dummy_context_, GetTableName(), tags, topk, nprobe, search, result_ids, result_distances);
ASSERT_NE(result_ids[0], pair.first);
// ASSERT_LT(result_distances[0], 1e-4);
ASSERT_GT(result_distances[0], 1);
}
}
TEST_F(DeleteTest, delete_with_index) {
milvus::engine::meta::TableSchema table_info = BuildTableSchema();
table_info.engine_type_ = (int32_t)milvus::engine::EngineType::FAISS_IVFFLAT;
......@@ -456,7 +518,7 @@ TEST_F(DeleteTest, delete_add_auto_flush) {
ASSERT_EQ(result_distances[0], std::numeric_limits<float>::max());
}
TEST_F(DeleteTest, compact_basic) {
TEST_F(CompactTest, compact_basic) {
milvus::engine::meta::TableSchema table_info = BuildTableSchema();
auto stat = db_->CreateTable(table_info);
......@@ -507,7 +569,7 @@ TEST_F(DeleteTest, compact_basic) {
}
}
TEST_F(DeleteTest, compact_with_index) {
TEST_F(CompactTest, compact_with_index) {
milvus::engine::meta::TableSchema table_info = BuildTableSchema();
table_info.index_file_size_ = milvus::engine::ONE_KB;
table_info.engine_type_ = (int32_t)milvus::engine::EngineType::FAISS_IVFSQ8;
......@@ -591,7 +653,7 @@ TEST_F(DeleteTest, compact_with_index) {
}
}
TEST_F(DeleteTest, compact_non_existing_table) {
TEST_F(CompactTest, compact_non_existing_table) {
auto status = db_->Compact("non_existing_table");
ASSERT_FALSE(status.ok());
}
......@@ -148,6 +148,9 @@ class MemManagerTest2 : public DBTest {};
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DeleteTest : public DBTest {};
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CompactTest : public DBTest {};
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class SearchByIdTest : public DBTest {};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册