From 8c7ef47f205edfed7d350ca5cd45b4d0660cdd98 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Thu, 5 Mar 2020 12:38:22 +0800 Subject: [PATCH] Fix duplicate vector ID issue (#1508) * #1499 Fix duplicated ID number issue Signed-off-by: jinhai * #1499 Fix duplicated ID number issue - part 1 Signed-off-by: JinHai-CN * #1499 Fix duplicated ID number issue - part 2 Signed-off-by: JinHai-CN * #1499 Fix duplicated ID number issue - part 3 Signed-off-by: jinhai * #1499 Fix duplicated ID number issue - part 4 Signed-off-by: jinhai * Fix format issue Signed-off-by: JinHai-CN * Update CHANGELOG Signed-off-by: JinHai-CN * Add return value check Signed-off-by: JinHai-CN --- CHANGELOG.md | 1 + core/src/db/DBImpl.cpp | 7 ++- core/src/db/IDGenerator.cpp | 73 +++++++++++++++++++++++++++-- core/src/db/IDGenerator.h | 35 ++++++++++++-- core/src/db/insert/VectorSource.cpp | 9 ++-- core/src/db/insert/VectorSource.h | 2 - core/src/db/meta/MySQLMetaImpl.cpp | 9 ++-- core/src/db/meta/SqliteMetaImpl.cpp | 8 ++-- core/unittest/db/test_db.cpp | 3 -- core/unittest/db/test_db_mysql.cpp | 3 -- core/unittest/db/test_misc.cpp | 17 +++++++ 11 files changed, 139 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 469a69c0..1ce07b82 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ Please mark all change in change log and use the issue from GitHub - \#1359 Negative distance value returned when searching with HNSW index type - \#1429 Server crashed when searching vectors using GPU - \#1484 Index type changed to IDMAP after compacted +- \#1499 Fix duplicated ID number issue - \#1491 Server crashed during adding vectors - \#1504 Avoid possible race condition between delete and search diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 1bcd54ac..1e515f07 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -484,8 +484,11 @@ DBImpl::InsertVectors(const std::string& table_id, const std::string& partition_ // insert vectors into target table // (zhiru): generate ids if (vectors.id_array_.empty()) { - auto id_generator = std::make_shared(); - id_generator->GetNextIDNumbers(vectors.vector_count_, vectors.id_array_); + SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance(); + Status status = id_generator.GetNextIDNumbers(vectors.vector_count_, vectors.id_array_); + if (!status.ok()) { + return status; + } } Status status; diff --git a/core/src/db/IDGenerator.cpp b/core/src/db/IDGenerator.cpp index 6ee77378..0f8bd1f6 100644 --- a/core/src/db/IDGenerator.cpp +++ b/core/src/db/IDGenerator.cpp @@ -10,11 +10,13 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "db/IDGenerator.h" +#include "utils/Log.h" #include #include #include #include +#include namespace milvus { namespace engine { @@ -30,15 +32,15 @@ SimpleIDGenerator::GetNextIDNumber() { return micros * MAX_IDS_PER_MICRO; } -void +Status SimpleIDGenerator::NextIDNumbers(size_t n, IDNumbers& ids) { if (n > MAX_IDS_PER_MICRO) { NextIDNumbers(n - MAX_IDS_PER_MICRO, ids); NextIDNumbers(MAX_IDS_PER_MICRO, ids); - return; + return Status::OK(); } if (n <= 0) { - return; + return Status::OK(); } auto now = std::chrono::system_clock::now(); @@ -48,12 +50,75 @@ SimpleIDGenerator::NextIDNumbers(size_t n, IDNumbers& ids) { for (int pos = 0; pos < n; ++pos) { ids.push_back(micros + pos); } + return Status::OK(); } -void +Status SimpleIDGenerator::GetNextIDNumbers(size_t n, IDNumbers& ids) { ids.clear(); NextIDNumbers(n, ids); + + return Status::OK(); +} + +IDNumber +SafeIDGenerator::GetNextIDNumber() { + auto now = std::chrono::system_clock::now(); + auto micros = std::chrono::duration_cast(now.time_since_epoch()).count(); + std::lock_guard lock(mtx_); + if (micros <= time_stamp_ms_) { + time_stamp_ms_ += 1; + } else { + time_stamp_ms_ = micros; + } + return time_stamp_ms_ * MAX_IDS_PER_MICRO; +} + +Status +SafeIDGenerator::GetNextIDNumbers(size_t n, IDNumbers& ids) { + ids.clear(); + std::lock_guard lock(mtx_); + while (n > 0) { + if (n > MAX_IDS_PER_MICRO) { + Status status = NextIDNumbers(MAX_IDS_PER_MICRO, ids); + if (!status.ok()) { + return status; + } + n -= MAX_IDS_PER_MICRO; + } else { + Status status = NextIDNumbers(n, ids); + if (!status.ok()) { + return status; + } + break; + } + } + return Status::OK(); +} + +Status +SafeIDGenerator::NextIDNumbers(size_t n, IDNumbers& ids) { + if (n <= 0 || n > MAX_IDS_PER_MICRO) { + std::string msg = "Invalid ID number: " + std::to_string(n); + ENGINE_LOG_ERROR << msg; + return Status(SERVER_UNEXPECTED_ERROR, msg); + } + + auto now = std::chrono::system_clock::now(); + int64_t micros = std::chrono::duration_cast(now.time_since_epoch()).count(); + if (micros <= time_stamp_ms_) { + time_stamp_ms_ += 1; + } else { + time_stamp_ms_ = micros; + } + + int64_t ID_high_part = time_stamp_ms_ * MAX_IDS_PER_MICRO; + + for (int pos = 0; pos < n; ++pos) { + ids.push_back(ID_high_part + pos); + } + + return Status::OK(); } } // namespace engine diff --git a/core/src/db/IDGenerator.h b/core/src/db/IDGenerator.h index a9f6e6a0..fd59fc33 100644 --- a/core/src/db/IDGenerator.h +++ b/core/src/db/IDGenerator.h @@ -12,6 +12,7 @@ #pragma once #include "Types.h" +#include "utils/Status.h" #include #include @@ -24,7 +25,7 @@ class IDGenerator { virtual IDNumber GetNextIDNumber() = 0; - virtual void + virtual Status GetNextIDNumbers(size_t n, IDNumbers& ids) = 0; virtual ~IDGenerator() = 0; @@ -37,15 +38,43 @@ class SimpleIDGenerator : public IDGenerator { IDNumber GetNextIDNumber() override; - void + Status GetNextIDNumbers(size_t n, IDNumbers& ids) override; private: - void + Status NextIDNumbers(size_t n, IDNumbers& ids); static constexpr size_t MAX_IDS_PER_MICRO = 1000; }; // SimpleIDGenerator +class SafeIDGenerator : public IDGenerator { + public: + static SafeIDGenerator& + GetInstance() { + static SafeIDGenerator instance; + return instance; + } + + ~SafeIDGenerator() override = default; + + IDNumber + GetNextIDNumber() override; + + Status + GetNextIDNumbers(size_t n, IDNumbers& ids) override; + + private: + SafeIDGenerator() = default; + + Status + NextIDNumbers(size_t n, IDNumbers& ids); + + static constexpr size_t MAX_IDS_PER_MICRO = 1000; + + std::mutex mtx_; + int64_t time_stamp_ms_ = 0; +}; + } // namespace engine } // namespace milvus diff --git a/core/src/db/insert/VectorSource.cpp b/core/src/db/insert/VectorSource.cpp index ce297bef..099eeed4 100644 --- a/core/src/db/insert/VectorSource.cpp +++ b/core/src/db/insert/VectorSource.cpp @@ -22,8 +22,7 @@ namespace milvus { namespace engine { -VectorSource::VectorSource(VectorsData vectors) - : vectors_(std::move(vectors)), id_generator_(std::make_shared()) { +VectorSource::VectorSource(VectorsData vectors) : vectors_(std::move(vectors)) { current_num_vectors_added = 0; } @@ -38,7 +37,11 @@ VectorSource::Add(/*const ExecutionEnginePtr& execution_engine,*/ const segment: current_num_vectors_added + num_vectors_to_add <= n ? num_vectors_to_add : n - current_num_vectors_added; IDNumbers vector_ids_to_add; if (vectors_.id_array_.empty()) { - id_generator_->GetNextIDNumbers(num_vectors_added, vector_ids_to_add); + SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance(); + Status status = id_generator.GetNextIDNumbers(num_vectors_added, vector_ids_to_add); + if (!status.ok()) { + return status; + } } else { vector_ids_to_add.resize(num_vectors_added); for (size_t pos = current_num_vectors_added; pos < current_num_vectors_added + num_vectors_added; pos++) { diff --git a/core/src/db/insert/VectorSource.h b/core/src/db/insert/VectorSource.h index fdc1bd7e..b92a87e3 100644 --- a/core/src/db/insert/VectorSource.h +++ b/core/src/db/insert/VectorSource.h @@ -49,8 +49,6 @@ class VectorSource { IDNumbers vector_ids_; size_t current_num_vectors_added; - - std::shared_ptr id_generator_; }; // VectorSource using VectorSourcePtr = std::shared_ptr; diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp index 22aa173b..a80371c1 100644 --- a/core/src/db/meta/MySQLMetaImpl.cpp +++ b/core/src/db/meta/MySQLMetaImpl.cpp @@ -182,8 +182,8 @@ Status MySQLMetaImpl::NextTableId(std::string& table_id) { std::lock_guard lock(genid_mutex_); // avoid duplicated id std::stringstream ss; - SimpleIDGenerator g; - ss << g.GetNextIDNumber(); + SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance(); + ss << id_generator.GetNextIDNumber(); table_id = ss.str(); return Status::OK(); } @@ -192,8 +192,9 @@ Status MySQLMetaImpl::NextFileId(std::string& file_id) { std::lock_guard lock(genid_mutex_); // avoid duplicated id std::stringstream ss; - SimpleIDGenerator g; - ss << g.GetNextIDNumber(); + + SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance(); + ss << id_generator.GetNextIDNumber(); file_id = ss.str(); return Status::OK(); } diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp index 72b231b5..ba029a53 100644 --- a/core/src/db/meta/SqliteMetaImpl.cpp +++ b/core/src/db/meta/SqliteMetaImpl.cpp @@ -100,8 +100,8 @@ Status SqliteMetaImpl::NextTableId(std::string& table_id) { std::lock_guard lock(genid_mutex_); // avoid duplicated id std::stringstream ss; - SimpleIDGenerator g; - ss << g.GetNextIDNumber(); + SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance(); + ss << id_generator.GetNextIDNumber(); table_id = ss.str(); return Status::OK(); } @@ -110,8 +110,8 @@ Status SqliteMetaImpl::NextFileId(std::string& file_id) { std::lock_guard lock(genid_mutex_); // avoid duplicated id std::stringstream ss; - SimpleIDGenerator g; - ss << g.GetNextIDNumber(); + SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance(); + ss << id_generator.GetNextIDNumber(); file_id = ss.str(); return Status::OK(); } diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp index d68c7127..003ad332 100644 --- a/core/unittest/db/test_db.cpp +++ b/core/unittest/db/test_db.cpp @@ -57,9 +57,6 @@ BuildVectors(uint64_t n, uint64_t batch_index, milvus::engine::VectorsData& vect vectors.id_array_.push_back(n * batch_index + i); } - - // milvus::engine::SimpleIDGenerator id_gen; - // id_gen.GetNextIDNumbers(n, vectors.id_array_); } std::string diff --git a/core/unittest/db/test_db_mysql.cpp b/core/unittest/db/test_db_mysql.cpp index 27fc8a9c..9ec1304c 100644 --- a/core/unittest/db/test_db_mysql.cpp +++ b/core/unittest/db/test_db_mysql.cpp @@ -50,9 +50,6 @@ BuildVectors(uint64_t n, uint64_t batch_index, milvus::engine::VectorsData& vect vectors.id_array_.push_back(n * batch_index + i); } - - // milvus::engine::SimpleIDGenerator id_gen; - // id_gen.GetNextIDNumbers(n, vectors.id_array_); } } // namespace diff --git a/core/unittest/db/test_misc.cpp b/core/unittest/db/test_misc.cpp index 0fc8d89b..870bd35a 100644 --- a/core/unittest/db/test_misc.cpp +++ b/core/unittest/db/test_misc.cpp @@ -15,6 +15,7 @@ #include #include +#include "db/IDGenerator.h" #include "db/IndexFailedChecker.h" #include "db/OngoingFileChecker.h" #include "db/Options.h" @@ -161,6 +162,22 @@ TEST(DBMiscTest, UTILS_TEST) { status = milvus::engine::utils::DeleteSegment(options, file); } +TEST(DBMiscTest, SAFE_ID_GENERATOR_TEST) { + milvus::engine::SafeIDGenerator& generator = milvus::engine::SafeIDGenerator::GetInstance(); + size_t n = 1000000; + milvus::engine::IDNumbers ids; + + milvus::Status status = generator.GetNextIDNumbers(n, ids); + ASSERT_TRUE(status.ok()); + + std::set unique_ids; + for (size_t i = 0; i < ids.size(); i++) { + unique_ids.insert(ids[i]); + } + + ASSERT_EQ(ids.size(), unique_ids.size()); +} + TEST(DBMiscTest, CHECKER_TEST) { { milvus::engine::IndexFailedChecker checker; -- GitLab