提交 a724141e 编写于 作者: G groot

use multithreads to do id mapping


Former-commit-id: 10b201d3801b146fa6335e4dfd20c877cfb2e948
上级 b67c731b
......@@ -9,10 +9,12 @@
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ThreadPool.h"
#include "db/DB.h"
#include "db/Env.h"
#include "db/Meta.h"
namespace zilliz {
namespace vecwise {
namespace server {
......@@ -21,6 +23,7 @@ static const std::string DQL_TASK_GROUP = "dql";
static const std::string DDL_DML_TASK_GROUP = "ddl_dml";
static const std::string VECTOR_UID = "uid";
static const uint64_t USE_MT = 5000;
using DB_META = zilliz::vecwise::engine::meta::Meta;
using DB_DATE = zilliz::vecwise::engine::meta::DateT;
......@@ -61,6 +64,11 @@ namespace {
CommonUtil::ConvertTime(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, t_t);
return DB_META::GetDate(t_t);
}
ThreadPool& GetThreadPool() {
static ThreadPool pool(6);
return pool;
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
......@@ -371,6 +379,19 @@ const AttribMap& AddBatchVectorTask::GetVecAttrib(uint64_t index) const {
}
}
void AddBatchVectorTask::ProcessIdMapping(engine::IDNumbers& vector_ids, uint64_t from, uint64_t to) {
std::string nid_prefix = group_id_ + "_";
for(size_t i = from; i < to; i++) {
std::string uid = GetVecID(i);
std::string nid = nid_prefix + std::to_string(vector_ids[i]);
AttribMap attrib = GetVecAttrib(i);
attrib[VECTOR_UID] = uid;
std::string attrib_str;
AttributeSerializer::Encode(attrib, attrib_str);
IVecIdMapper::GetInstance()->Put(nid, attrib_str);
}
}
ServerError AddBatchVectorTask::OnExecute() {
try {
TimeRecorder rc("AddBatchVectorTask");
......@@ -415,17 +436,30 @@ ServerError AddBatchVectorTask::OnExecute() {
SERVER_LOG_ERROR << "Vector ID not returned";
return SERVER_UNEXPECTED_ERROR;
} else {
std::string nid_prefix = group_id_ + "_";
for(size_t i = 0; i < vec_count; i++) {
std::string uid = GetVecID(i);
std::string nid = nid_prefix + std::to_string(vector_ids[i]);
AttribMap attrib = GetVecAttrib(i);
attrib[VECTOR_UID] = uid;
std::string attrib_str;
AttributeSerializer::Encode(attrib, attrib_str);
IVecIdMapper::GetInstance()->Put(nid, attrib_str);
if(vec_count < USE_MT) {
ProcessIdMapping(vector_ids, 0, vec_count);
rc.Record("built id mapping");
} else {
std::list<std::future<void>> threads_list;
uint64_t begin_index = 0, end_index = USE_MT;
while(end_index < vec_count) {
threads_list.push_back(
GetThreadPool().enqueue(&AddBatchVectorTask::ProcessIdMapping,
this, vector_ids, begin_index, end_index));
begin_index = end_index;
end_index += USE_MT;
if(end_index > vec_count) {
end_index = vec_count;
}
}
for (std::list<std::future<void>>::iterator it = threads_list.begin(); it != threads_list.end(); it++) {
it->wait();
}
rc.Record("built id mapping by multi-threads:" + std::to_string(threads_list.size()));
}
rc.Record("build id mapping");
}
}
......
......@@ -119,6 +119,8 @@ protected:
std::string GetVecID(uint64_t index) const;
const AttribMap& GetVecAttrib(uint64_t index) const;
void ProcessIdMapping(engine::IDNumbers& vector_ids, uint64_t from, uint64_t to);
ServerError OnExecute() override;
private:
......
......@@ -19,8 +19,8 @@
#define MAX_THREADS_NUM 32
namespace zilliz {
namespace sql {
namespace storage {
namespace vecwise {
namespace server {
class ThreadPool {
public:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册