From d7fff5084a7754284b5b0d2dc942ac217b692f79 Mon Sep 17 00:00:00 2001 From: groot Date: Mon, 29 Jun 2020 10:07:47 +0800 Subject: [PATCH] merge #2659 from 0.10.1 (#2666) * merge #2659 from 0.10.1 Signed-off-by: yhmo * merge #2657 from 0.10.1 Signed-off-by: yhmo * format Signed-off-by: yhmo * typo Signed-off-by: yhmo --- CHANGELOG.md | 6 +-- core/src/db/DBImpl.cpp | 53 ++++++++++++--------------- core/src/db/insert/MemManager.h | 11 +++--- core/src/db/insert/MemManagerImpl.cpp | 48 ++++-------------------- core/src/db/insert/MemManagerImpl.h | 11 +++--- 5 files changed, 43 insertions(+), 86 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 68d0a7511..a1fbfbe04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,13 +8,13 @@ Please mark all changes in change log and use the issue from GitHub - \#2487 Remove timeout when creating collection in dev test - \#2532 Fix Milvus docker image report illegal instruction - \#2551 Fix test_hybrid_db and test_rpc error -- \#2557 fix random crash of INSERT_DUPLICATE_ID case +- \#2557 Fix random crash of INSERT_DUPLICATE_ID case - \#2578 Result count doesn't match target vectors count - \#2582 CreateHybridIndex.cpp compile error -- \#2598 fix Milvus docker image report illegal instruction +- \#2598 Fix Milvus docker image report illegal instruction - \#2617 Fix HNSW and RNSG index files size - \#2637 Suit the range of HNSW parameters -- \#2649 search parameter of annoy has conflict with document +- \#2649 Search parameter of annoy has conflict with document ## Feature - \#2319 Redo metadata to support MVCC diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 8df457bcc..6aff324be 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -1024,6 +1024,10 @@ DBImpl::Flush(const std::string& collection_id) { if (lsn != 0) { swn_wal_.Notify(); flush_req_swn_.Wait(); + } else { + // no collection flushed, call merge task to cleanup files + std::set merge_collection_ids; + 
StartMergeTask(merge_collection_ids); } } else { LOG_ENGINE_DEBUG_ << "MemTable flush"; @@ -1051,6 +1055,10 @@ DBImpl::Flush() { if (lsn != 0) { swn_wal_.Notify(); flush_req_swn_.Wait(); + } else { + // no collection flushed, call merge task to cleanup files + std::set merge_collection_ids; + StartMergeTask(merge_collection_ids); } } else { LOG_ENGINE_DEBUG_ << "MemTable flush"; @@ -3015,7 +3023,7 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { auto collections_flushed = [&](const std::string collection_id, const std::set& target_collection_names) -> uint64_t { uint64_t max_lsn = 0; - if (options_.wal_enable_) { + if (options_.wal_enable_ && !target_collection_names.empty()) { uint64_t lsn = 0; for (auto& collection : target_collection_names) { meta_ptr_->GetCollectionFlushLSN(collection, lsn); @@ -3034,16 +3042,11 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { return max_lsn; }; - auto partition_flushed = [&](const std::string& collection_id, const std::string& partition, - const std::string& target_collection_name) { - if (options_.wal_enable_) { - uint64_t lsn = 0; - meta_ptr_->GetCollectionFlushLSN(target_collection_name, lsn); - wal_mgr_->PartitionFlushed(collection_id, partition, lsn); + auto force_flush_if_mem_full = [&]() -> void { + if (mem_mgr_->GetCurrentMem() > options_.insert_buffer_size_) { + LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Insert buffer size exceeds limit. 
Force flush"; + InternalFlush(); } - - std::set merge_collection_ids = {target_collection_name}; - StartMergeTask(merge_collection_ids); }; Status status; @@ -3057,15 +3060,12 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { return status; } - std::set flushed_collections; - status = mem_mgr_->InsertEntities(target_collection_name, record.length, record.ids, - (record.data_size / record.length / sizeof(float)), - (const float*)record.data, record.attr_nbytes, record.attr_data_size, - record.attr_data, record.lsn, flushed_collections); - if (!flushed_collections.empty()) { - partition_flushed(record.collection_id, record.partition_tag, target_collection_name); - } + status = mem_mgr_->InsertEntities( + target_collection_name, record.length, record.ids, (record.data_size / record.length / sizeof(float)), + (const float*)record.data, record.attr_nbytes, record.attr_data_size, record.attr_data, record.lsn); + force_flush_if_mem_full(); + // metrics milvus::server::CollectInsertMetrics metrics(record.length, status); break; } @@ -3077,14 +3077,10 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { return status; } - std::set flushed_collections; status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids, (record.data_size / record.length / sizeof(uint8_t)), - (const u_int8_t*)record.data, record.lsn, flushed_collections); - // even though !status.ok, run - if (!flushed_collections.empty()) { - partition_flushed(record.collection_id, record.partition_tag, target_collection_name); - } + (const u_int8_t*)record.data, record.lsn); + force_flush_if_mem_full(); // metrics milvus::server::CollectInsertMetrics metrics(record.length, status); @@ -3099,14 +3095,10 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { return status; } - std::set flushed_collections; status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids, (record.data_size / record.length / sizeof(float)), - (const float*)record.data, 
record.lsn, flushed_collections); - // even though !status.ok, run - if (!flushed_collections.empty()) { - partition_flushed(record.collection_id, record.partition_tag, target_collection_name); - } + (const float*)record.data, record.lsn); + force_flush_if_mem_full(); // metrics milvus::server::CollectInsertMetrics metrics(record.length, status); @@ -3220,6 +3212,7 @@ DBImpl::BackgroundWalThread() { next_auto_flush_time = get_next_auto_flush_time(); } + InternalFlush(); while (true) { if (options_.auto_flush_interval_ > 0) { if (std::chrono::system_clock::now() >= next_auto_flush_time) { diff --git a/core/src/db/insert/MemManager.h b/core/src/db/insert/MemManager.h index aeb5354ee..63b0c963f 100644 --- a/core/src/db/insert/MemManager.h +++ b/core/src/db/insert/MemManager.h @@ -27,18 +27,17 @@ class MemManager { public: virtual Status InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim, - const float* vectors, uint64_t lsn, std::set& flushed_tables) = 0; + const float* vectors, uint64_t lsn) = 0; virtual Status InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim, - const uint8_t* vectors, uint64_t lsn, std::set& flushed_tables) = 0; + const uint8_t* vectors, uint64_t lsn) = 0; virtual Status InsertEntities(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim, const float* vectors, const std::unordered_map& attr_nbytes, const std::unordered_map& attr_size, - const std::unordered_map>& attr_data, uint64_t lsn, - std::set& flushed_tables) = 0; + const std::unordered_map>& attr_data, uint64_t lsn) = 0; virtual Status DeleteVector(const std::string& collection_id, IDNumber vector_id, uint64_t lsn) = 0; @@ -47,10 +46,10 @@ class MemManager { DeleteVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) = 0; virtual Status - Flush(const std::string& collection_id, bool apply_delete = true) = 
0; + Flush(const std::string& collection_id) = 0; virtual Status - Flush(std::set& collection_ids, bool apply_delete = true) = 0; + Flush(std::set& collection_ids) = 0; // virtual Status // Serialize(std::set& table_ids) = 0; diff --git a/core/src/db/insert/MemManagerImpl.cpp b/core/src/db/insert/MemManagerImpl.cpp index 20ab6c4d2..437a03130 100644 --- a/core/src/db/insert/MemManagerImpl.cpp +++ b/core/src/db/insert/MemManagerImpl.cpp @@ -34,17 +34,7 @@ MemManagerImpl::GetMemByTable(const std::string& collection_id) { Status MemManagerImpl::InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim, - const float* vectors, uint64_t lsn, std::set& flushed_tables) { - flushed_tables.clear(); - if (GetCurrentMem() > options_.insert_buffer_size_) { - // TODO(zhiru): Don't apply delete here in order to avoid possible concurrency issues with Merge - auto status = Flush(flushed_tables, false); - fiu_do_on("MemManagerImpl::InsertVectors_flush_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); - if (!status.ok()) { - return status; - } - } - + const float* vectors, uint64_t lsn) { VectorsData vectors_data; vectors_data.vector_count_ = length; vectors_data.float_data_.resize(length * dim); @@ -60,19 +50,7 @@ MemManagerImpl::InsertVectors(const std::string& collection_id, int64_t length, Status MemManagerImpl::InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim, - const uint8_t* vectors, uint64_t lsn, std::set& flushed_tables) { - flushed_tables.clear(); - if (GetCurrentMem() > options_.insert_buffer_size_) { - LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) - << "Insert buffer size exceeds limit. 
Performing force flush"; - // TODO(zhiru): Don't apply delete here in order to avoid possible concurrency issues with Merge - auto status = Flush(flushed_tables, false); - if (!status.ok()) { - LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Flush fail: " << status.message(); - return status; - } - } - + const uint8_t* vectors, uint64_t lsn) { VectorsData vectors_data; vectors_data.vector_count_ = length; vectors_data.binary_data_.resize(length * dim); @@ -91,19 +69,7 @@ MemManagerImpl::InsertEntities(const std::string& collection_id, int64_t length, int64_t dim, const float* vectors, const std::unordered_map& attr_nbytes, const std::unordered_map& attr_size, - const std::unordered_map>& attr_data, uint64_t lsn, - std::set& flushed_tables) { - flushed_tables.clear(); - if (GetCurrentMem() > options_.insert_buffer_size_) { - LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) - << "Insert buffer size exceeds limit. Performing force flush"; - auto status = Flush(flushed_tables, false); - if (!status.ok()) { - LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Flush fail: " << status.message(); - return status; - } - } - + const std::unordered_map>& attr_data, uint64_t lsn) { VectorsData vectors_data; vectors_data.vector_count_ = length; vectors_data.float_data_.resize(length * dim); @@ -174,7 +140,7 @@ MemManagerImpl::DeleteVectors(const std::string& collection_id, int64_t length, } Status -MemManagerImpl::Flush(const std::string& collection_id, bool apply_delete) { +MemManagerImpl::Flush(const std::string& collection_id) { ToImmutable(collection_id); // TODO: There is actually only one memTable in the immutable list MemList temp_immutable_list; @@ -187,7 +153,7 @@ MemManagerImpl::Flush(const std::string& collection_id, bool apply_delete) { auto max_lsn = GetMaxLSN(temp_immutable_list); for (auto& mem : temp_immutable_list) { LOG_ENGINE_DEBUG_ << "Flushing collection: " << mem->GetTableId(); - auto status = mem->Serialize(max_lsn, apply_delete); 
+ auto status = mem->Serialize(max_lsn, true); if (!status.ok()) { LOG_ENGINE_ERROR_ << "Flush collection " << mem->GetTableId() << " failed"; return status; @@ -199,7 +165,7 @@ MemManagerImpl::Flush(const std::string& collection_id, bool apply_delete) { } Status -MemManagerImpl::Flush(std::set& collection_ids, bool apply_delete) { +MemManagerImpl::Flush(std::set& collection_ids) { ToImmutable(); MemList temp_immutable_list; @@ -213,7 +179,7 @@ MemManagerImpl::Flush(std::set& collection_ids, bool apply_delete) auto max_lsn = GetMaxLSN(temp_immutable_list); for (auto& mem : temp_immutable_list) { LOG_ENGINE_DEBUG_ << "Flushing collection: " << mem->GetTableId(); - auto status = mem->Serialize(max_lsn, apply_delete); + auto status = mem->Serialize(max_lsn, true); if (!status.ok()) { LOG_ENGINE_ERROR_ << "Flush collection " << mem->GetTableId() << " failed"; return status; diff --git a/core/src/db/insert/MemManagerImpl.h b/core/src/db/insert/MemManagerImpl.h index 061dfe6bc..0eb957f43 100644 --- a/core/src/db/insert/MemManagerImpl.h +++ b/core/src/db/insert/MemManagerImpl.h @@ -43,18 +43,17 @@ class MemManagerImpl : public MemManager, public server::CacheConfigHandler { Status InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim, - const float* vectors, uint64_t lsn, std::set& flushed_tables) override; + const float* vectors, uint64_t lsn) override; Status InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim, - const uint8_t* vectors, uint64_t lsn, std::set& flushed_tables) override; + const uint8_t* vectors, uint64_t lsn) override; Status InsertEntities(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim, const float* vectors, const std::unordered_map& attr_nbytes, const std::unordered_map& attr_size, - const std::unordered_map>& attr_data, uint64_t lsn, - std::set& flushed_tables) override; + const std::unordered_map>& 
attr_data, uint64_t lsn) override; Status DeleteVector(const std::string& collection_id, IDNumber vector_id, uint64_t lsn) override; @@ -63,10 +62,10 @@ class MemManagerImpl : public MemManager, public server::CacheConfigHandler { DeleteVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) override; Status - Flush(const std::string& collection_id, bool apply_delete = true) override; + Flush(const std::string& collection_id) override; Status - Flush(std::set& collection_ids, bool apply_delete = true) override; + Flush(std::set& collection_ids) override; // Status // Serialize(std::set& table_ids) override; -- GitLab