From a724141e73ae05ff3375badfc9685da43c3de4b3 Mon Sep 17 00:00:00 2001 From: groot Date: Tue, 30 Apr 2019 17:37:50 +0800 Subject: [PATCH] use multithreads to do id mapping Former-commit-id: 10b201d3801b146fa6335e4dfd20c877cfb2e948 --- cpp/src/server/VecServiceTask.cpp | 54 +++++++++++++++++++++++++------ cpp/src/server/VecServiceTask.h | 2 ++ cpp/src/utils/ThreadPool.h | 4 +-- 3 files changed, 48 insertions(+), 12 deletions(-) diff --git a/cpp/src/server/VecServiceTask.cpp b/cpp/src/server/VecServiceTask.cpp index 8a778f02..888bc994 100644 --- a/cpp/src/server/VecServiceTask.cpp +++ b/cpp/src/server/VecServiceTask.cpp @@ -9,10 +9,12 @@ #include "utils/CommonUtil.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" +#include "utils/ThreadPool.h" #include "db/DB.h" #include "db/Env.h" #include "db/Meta.h" + namespace zilliz { namespace vecwise { namespace server { @@ -21,6 +23,7 @@ static const std::string DQL_TASK_GROUP = "dql"; static const std::string DDL_DML_TASK_GROUP = "ddl_dml"; static const std::string VECTOR_UID = "uid"; +static const uint64_t USE_MT = 5000; using DB_META = zilliz::vecwise::engine::meta::Meta; using DB_DATE = zilliz::vecwise::engine::meta::DateT; @@ -61,6 +64,11 @@ namespace { CommonUtil::ConvertTime(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, t_t); return DB_META::GetDate(t_t); } + + ThreadPool& GetThreadPool() { + static ThreadPool pool(6); + return pool; + } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -371,6 +379,19 @@ const AttribMap& AddBatchVectorTask::GetVecAttrib(uint64_t index) const { } } +void AddBatchVectorTask::ProcessIdMapping(engine::IDNumbers& vector_ids, uint64_t from, uint64_t to) { + std::string nid_prefix = group_id_ + "_"; + for(size_t i = from; i < to; i++) { + std::string uid = GetVecID(i); + std::string nid = nid_prefix + std::to_string(vector_ids[i]); + AttribMap attrib = GetVecAttrib(i); + attrib[VECTOR_UID] = uid; + std::string attrib_str; + AttributeSerializer::Encode(attrib, attrib_str); + IVecIdMapper::GetInstance()->Put(nid, attrib_str); + } +} + ServerError AddBatchVectorTask::OnExecute() { try { TimeRecorder rc("AddBatchVectorTask"); @@ -415,17 +436,30 @@ ServerError AddBatchVectorTask::OnExecute() { SERVER_LOG_ERROR << "Vector ID not returned"; return SERVER_UNEXPECTED_ERROR; } else { - std::string nid_prefix = group_id_ + "_"; - for(size_t i = 0; i < vec_count; i++) { - std::string uid = GetVecID(i); - std::string nid = nid_prefix + std::to_string(vector_ids[i]); - AttribMap attrib = GetVecAttrib(i); - attrib[VECTOR_UID] = uid; - std::string attrib_str; - AttributeSerializer::Encode(attrib, attrib_str); - IVecIdMapper::GetInstance()->Put(nid, attrib_str); + if(vec_count < USE_MT) { + ProcessIdMapping(vector_ids, 0, vec_count); + rc.Record("built id mapping"); + } else { + std::list> threads_list; + + uint64_t begin_index = 0, end_index = USE_MT; + while(end_index < vec_count) { + threads_list.push_back( + GetThreadPool().enqueue(&AddBatchVectorTask::ProcessIdMapping, + this, vector_ids, begin_index, end_index)); + begin_index = end_index; + end_index += USE_MT; + if(end_index > vec_count) { + end_index = vec_count; + } + } + + for (std::list>::iterator it = threads_list.begin(); it != threads_list.end(); it++) { + it->wait(); + } + + rc.Record("built id mapping by multi-threads:" + std::to_string(threads_list.size())); } - rc.Record("build id mapping"); } } diff --git a/cpp/src/server/VecServiceTask.h b/cpp/src/server/VecServiceTask.h index 14afa3d0..f3da85ee 100644 --- a/cpp/src/server/VecServiceTask.h +++ b/cpp/src/server/VecServiceTask.h @@ -119,6 +119,8 @@ protected: std::string GetVecID(uint64_t index) const; const AttribMap& GetVecAttrib(uint64_t index) const; + void ProcessIdMapping(engine::IDNumbers& vector_ids, uint64_t from, uint64_t to); + ServerError OnExecute() override; private: diff --git a/cpp/src/utils/ThreadPool.h b/cpp/src/utils/ThreadPool.h index 04439ba7..13d877bb 100644 --- a/cpp/src/utils/ThreadPool.h +++ b/cpp/src/utils/ThreadPool.h @@ -19,8 +19,8 @@ #define MAX_THREADS_NUM 32 namespace zilliz { -namespace sql { -namespace storage { +namespace vecwise { +namespace server { class ThreadPool { public: -- GitLab