未验证 提交 8c7ef47f 编写于 作者: J Jin Hai 提交者: GitHub

Fix duplicate vector ID issue (#1508)

* #1499 Fix duplicated ID number issue
Signed-off-by: Njinhai <hai.jin@zilliz.com>

* #1499 Fix duplicated ID number issue - part 1
Signed-off-by: NJinHai-CN <hai.jin@zilliz.com>

* #1499 Fix duplicated ID number issue - part 2
Signed-off-by: NJinHai-CN <hai.jin@zilliz.com>

* #1499 Fix duplicated ID number issue - part 3
Signed-off-by: Njinhai <hai.jin@zilliz.com>

* #1499 Fix duplicated ID number issue - part 4
Signed-off-by: Njinhai <hai.jin@zilliz.com>

* Fix format issue
Signed-off-by: NJinHai-CN <hai.jin@zilliz.com>

* Update CHANGELOG
Signed-off-by: NJinHai-CN <hai.jin@zilliz.com>

* Add return value check
Signed-off-by: NJinHai-CN <hai.jin@zilliz.com>
上级 9899cfef
...@@ -29,6 +29,7 @@ Please mark all change in change log and use the issue from GitHub ...@@ -29,6 +29,7 @@ Please mark all change in change log and use the issue from GitHub
- \#1359 Negative distance value returned when searching with HNSW index type - \#1359 Negative distance value returned when searching with HNSW index type
- \#1429 Server crashed when searching vectors using GPU - \#1429 Server crashed when searching vectors using GPU
- \#1484 Index type changed to IDMAP after compacted - \#1484 Index type changed to IDMAP after compacted
- \#1499 Fix duplicated ID number issue
- \#1491 Server crashed during adding vectors - \#1491 Server crashed during adding vectors
- \#1504 Avoid possible race condition between delete and search - \#1504 Avoid possible race condition between delete and search
......
...@@ -484,8 +484,11 @@ DBImpl::InsertVectors(const std::string& table_id, const std::string& partition_ ...@@ -484,8 +484,11 @@ DBImpl::InsertVectors(const std::string& table_id, const std::string& partition_
// insert vectors into target table // insert vectors into target table
// (zhiru): generate ids // (zhiru): generate ids
if (vectors.id_array_.empty()) { if (vectors.id_array_.empty()) {
auto id_generator = std::make_shared<SimpleIDGenerator>(); SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
id_generator->GetNextIDNumbers(vectors.vector_count_, vectors.id_array_); Status status = id_generator.GetNextIDNumbers(vectors.vector_count_, vectors.id_array_);
if (!status.ok()) {
return status;
}
} }
Status status; Status status;
......
...@@ -10,11 +10,13 @@ ...@@ -10,11 +10,13 @@
// or implied. See the License for the specific language governing permissions and limitations under the License. // or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/IDGenerator.h" #include "db/IDGenerator.h"
#include "utils/Log.h"
#include <assert.h> #include <assert.h>
#include <fiu-local.h> #include <fiu-local.h>
#include <chrono> #include <chrono>
#include <iostream> #include <iostream>
#include <string>
namespace milvus { namespace milvus {
namespace engine { namespace engine {
...@@ -30,15 +32,15 @@ SimpleIDGenerator::GetNextIDNumber() { ...@@ -30,15 +32,15 @@ SimpleIDGenerator::GetNextIDNumber() {
return micros * MAX_IDS_PER_MICRO; return micros * MAX_IDS_PER_MICRO;
} }
void Status
SimpleIDGenerator::NextIDNumbers(size_t n, IDNumbers& ids) { SimpleIDGenerator::NextIDNumbers(size_t n, IDNumbers& ids) {
if (n > MAX_IDS_PER_MICRO) { if (n > MAX_IDS_PER_MICRO) {
NextIDNumbers(n - MAX_IDS_PER_MICRO, ids); NextIDNumbers(n - MAX_IDS_PER_MICRO, ids);
NextIDNumbers(MAX_IDS_PER_MICRO, ids); NextIDNumbers(MAX_IDS_PER_MICRO, ids);
return; return Status::OK();
} }
if (n <= 0) { if (n <= 0) {
return; return Status::OK();
} }
auto now = std::chrono::system_clock::now(); auto now = std::chrono::system_clock::now();
...@@ -48,12 +50,75 @@ SimpleIDGenerator::NextIDNumbers(size_t n, IDNumbers& ids) { ...@@ -48,12 +50,75 @@ SimpleIDGenerator::NextIDNumbers(size_t n, IDNumbers& ids) {
for (int pos = 0; pos < n; ++pos) { for (int pos = 0; pos < n; ++pos) {
ids.push_back(micros + pos); ids.push_back(micros + pos);
} }
return Status::OK();
} }
void Status
SimpleIDGenerator::GetNextIDNumbers(size_t n, IDNumbers& ids) { SimpleIDGenerator::GetNextIDNumbers(size_t n, IDNumbers& ids) {
ids.clear(); ids.clear();
NextIDNumbers(n, ids); NextIDNumbers(n, ids);
return Status::OK();
}
IDNumber
SafeIDGenerator::GetNextIDNumber() {
auto now = std::chrono::system_clock::now();
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
std::lock_guard<std::mutex> lock(mtx_);
if (micros <= time_stamp_ms_) {
time_stamp_ms_ += 1;
} else {
time_stamp_ms_ = micros;
}
return time_stamp_ms_ * MAX_IDS_PER_MICRO;
}
Status
SafeIDGenerator::GetNextIDNumbers(size_t n, IDNumbers& ids) {
ids.clear();
std::lock_guard<std::mutex> lock(mtx_);
while (n > 0) {
if (n > MAX_IDS_PER_MICRO) {
Status status = NextIDNumbers(MAX_IDS_PER_MICRO, ids);
if (!status.ok()) {
return status;
}
n -= MAX_IDS_PER_MICRO;
} else {
Status status = NextIDNumbers(n, ids);
if (!status.ok()) {
return status;
}
break;
}
}
return Status::OK();
}
Status
SafeIDGenerator::NextIDNumbers(size_t n, IDNumbers& ids) {
if (n <= 0 || n > MAX_IDS_PER_MICRO) {
std::string msg = "Invalid ID number: " + std::to_string(n);
ENGINE_LOG_ERROR << msg;
return Status(SERVER_UNEXPECTED_ERROR, msg);
}
auto now = std::chrono::system_clock::now();
int64_t micros = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
if (micros <= time_stamp_ms_) {
time_stamp_ms_ += 1;
} else {
time_stamp_ms_ = micros;
}
int64_t ID_high_part = time_stamp_ms_ * MAX_IDS_PER_MICRO;
for (int pos = 0; pos < n; ++pos) {
ids.push_back(ID_high_part + pos);
}
return Status::OK();
} }
} // namespace engine } // namespace engine
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
#pragma once #pragma once
#include "Types.h" #include "Types.h"
#include "utils/Status.h"
#include <cstddef> #include <cstddef>
#include <vector> #include <vector>
...@@ -24,7 +25,7 @@ class IDGenerator { ...@@ -24,7 +25,7 @@ class IDGenerator {
virtual IDNumber virtual IDNumber
GetNextIDNumber() = 0; GetNextIDNumber() = 0;
virtual void virtual Status
GetNextIDNumbers(size_t n, IDNumbers& ids) = 0; GetNextIDNumbers(size_t n, IDNumbers& ids) = 0;
virtual ~IDGenerator() = 0; virtual ~IDGenerator() = 0;
...@@ -37,15 +38,43 @@ class SimpleIDGenerator : public IDGenerator { ...@@ -37,15 +38,43 @@ class SimpleIDGenerator : public IDGenerator {
IDNumber IDNumber
GetNextIDNumber() override; GetNextIDNumber() override;
void Status
GetNextIDNumbers(size_t n, IDNumbers& ids) override; GetNextIDNumbers(size_t n, IDNumbers& ids) override;
private: private:
void Status
NextIDNumbers(size_t n, IDNumbers& ids); NextIDNumbers(size_t n, IDNumbers& ids);
static constexpr size_t MAX_IDS_PER_MICRO = 1000; static constexpr size_t MAX_IDS_PER_MICRO = 1000;
}; // SimpleIDGenerator }; // SimpleIDGenerator
class SafeIDGenerator : public IDGenerator {
public:
static SafeIDGenerator&
GetInstance() {
static SafeIDGenerator instance;
return instance;
}
~SafeIDGenerator() override = default;
IDNumber
GetNextIDNumber() override;
Status
GetNextIDNumbers(size_t n, IDNumbers& ids) override;
private:
SafeIDGenerator() = default;
Status
NextIDNumbers(size_t n, IDNumbers& ids);
static constexpr size_t MAX_IDS_PER_MICRO = 1000;
std::mutex mtx_;
int64_t time_stamp_ms_ = 0;
};
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
...@@ -22,8 +22,7 @@ ...@@ -22,8 +22,7 @@
namespace milvus { namespace milvus {
namespace engine { namespace engine {
VectorSource::VectorSource(VectorsData vectors) VectorSource::VectorSource(VectorsData vectors) : vectors_(std::move(vectors)) {
: vectors_(std::move(vectors)), id_generator_(std::make_shared<SimpleIDGenerator>()) {
current_num_vectors_added = 0; current_num_vectors_added = 0;
} }
...@@ -38,7 +37,11 @@ VectorSource::Add(/*const ExecutionEnginePtr& execution_engine,*/ const segment: ...@@ -38,7 +37,11 @@ VectorSource::Add(/*const ExecutionEnginePtr& execution_engine,*/ const segment:
current_num_vectors_added + num_vectors_to_add <= n ? num_vectors_to_add : n - current_num_vectors_added; current_num_vectors_added + num_vectors_to_add <= n ? num_vectors_to_add : n - current_num_vectors_added;
IDNumbers vector_ids_to_add; IDNumbers vector_ids_to_add;
if (vectors_.id_array_.empty()) { if (vectors_.id_array_.empty()) {
id_generator_->GetNextIDNumbers(num_vectors_added, vector_ids_to_add); SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
Status status = id_generator.GetNextIDNumbers(num_vectors_added, vector_ids_to_add);
if (!status.ok()) {
return status;
}
} else { } else {
vector_ids_to_add.resize(num_vectors_added); vector_ids_to_add.resize(num_vectors_added);
for (size_t pos = current_num_vectors_added; pos < current_num_vectors_added + num_vectors_added; pos++) { for (size_t pos = current_num_vectors_added; pos < current_num_vectors_added + num_vectors_added; pos++) {
......
...@@ -49,8 +49,6 @@ class VectorSource { ...@@ -49,8 +49,6 @@ class VectorSource {
IDNumbers vector_ids_; IDNumbers vector_ids_;
size_t current_num_vectors_added; size_t current_num_vectors_added;
std::shared_ptr<IDGenerator> id_generator_;
}; // VectorSource }; // VectorSource
using VectorSourcePtr = std::shared_ptr<VectorSource>; using VectorSourcePtr = std::shared_ptr<VectorSource>;
......
...@@ -182,8 +182,8 @@ Status ...@@ -182,8 +182,8 @@ Status
MySQLMetaImpl::NextTableId(std::string& table_id) { MySQLMetaImpl::NextTableId(std::string& table_id) {
std::lock_guard<std::mutex> lock(genid_mutex_); // avoid duplicated id std::lock_guard<std::mutex> lock(genid_mutex_); // avoid duplicated id
std::stringstream ss; std::stringstream ss;
SimpleIDGenerator g; SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
ss << g.GetNextIDNumber(); ss << id_generator.GetNextIDNumber();
table_id = ss.str(); table_id = ss.str();
return Status::OK(); return Status::OK();
} }
...@@ -192,8 +192,9 @@ Status ...@@ -192,8 +192,9 @@ Status
MySQLMetaImpl::NextFileId(std::string& file_id) { MySQLMetaImpl::NextFileId(std::string& file_id) {
std::lock_guard<std::mutex> lock(genid_mutex_); // avoid duplicated id std::lock_guard<std::mutex> lock(genid_mutex_); // avoid duplicated id
std::stringstream ss; std::stringstream ss;
SimpleIDGenerator g;
ss << g.GetNextIDNumber(); SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
ss << id_generator.GetNextIDNumber();
file_id = ss.str(); file_id = ss.str();
return Status::OK(); return Status::OK();
} }
......
...@@ -100,8 +100,8 @@ Status ...@@ -100,8 +100,8 @@ Status
SqliteMetaImpl::NextTableId(std::string& table_id) { SqliteMetaImpl::NextTableId(std::string& table_id) {
std::lock_guard<std::mutex> lock(genid_mutex_); // avoid duplicated id std::lock_guard<std::mutex> lock(genid_mutex_); // avoid duplicated id
std::stringstream ss; std::stringstream ss;
SimpleIDGenerator g; SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
ss << g.GetNextIDNumber(); ss << id_generator.GetNextIDNumber();
table_id = ss.str(); table_id = ss.str();
return Status::OK(); return Status::OK();
} }
...@@ -110,8 +110,8 @@ Status ...@@ -110,8 +110,8 @@ Status
SqliteMetaImpl::NextFileId(std::string& file_id) { SqliteMetaImpl::NextFileId(std::string& file_id) {
std::lock_guard<std::mutex> lock(genid_mutex_); // avoid duplicated id std::lock_guard<std::mutex> lock(genid_mutex_); // avoid duplicated id
std::stringstream ss; std::stringstream ss;
SimpleIDGenerator g; SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
ss << g.GetNextIDNumber(); ss << id_generator.GetNextIDNumber();
file_id = ss.str(); file_id = ss.str();
return Status::OK(); return Status::OK();
} }
......
...@@ -57,9 +57,6 @@ BuildVectors(uint64_t n, uint64_t batch_index, milvus::engine::VectorsData& vect ...@@ -57,9 +57,6 @@ BuildVectors(uint64_t n, uint64_t batch_index, milvus::engine::VectorsData& vect
vectors.id_array_.push_back(n * batch_index + i); vectors.id_array_.push_back(n * batch_index + i);
} }
// milvus::engine::SimpleIDGenerator id_gen;
// id_gen.GetNextIDNumbers(n, vectors.id_array_);
} }
std::string std::string
......
...@@ -50,9 +50,6 @@ BuildVectors(uint64_t n, uint64_t batch_index, milvus::engine::VectorsData& vect ...@@ -50,9 +50,6 @@ BuildVectors(uint64_t n, uint64_t batch_index, milvus::engine::VectorsData& vect
vectors.id_array_.push_back(n * batch_index + i); vectors.id_array_.push_back(n * batch_index + i);
} }
// milvus::engine::SimpleIDGenerator id_gen;
// id_gen.GetNextIDNumbers(n, vectors.id_array_);
} }
} // namespace } // namespace
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include <thread> #include <thread>
#include <vector> #include <vector>
#include "db/IDGenerator.h"
#include "db/IndexFailedChecker.h" #include "db/IndexFailedChecker.h"
#include "db/OngoingFileChecker.h" #include "db/OngoingFileChecker.h"
#include "db/Options.h" #include "db/Options.h"
...@@ -161,6 +162,22 @@ TEST(DBMiscTest, UTILS_TEST) { ...@@ -161,6 +162,22 @@ TEST(DBMiscTest, UTILS_TEST) {
status = milvus::engine::utils::DeleteSegment(options, file); status = milvus::engine::utils::DeleteSegment(options, file);
} }
TEST(DBMiscTest, SAFE_ID_GENERATOR_TEST) {
milvus::engine::SafeIDGenerator& generator = milvus::engine::SafeIDGenerator::GetInstance();
size_t n = 1000000;
milvus::engine::IDNumbers ids;
milvus::Status status = generator.GetNextIDNumbers(n, ids);
ASSERT_TRUE(status.ok());
std::set<int64_t> unique_ids;
for (size_t i = 0; i < ids.size(); i++) {
unique_ids.insert(ids[i]);
}
ASSERT_EQ(ids.size(), unique_ids.size());
}
TEST(DBMiscTest, CHECKER_TEST) { TEST(DBMiscTest, CHECKER_TEST) {
{ {
milvus::engine::IndexFailedChecker checker; milvus::engine::IndexFailedChecker checker;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册