diff --git a/CHANGELOG.md b/CHANGELOG.md index b478f7fa14fe755f69ca4bfd7c6a691634c7a3a0..b7983f28365b6a2dfc8cb59d4be1c8b332abaac7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ Please mark all change in change log and use the issue from GitHub - \#815 - Support MinIO storage - \#823 - Support binary vector tanimoto/jaccard/hamming metric - \#853 - Support HNSW +- \#861 - Support DeleteById / SearchByID / GetVectorById / Flush - \#910 - Change Milvus c++ standard to c++17 - \#1122 - Support AVX-512 in FAISS - \#1204 - Add api to get table data information @@ -67,7 +68,7 @@ Please mark all change in change log and use the issue from GitHub - \#1234 - Do S3 server validation check when Milvus startup - \#1263 - Allow system conf modifiable and some take effect directly - \#1320 - Remove debug logging from faiss - +- \#1444 - Improve delete ## Task - \#1327 - Exclude third-party code from codebeat diff --git a/core/src/codecs/default/DefaultDeletedDocsFormat.cpp b/core/src/codecs/default/DefaultDeletedDocsFormat.cpp index 76f0edd7631c39e03565ceae07020d7eb9051286..774d8ea96efc85c4e1c46c607e541db90ecd13ab 100644 --- a/core/src/codecs/default/DefaultDeletedDocsFormat.cpp +++ b/core/src/codecs/default/DefaultDeletedDocsFormat.cpp @@ -17,6 +17,9 @@ #include "codecs/default/DefaultDeletedDocsFormat.h" +#include +#include + #include #include #include @@ -35,8 +38,9 @@ DefaultDeletedDocsFormat::read(const store::DirectoryPtr& directory_ptr, segment std::string dir_path = directory_ptr->GetDirPath(); const std::string del_file_path = dir_path + "/" + deleted_docs_filename_; - FILE* del_file = fopen(del_file_path.c_str(), "rb"); - if (del_file == nullptr) { + + int del_fd = open(del_file_path.c_str(), O_RDONLY, 00664); + if (del_fd == -1) { std::string err_msg = "Failed to open file: " + del_file_path; ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); @@ -46,9 +50,20 @@ DefaultDeletedDocsFormat::read(const store::DirectoryPtr& 
directory_ptr, segment auto deleted_docs_size = file_size / sizeof(segment::offset_t); std::vector deleted_docs_list; deleted_docs_list.resize(deleted_docs_size); - fread((void*)(deleted_docs_list.data()), sizeof(segment::offset_t), deleted_docs_size, del_file); + + if (::read(del_fd, deleted_docs_list.data(), file_size) == -1) { + std::string err_msg = "Failed to read from file: " + del_file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + deleted_docs = std::make_shared(deleted_docs_list); - fclose(del_file); + + if (::close(del_fd) == -1) { + std::string err_msg = "Failed to close file: " + del_file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } } void @@ -57,16 +72,28 @@ DefaultDeletedDocsFormat::write(const store::DirectoryPtr& directory_ptr, const std::string dir_path = directory_ptr->GetDirPath(); const std::string del_file_path = dir_path + "/" + deleted_docs_filename_; - FILE* del_file = fopen(del_file_path.c_str(), "ab"); // TODO(zhiru): append mode - if (del_file == nullptr) { + + // TODO(zhiru): append mode + int del_fd = open(del_file_path.c_str(), O_WRONLY | O_APPEND | O_CREAT, 00664); + if (del_fd == -1) { std::string err_msg = "Failed to open file: " + del_file_path; ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); } auto deleted_docs_list = deleted_docs->GetDeletedDocs(); - fwrite((void*)(deleted_docs_list.data()), sizeof(segment::offset_t), deleted_docs->GetSize(), del_file); - fclose(del_file); + + if (::write(del_fd, deleted_docs_list.data(), sizeof(segment::offset_t) * deleted_docs->GetSize()) == -1) { + std::string err_msg = "Failed to write to file" + del_file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + + if (::close(del_fd) == -1) { + std::string err_msg = "Failed to 
close file: " + del_file_path + ", error: " + std::strerror(errno); + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } } } // namespace codec diff --git a/core/src/codecs/default/DefaultVectorsFormat.cpp b/core/src/codecs/default/DefaultVectorsFormat.cpp index 215d7bc52097a6d0ade83ef8ff6896d1dadfc5df..f0388e4d4d9676c303bbf67f93dd2d65c49134cc 100644 --- a/core/src/codecs/default/DefaultVectorsFormat.cpp +++ b/core/src/codecs/default/DefaultVectorsFormat.cpp @@ -144,6 +144,8 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); } + auto start = std::chrono::high_resolution_clock::now(); + if (::write(rv_fd, vectors->GetData().data(), vectors->GetData().size()) == -1) { std::string err_msg = "Failed to write to file" + rv_file_path + ", error: " + std::strerror(errno); ENGINE_LOG_ERROR << err_msg; @@ -155,6 +157,11 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm throw Exception(SERVER_WRITE_ERROR, err_msg); } + auto end = std::chrono::high_resolution_clock::now(); + std::chrono::duration diff = end - start; + ENGINE_LOG_DEBUG << "Writing raw vectors took " << diff.count() << " s"; + + start = std::chrono::high_resolution_clock::now(); if (::write(uid_fd, vectors->GetUids().data(), sizeof(segment::doc_id_t) * vectors->GetCount()) == -1) { std::string err_msg = "Failed to write to file" + uid_file_path + ", error: " + std::strerror(errno); ENGINE_LOG_ERROR << err_msg; @@ -165,6 +172,9 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm ENGINE_LOG_ERROR << err_msg; throw Exception(SERVER_WRITE_ERROR, err_msg); } + end = std::chrono::high_resolution_clock::now(); + diff = end - start; + ENGINE_LOG_DEBUG << "Writing uids took " << diff.count() << " s"; } void diff --git a/core/src/db/insert/MemManager.h b/core/src/db/insert/MemManager.h index 
1bfbdab1b9794e18c0224b01226674e765749a6a..a644a979cdee4f530941e83706f497231e23e354 100644 --- a/core/src/db/insert/MemManager.h +++ b/core/src/db/insert/MemManager.h @@ -38,10 +38,10 @@ class MemManager { DeleteVectors(const std::string& table_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) = 0; virtual Status - Flush(const std::string& table_id) = 0; + Flush(const std::string& table_id, bool apply_delete = true) = 0; virtual Status - Flush(std::set& table_ids) = 0; + Flush(std::set& table_ids, bool apply_delete = true) = 0; // virtual Status // Serialize(std::set& table_ids) = 0; diff --git a/core/src/db/insert/MemManagerImpl.cpp b/core/src/db/insert/MemManagerImpl.cpp index 16e2ff272f10e3f201fc4e20f44e95e912a083c0..287ffc1654b15fe1f9a9428223218d9ff1d2ea27 100644 --- a/core/src/db/insert/MemManagerImpl.cpp +++ b/core/src/db/insert/MemManagerImpl.cpp @@ -37,7 +37,8 @@ MemManagerImpl::InsertVectors(const std::string& table_id, int64_t length, const flushed_tables.clear(); if (GetCurrentMem() > options_.insert_buffer_size_) { ENGINE_LOG_DEBUG << "Insert buffer size exceeds limit. Performing force flush"; - auto status = Flush(flushed_tables); + // TODO(zhiru): Don't apply delete here in order to avoid possible concurrency issues with Merge + auto status = Flush(flushed_tables, false); if (!status.ok()) { return status; } @@ -62,7 +63,8 @@ MemManagerImpl::InsertVectors(const std::string& table_id, int64_t length, const flushed_tables.clear(); if (GetCurrentMem() > options_.insert_buffer_size_) { ENGINE_LOG_DEBUG << "Insert buffer size exceeds limit. 
Performing force flush"; - auto status = Flush(flushed_tables); + // TODO(zhiru): Don't apply delete here in order to avoid possible concurrency issues with Merge + auto status = Flush(flushed_tables, false); if (!status.ok()) { return status; } @@ -126,7 +128,7 @@ MemManagerImpl::DeleteVectors(const std::string& table_id, int64_t length, const } Status -MemManagerImpl::Flush(const std::string& table_id) { +MemManagerImpl::Flush(const std::string& table_id, bool apply_delete) { ToImmutable(table_id); // TODO: There is actually only one memTable in the immutable list MemList temp_immutable_list; @@ -139,7 +141,7 @@ MemManagerImpl::Flush(const std::string& table_id) { auto max_lsn = GetMaxLSN(temp_immutable_list); for (auto& mem : temp_immutable_list) { ENGINE_LOG_DEBUG << "Flushing table: " << mem->GetTableId(); - auto status = mem->Serialize(max_lsn); + auto status = mem->Serialize(max_lsn, apply_delete); if (!status.ok()) { ENGINE_LOG_ERROR << "Flush table " << mem->GetTableId() << " failed"; return status; @@ -151,7 +153,7 @@ MemManagerImpl::Flush(const std::string& table_id) { } Status -MemManagerImpl::Flush(std::set& table_ids) { +MemManagerImpl::Flush(std::set& table_ids, bool apply_delete) { ToImmutable(); MemList temp_immutable_list; @@ -165,7 +167,7 @@ MemManagerImpl::Flush(std::set& table_ids) { auto max_lsn = GetMaxLSN(temp_immutable_list); for (auto& mem : temp_immutable_list) { ENGINE_LOG_DEBUG << "Flushing table: " << mem->GetTableId(); - auto status = mem->Serialize(max_lsn); + auto status = mem->Serialize(max_lsn, apply_delete); if (!status.ok()) { ENGINE_LOG_ERROR << "Flush table " << mem->GetTableId() << " failed"; return status; diff --git a/core/src/db/insert/MemManagerImpl.h b/core/src/db/insert/MemManagerImpl.h index 7bda7e51551a863c6c0d163c5c4a608d1b49279a..7e749f76c0be8cbec76d91c7f3c5c1bf27c9ea40 100644 --- a/core/src/db/insert/MemManagerImpl.h +++ b/core/src/db/insert/MemManagerImpl.h @@ -72,10 +72,10 @@ class MemManagerImpl : public 
MemManager { DeleteVectors(const std::string& table_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) override; Status - Flush(const std::string& table_id) override; + Flush(const std::string& table_id, bool apply_delete = true) override; Status - Flush(std::set& table_ids) override; + Flush(std::set& table_ids, bool apply_delete = true) override; // Status // Serialize(std::set& table_ids) override; diff --git a/core/src/db/insert/MemTable.cpp b/core/src/db/insert/MemTable.cpp index a2b7b226c692fced3f542db375de52e3ad9f1ff4..7656683f56db869f83f0053a926693fb03c18291 100644 --- a/core/src/db/insert/MemTable.cpp +++ b/core/src/db/insert/MemTable.cpp @@ -97,10 +97,10 @@ MemTable::GetTableFileCount() { } Status -MemTable::Serialize(uint64_t wal_lsn) { +MemTable::Serialize(uint64_t wal_lsn, bool apply_delete) { auto start = std::chrono::high_resolution_clock::now(); - if (!doc_ids_to_delete_.empty()) { + if (!doc_ids_to_delete_.empty() && apply_delete) { auto status = ApplyDeletes(); if (!status.ok()) { return Status(DB_ERROR, status.message()); diff --git a/core/src/db/insert/MemTable.h b/core/src/db/insert/MemTable.h index 11b2a5739542f8cf35f88863bf0c331f6465e409..56ae2299e501800e3a589d453a36bc7292875235 100644 --- a/core/src/db/insert/MemTable.h +++ b/core/src/db/insert/MemTable.h @@ -47,7 +47,7 @@ class MemTable { GetTableFileCount(); Status - Serialize(uint64_t wal_lsn); + Serialize(uint64_t wal_lsn, bool apply_delete = true); bool Empty(); diff --git a/core/src/db/insert/MemTableFile.cpp b/core/src/db/insert/MemTableFile.cpp index 522be43171534584c05ffc776da37e95effdeb4a..f5ca3672f9dec51213cf62f513e950873554a47f 100644 --- a/core/src/db/insert/MemTableFile.cpp +++ b/core/src/db/insert/MemTableFile.cpp @@ -101,8 +101,26 @@ Status MemTableFile::Delete(const std::vector& doc_ids) { segment::SegmentPtr segment_ptr; segment_writer_ptr_->GetSegment(segment_ptr); + // Check whether the doc_id is present; if yes, delete its corresponding buffer + + std::vector 
temp; + temp.resize(doc_ids.size()); + memcpy(temp.data(), doc_ids.data(), doc_ids.size() * sizeof(segment::doc_id_t)); + + std::sort(temp.begin(), temp.end()); + auto uids = segment_ptr->vectors_ptr_->GetUids(); + + size_t deleted = 0; + size_t loop = uids.size(); + for (size_t i = 0; i < loop; ++i) { + if (std::binary_search(temp.begin(), temp.end(), uids[i])) { + segment_ptr->vectors_ptr_->Erase(i - deleted); + ++deleted; + } + } + /* for (auto& doc_id : doc_ids) { auto found = std::find(uids.begin(), uids.end(), doc_id); if (found != uids.end()) { @@ -111,6 +129,7 @@ MemTableFile::Delete(const std::vector& doc_ids) { uids = segment_ptr->vectors_ptr_->GetUids(); } } + */ return Status::OK(); } diff --git a/core/src/segment/SegmentWriter.cpp b/core/src/segment/SegmentWriter.cpp index 0a9a57c5472cc9e51398506168b4c4c463c2d70a..fbb6cae707f9d4d8e785f169938064808c25d57b 100644 --- a/core/src/segment/SegmentWriter.cpp +++ b/core/src/segment/SegmentWriter.cpp @@ -47,18 +47,37 @@ SegmentWriter::AddVectors(const std::string& name, const std::vector& d Status SegmentWriter::Serialize() { + auto start = std::chrono::high_resolution_clock::now(); + auto status = WriteBloomFilter(); if (!status.ok()) { return status; } + auto end = std::chrono::high_resolution_clock::now(); + std::chrono::duration diff = end - start; + ENGINE_LOG_DEBUG << "Writing bloom filter took " << diff.count() << " s in total"; + + start = std::chrono::high_resolution_clock::now(); + status = WriteVectors(); if (!status.ok()) { return status; } + end = std::chrono::high_resolution_clock::now(); + diff = end - start; + ENGINE_LOG_DEBUG << "Writing vectors and uids took " << diff.count() << " s in total"; + + start = std::chrono::high_resolution_clock::now(); + // Write an empty deleted doc status = WriteDeletedDocs(); + + end = std::chrono::high_resolution_clock::now(); + diff = end - start; + ENGINE_LOG_DEBUG << "Writing deleted docs took " << diff.count() << " s"; + return status; } @@ -81,15 +100,33 
@@ SegmentWriter::WriteBloomFilter() { codec::DefaultCodec default_codec; try { directory_ptr_->Create(); + + auto start = std::chrono::high_resolution_clock::now(); + default_codec.GetIdBloomFilterFormat()->create(directory_ptr_, segment_ptr_->id_bloom_filter_ptr_); + + auto end = std::chrono::high_resolution_clock::now(); + std::chrono::duration diff = end - start; + ENGINE_LOG_DEBUG << "Initializing bloom filter took " << diff.count() << " s"; + + start = std::chrono::high_resolution_clock::now(); + auto& uids = segment_ptr_->vectors_ptr_->GetUids(); for (auto& uid : uids) { - auto status = segment_ptr_->id_bloom_filter_ptr_->Add(uid); - if (!status.ok()) { - return status; - } + segment_ptr_->id_bloom_filter_ptr_->Add(uid); } + + end = std::chrono::high_resolution_clock::now(); + diff = end - start; + ENGINE_LOG_DEBUG << "Adding " << uids.size() << " ids to bloom filter took " << diff.count() << " s"; + + start = std::chrono::high_resolution_clock::now(); + default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, segment_ptr_->id_bloom_filter_ptr_); + + end = std::chrono::high_resolution_clock::now(); + diff = end - start; + ENGINE_LOG_DEBUG << "Writing bloom filter took " << diff.count() << " s"; } catch (Exception& e) { std::string err_msg = "Failed to write vectors. " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg;