未验证 提交 52124d70 编写于 作者: Z Zhiru Zhu 提交者: GitHub

Improve delete (#1474)

* remove build openblas/lapack and use find_package

* update ubuntu_build_deps.sh

* update build image

* update build image

* update CHANGELOG

* trigger ci

* update image

* update centos build envvironment image on Jenkins CI

* trigger ci

* add serialize benchmark
Signed-off-by: NZhiru Zhu <zhiru.zhu@zilliz.com>

* update deleted doc codec
Signed-off-by: NZhiru Zhu <zhiru.zhu@zilliz.com>

* update delete in mem
Signed-off-by: NZhiru Zhu <zhiru.zhu@zilliz.com>

* update CHANGELOG
Signed-off-by: NZhiru Zhu <zhiru.zhu@zilliz.com>

* don't apply delete when force flush
Signed-off-by: NZhiru Zhu <zhiru.zhu@zilliz.com>

* lint
Signed-off-by: NZhiru Zhu <zhiru.zhu@zilliz.com>
上级 758f15f2
......@@ -41,6 +41,7 @@ Please mark all change in change log and use the issue from GitHub
- \#815 - Support MinIO storage
- \#823 - Support binary vector tanimoto/jaccard/hamming metric
- \#853 - Support HNSW
- \#861 - Support DeleteById / SearchByID / GetVectorById / Flush
- \#910 - Change Milvus c++ standard to c++17
- \#1122 - Support AVX-512 in FAISS
- \#1204 - Add api to get table data information
......@@ -67,7 +68,7 @@ Please mark all change in change log and use the issue from GitHub
- \#1234 - Do S3 server validation check when Milvus startup
- \#1263 - Allow system conf modifiable and some take effect directly
- \#1320 - Remove debug logging from faiss
- \#1444 - Improve delete
## Task
- \#1327 - Exclude third-party code from codebeat
......
......@@ -17,6 +17,9 @@
#include "codecs/default/DefaultDeletedDocsFormat.h"
#include <fcntl.h>
#include <unistd.h>
#include <boost/filesystem.hpp>
#include <memory>
#include <string>
......@@ -35,8 +38,9 @@ DefaultDeletedDocsFormat::read(const store::DirectoryPtr& directory_ptr, segment
std::string dir_path = directory_ptr->GetDirPath();
const std::string del_file_path = dir_path + "/" + deleted_docs_filename_;
FILE* del_file = fopen(del_file_path.c_str(), "rb");
if (del_file == nullptr) {
int del_fd = open(del_file_path.c_str(), O_RDONLY, 00664);
if (del_fd == -1) {
std::string err_msg = "Failed to open file: " + del_file_path;
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
......@@ -46,9 +50,20 @@ DefaultDeletedDocsFormat::read(const store::DirectoryPtr& directory_ptr, segment
auto deleted_docs_size = file_size / sizeof(segment::offset_t);
std::vector<segment::offset_t> deleted_docs_list;
deleted_docs_list.resize(deleted_docs_size);
fread((void*)(deleted_docs_list.data()), sizeof(segment::offset_t), deleted_docs_size, del_file);
if (::read(del_fd, deleted_docs_list.data(), file_size) == -1) {
std::string err_msg = "Failed to read from file: " + del_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
deleted_docs = std::make_shared<segment::DeletedDocs>(deleted_docs_list);
fclose(del_file);
if (::close(del_fd) == -1) {
std::string err_msg = "Failed to close file: " + del_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
}
void
......@@ -57,16 +72,28 @@ DefaultDeletedDocsFormat::write(const store::DirectoryPtr& directory_ptr, const
std::string dir_path = directory_ptr->GetDirPath();
const std::string del_file_path = dir_path + "/" + deleted_docs_filename_;
FILE* del_file = fopen(del_file_path.c_str(), "ab"); // TODO(zhiru): append mode
if (del_file == nullptr) {
// TODO(zhiru): append mode
int del_fd = open(del_file_path.c_str(), O_WRONLY | O_APPEND | O_CREAT, 00664);
if (del_fd == -1) {
std::string err_msg = "Failed to open file: " + del_file_path;
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
auto deleted_docs_list = deleted_docs->GetDeletedDocs();
fwrite((void*)(deleted_docs_list.data()), sizeof(segment::offset_t), deleted_docs->GetSize(), del_file);
fclose(del_file);
if (::write(del_fd, deleted_docs_list.data(), sizeof(segment::offset_t) * deleted_docs->GetSize()) == -1) {
std::string err_msg = "Failed to write to file" + del_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::close(del_fd) == -1) {
std::string err_msg = "Failed to close file: " + del_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
}
} // namespace codec
......
......@@ -144,6 +144,8 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
auto start = std::chrono::high_resolution_clock::now();
if (::write(rv_fd, vectors->GetData().data(), vectors->GetData().size()) == -1) {
std::string err_msg = "Failed to write to file" + rv_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
......@@ -155,6 +157,11 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> diff = end - start;
ENGINE_LOG_DEBUG << "Writing raw vectors took " << diff.count() << " s";
start = std::chrono::high_resolution_clock::now();
if (::write(uid_fd, vectors->GetUids().data(), sizeof(segment::doc_id_t) * vectors->GetCount()) == -1) {
std::string err_msg = "Failed to write to file" + uid_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
......@@ -165,6 +172,9 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
end = std::chrono::high_resolution_clock::now();
diff = end - start;
ENGINE_LOG_DEBUG << "Writing uids took " << diff.count() << " s";
}
void
......
......@@ -38,10 +38,10 @@ class MemManager {
DeleteVectors(const std::string& table_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) = 0;
virtual Status
Flush(const std::string& table_id) = 0;
Flush(const std::string& table_id, bool apply_delete = true) = 0;
virtual Status
Flush(std::set<std::string>& table_ids) = 0;
Flush(std::set<std::string>& table_ids, bool apply_delete = true) = 0;
// virtual Status
// Serialize(std::set<std::string>& table_ids) = 0;
......
......@@ -37,7 +37,8 @@ MemManagerImpl::InsertVectors(const std::string& table_id, int64_t length, const
flushed_tables.clear();
if (GetCurrentMem() > options_.insert_buffer_size_) {
ENGINE_LOG_DEBUG << "Insert buffer size exceeds limit. Performing force flush";
auto status = Flush(flushed_tables);
// TODO(zhiru): Don't apply delete here in order to avoid possible concurrency issues with Merge
auto status = Flush(flushed_tables, false);
if (!status.ok()) {
return status;
}
......@@ -62,7 +63,8 @@ MemManagerImpl::InsertVectors(const std::string& table_id, int64_t length, const
flushed_tables.clear();
if (GetCurrentMem() > options_.insert_buffer_size_) {
ENGINE_LOG_DEBUG << "Insert buffer size exceeds limit. Performing force flush";
auto status = Flush(flushed_tables);
// TODO(zhiru): Don't apply delete here in order to avoid possible concurrency issues with Merge
auto status = Flush(flushed_tables, false);
if (!status.ok()) {
return status;
}
......@@ -126,7 +128,7 @@ MemManagerImpl::DeleteVectors(const std::string& table_id, int64_t length, const
}
Status
MemManagerImpl::Flush(const std::string& table_id) {
MemManagerImpl::Flush(const std::string& table_id, bool apply_delete) {
ToImmutable(table_id);
// TODO: There is actually only one memTable in the immutable list
MemList temp_immutable_list;
......@@ -139,7 +141,7 @@ MemManagerImpl::Flush(const std::string& table_id) {
auto max_lsn = GetMaxLSN(temp_immutable_list);
for (auto& mem : temp_immutable_list) {
ENGINE_LOG_DEBUG << "Flushing table: " << mem->GetTableId();
auto status = mem->Serialize(max_lsn);
auto status = mem->Serialize(max_lsn, apply_delete);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Flush table " << mem->GetTableId() << " failed";
return status;
......@@ -151,7 +153,7 @@ MemManagerImpl::Flush(const std::string& table_id) {
}
Status
MemManagerImpl::Flush(std::set<std::string>& table_ids) {
MemManagerImpl::Flush(std::set<std::string>& table_ids, bool apply_delete) {
ToImmutable();
MemList temp_immutable_list;
......@@ -165,7 +167,7 @@ MemManagerImpl::Flush(std::set<std::string>& table_ids) {
auto max_lsn = GetMaxLSN(temp_immutable_list);
for (auto& mem : temp_immutable_list) {
ENGINE_LOG_DEBUG << "Flushing table: " << mem->GetTableId();
auto status = mem->Serialize(max_lsn);
auto status = mem->Serialize(max_lsn, apply_delete);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Flush table " << mem->GetTableId() << " failed";
return status;
......
......@@ -72,10 +72,10 @@ class MemManagerImpl : public MemManager {
DeleteVectors(const std::string& table_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) override;
Status
Flush(const std::string& table_id) override;
Flush(const std::string& table_id, bool apply_delete = true) override;
Status
Flush(std::set<std::string>& table_ids) override;
Flush(std::set<std::string>& table_ids, bool apply_delete = true) override;
// Status
// Serialize(std::set<std::string>& table_ids) override;
......
......@@ -97,10 +97,10 @@ MemTable::GetTableFileCount() {
}
Status
MemTable::Serialize(uint64_t wal_lsn) {
MemTable::Serialize(uint64_t wal_lsn, bool apply_delete) {
auto start = std::chrono::high_resolution_clock::now();
if (!doc_ids_to_delete_.empty()) {
if (!doc_ids_to_delete_.empty() && apply_delete) {
auto status = ApplyDeletes();
if (!status.ok()) {
return Status(DB_ERROR, status.message());
......
......@@ -47,7 +47,7 @@ class MemTable {
GetTableFileCount();
Status
Serialize(uint64_t wal_lsn);
Serialize(uint64_t wal_lsn, bool apply_delete = true);
bool
Empty();
......
......@@ -101,8 +101,26 @@ Status
MemTableFile::Delete(const std::vector<segment::doc_id_t>& doc_ids) {
segment::SegmentPtr segment_ptr;
segment_writer_ptr_->GetSegment(segment_ptr);
// Check wither the doc_id is present, if yes, delete it's corresponding buffer
std::vector<segment::doc_id_t> temp;
temp.resize(doc_ids.size());
memcpy(temp.data(), doc_ids.data(), doc_ids.size() * sizeof(segment::doc_id_t));
std::sort(temp.begin(), temp.end());
auto uids = segment_ptr->vectors_ptr_->GetUids();
size_t deleted = 0;
size_t loop = uids.size();
for (size_t i = 0; i < loop; ++i) {
if (std::binary_search(temp.begin(), temp.end(), uids[i])) {
segment_ptr->vectors_ptr_->Erase(i - deleted);
++deleted;
}
}
/*
for (auto& doc_id : doc_ids) {
auto found = std::find(uids.begin(), uids.end(), doc_id);
if (found != uids.end()) {
......@@ -111,6 +129,7 @@ MemTableFile::Delete(const std::vector<segment::doc_id_t>& doc_ids) {
uids = segment_ptr->vectors_ptr_->GetUids();
}
}
*/
return Status::OK();
}
......
......@@ -47,18 +47,37 @@ SegmentWriter::AddVectors(const std::string& name, const std::vector<uint8_t>& d
Status
SegmentWriter::Serialize() {
auto start = std::chrono::high_resolution_clock::now();
auto status = WriteBloomFilter();
if (!status.ok()) {
return status;
}
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> diff = end - start;
ENGINE_LOG_DEBUG << "Writing bloom filter took " << diff.count() << " s in total";
start = std::chrono::high_resolution_clock::now();
status = WriteVectors();
if (!status.ok()) {
return status;
}
end = std::chrono::high_resolution_clock::now();
diff = end - start;
ENGINE_LOG_DEBUG << "Writing vectors and uids took " << diff.count() << " s in total";
start = std::chrono::high_resolution_clock::now();
// Write an empty deleted doc
status = WriteDeletedDocs();
end = std::chrono::high_resolution_clock::now();
diff = end - start;
ENGINE_LOG_DEBUG << "Writing deleted docs took " << diff.count() << " s";
return status;
}
......@@ -81,15 +100,33 @@ SegmentWriter::WriteBloomFilter() {
codec::DefaultCodec default_codec;
try {
directory_ptr_->Create();
auto start = std::chrono::high_resolution_clock::now();
default_codec.GetIdBloomFilterFormat()->create(directory_ptr_, segment_ptr_->id_bloom_filter_ptr_);
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> diff = end - start;
ENGINE_LOG_DEBUG << "Initializing bloom filter took " << diff.count() << " s";
start = std::chrono::high_resolution_clock::now();
auto& uids = segment_ptr_->vectors_ptr_->GetUids();
for (auto& uid : uids) {
auto status = segment_ptr_->id_bloom_filter_ptr_->Add(uid);
if (!status.ok()) {
return status;
}
segment_ptr_->id_bloom_filter_ptr_->Add(uid);
}
end = std::chrono::high_resolution_clock::now();
diff = end - start;
ENGINE_LOG_DEBUG << "Adding " << uids.size() << " ids to bloom filter took " << diff.count() << " s";
start = std::chrono::high_resolution_clock::now();
default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, segment_ptr_->id_bloom_filter_ptr_);
end = std::chrono::high_resolution_clock::now();
diff = end - start;
ENGINE_LOG_DEBUG << "Writing bloom filter took " << diff.count() << " s";
} catch (Exception& e) {
std::string err_msg = "Failed to write vectors. " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册