Commit 1eca6c99 authored by TeslaZhao

add cube cache

Parent 50730465
@@ -61,11 +61,14 @@ message ResourceConf {
   repeated string model_toolkit_file = 2;
   repeated string general_model_path = 3;
   repeated string general_model_file = 4;
-  optional string cube_config_path = 5;
-  optional string cube_config_file = 6;
-  optional int32 cube_quant_bits = 7;  // set 0 if no quant.
-  optional string auth_product_name = 8;
-  optional string auth_container_id = 9;
+  optional string cube_config_path = 10;
+  optional string cube_config_file = 11;
+  optional int32 cube_quant_bits = 12;
+  optional string cube_cache_path = 13;
+  optional string auth_product_name = 20;
+  optional string auth_container_id = 21;
 };
 // DAG node depency info
...
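For reference, the renumbered fields above might be set in a resource.prototxt roughly as in the sketch below. This is a hedged example: all values are hypothetical, only fields declared in this message are used, and this diff does not show where the new cube_cache_path field is actually consumed.

  model_toolkit_file: "general_infer_0/model_toolkit.prototxt"
  general_model_path: "./conf"
  general_model_file: "general_infer_0/general_model.prototxt"
  cube_config_path: "./conf"
  cube_config_file: "cube.conf"
  cube_quant_bits: 0
  cube_cache_path: "./cube_cache"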
@@ -20,6 +20,7 @@
 #include <unordered_map>
 #include <utility>
 #include "core/cube/cube-api/include/cube_api.h"
+#include "core/predictor/framework/cache.h"
 #include "core/predictor/framework/infer.h"
 #include "core/predictor/framework/memory.h"
 #include "core/predictor/framework/resource.h"
@@ -36,10 +37,11 @@ using baidu::paddle_serving::predictor::general_model::Response;
 using baidu::paddle_serving::predictor::general_model::Request;
 using baidu::paddle_serving::predictor::InferManager;
 using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
+using baidu::paddle_serving::predictor::CubeCache;
 // DistKV Infer Op: seek cube and then call paddle inference
 // op seq: general_reader-> dist_kv_infer -> general_response
 int GeneralDistKVInferOp::inference() {
   VLOG(2) << "Going to run inference";
   const std::vector<std::string> pre_node_names = pre_names();
   if (pre_node_names.size() != 1) {
@@ -60,8 +62,8 @@ int GeneralDistKVInferOp::inference() {
   GeneralBlob *output_blob = mutable_data<GeneralBlob>();
   if (!output_blob) {
     LOG(ERROR) << "(logid=" << log_id << ") output_blob is nullptr,error";
     return -1;
   }
   output_blob->SetLogId(log_id);
@@ -76,18 +78,24 @@ int GeneralDistKVInferOp::inference() {
   TensorVector *out = &output_blob->tensor_vector;
   std::vector<uint64_t> keys;
   std::vector<uint64_t> unique_keys;
   std::unordered_map<uint64_t, rec::mcube::CubeValue *> key_map;
   std::vector<rec::mcube::CubeValue> values;
-  int sparse_count = 0;  // sparse inputs counts, sparse would seek cube
-  int dense_count = 0;   // dense inputs counts, dense would directly call paddle infer
+  // sparse inputs counts, sparse would seek cube
+  int sparse_count = 0;
+  // dense inputs counts, dense would directly call paddle infer
+  int dense_count = 0;
   std::vector<std::pair<int64_t *, size_t>> dataptr_size_pairs;
   size_t key_len = 0;
   for (size_t i = 0; i < in->size(); ++i) {
     if (in->at(i).dtype != paddle::PaddleDType::INT64) {
+      // dense input type is not int64
       ++dense_count;
       continue;
     }
+    // sparse input type is int64
    ++sparse_count;
     size_t elem_num = 1;
     for (size_t s = 0; s < in->at(i).shape.size(); ++s) {
       elem_num *= in->at(i).shape[s];
@@ -107,33 +115,70 @@ int GeneralDistKVInferOp::inference() {
     key_idx += dataptr_size_pairs[i].second;
   }
+  // filter duplicate keys
   int unique_keys_count = 0;
   for (size_t i = 0; i < keys.size(); ++i) {
     if (key_map.find(keys[i]) == key_map.end()) {
       key_map[keys[i]] = nullptr;
       unique_keys[unique_keys_count++] = keys[i];
     }
   }
   unique_keys.resize(unique_keys_count);
   VLOG(1) << "(logid=" << log_id
           << ") cube number of keys to look up: " << key_len
           << " uniq keys: " << unique_keys_count;
+  // filter cache keys
+  size_t hit_counts = 0;
+  CubeCache *p_cube_cache =
+      InferManager::instance().get_cube_cache(engine_name().c_str());
+  if (p_cube_cache != nullptr) {
+    for (size_t i = 0; i < unique_keys_count; ++i) {
+      rec::mcube::CubeValue *hit_val = p_cube_cache->get_data(unique_keys[i]);
+      if (hit_val) {
+        LOG(WARNING) << "Hit one cache. key:" << unique_keys[i];
+        key_map[unique_keys[i]] = hit_val;
+        unique_keys[i] = 0;
+        ++hit_counts;
+      }
+    }
+  } else {
+    LOG(WARNING) << "get cube cache fail. model: " << engine_name();
+  }
+  // clear unique keys which hit caches
+  if (hit_counts > 0) {
+    for (auto it = unique_keys.begin(); it < unique_keys.end();) {
+      if (*it == 0) {
+        it = unique_keys.erase(it);
+        --unique_keys_count;
+      } else {
+        ++it;
+      }
+    }
+  }
+  LOG(WARNING) << "Hit " << hit_counts
+               << " keys in cube cache, unique_keys:" << unique_keys.size();
+  // seek sparse params
   rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance();
   std::vector<std::string> table_names = cube->get_table_names();
   if (table_names.size() == 0) {
     LOG(ERROR) << "cube init error or cube config not given.";
     return -1;
   }
   int64_t seek_start = timeline.TimeStampUS();
   int ret = cube->seek(table_names[0], unique_keys, &values);
   int64_t seek_end = timeline.TimeStampUS();
   VLOG(2) << "(logid=" << log_id << ") cube seek status: " << ret
           << " seek_time: " << seek_end - seek_start;
   for (size_t i = 0; i < unique_keys.size(); ++i) {
     key_map[unique_keys[i]] = &values[i];
   }
   if (values.size() != keys.size() || values[0].buff.size() == 0) {
     LOG(ERROR) << "cube value return null";
   }
   // size_t EMBEDDING_SIZE = values[0].buff.size() / sizeof(float);
   size_t EMBEDDING_SIZE = (values[0].buff.size() - 10) / sizeof(float);
   TensorVector sparse_out;
   sparse_out.resize(sparse_count);
@@ -145,16 +190,22 @@ int GeneralDistKVInferOp::inference() {
   std::unordered_map<int, int> in_out_map;
   baidu::paddle_serving::predictor::Resource &resource =
       baidu::paddle_serving::predictor::Resource::instance();
   std::shared_ptr<PaddleGeneralModelConfig> model_config =
       resource.get_general_model_config().front();
   int cube_key_found = 0;
   int cube_key_miss = 0;
   for (size_t i = 0; i < in->size(); ++i) {
+    VLOG(2) << "i: " << i << ", sparse_idx: " << sparse_idx
+            << ", dense_idx: " << dense_idx;
+    VLOG(2) << "i: " << i << ", dtype: " << in->at(i).dtype;
     if (in->at(i).dtype != paddle::PaddleDType::INT64) {
       dense_out[dense_idx] = in->at(i);
       ++dense_idx;
       continue;
     }
+    VLOG(2) << "in->size: " << in->size() << ", ";
+    VLOG(2) << "lod.size: " << in->at(i).lod.size();
     sparse_out[sparse_idx].lod.resize(in->at(i).lod.size());
     for (size_t x = 0; x < sparse_out[sparse_idx].lod.size(); ++x) {
       sparse_out[sparse_idx].lod[x].resize(in->at(i).lod[x].size());
@@ -163,26 +214,39 @@ int GeneralDistKVInferOp::inference() {
               sparse_out[sparse_idx].lod[x].begin());
     }
     sparse_out[sparse_idx].dtype = paddle::PaddleDType::FLOAT32;
     sparse_out[sparse_idx].shape.push_back(
         sparse_out[sparse_idx].lod[0].back());
     sparse_out[sparse_idx].shape.push_back(EMBEDDING_SIZE);
     sparse_out[sparse_idx].name = model_config->_feed_name[i];
     sparse_out[sparse_idx].data.Resize(sparse_out[sparse_idx].lod[0].back() *
                                        EMBEDDING_SIZE * sizeof(float));
     float *dst_ptr = static_cast<float *>(sparse_out[sparse_idx].data.data());
+    if (!dst_ptr) {
+      VLOG(2) << "dst_ptr is null. sparse_idx:" << sparse_idx;
+      continue;
+    }
     for (int x = 0; x < sparse_out[sparse_idx].lod[0].back(); ++x) {
       float *data_ptr = dst_ptr + x * EMBEDDING_SIZE;
       uint64_t cur_key = keys[cube_val_idx];
+      VLOG(2) << "(logid=" << log_id << ") x: " << x
+              << ", sparse_idx: " << sparse_idx << " cur_key: " << cur_key
+              << ", cube_val_idx:" << cube_val_idx;
       rec::mcube::CubeValue *cur_val = key_map[cur_key];
       if (cur_val->buff.size() == 0) {
         memset(data_ptr, (float)0.0, sizeof(float) * EMBEDDING_SIZE);
         VLOG(3) << "(logid=" << log_id
                 << ") cube key not found: " << keys[cube_val_idx];
         ++cube_key_miss;
         ++cube_val_idx;
         continue;
       }
       VLOG(2) << "(logid=" << log_id << ") key: " << keys[cube_val_idx]
               << " , cube value len:" << cur_val->buff.size();
       memcpy(data_ptr, cur_val->buff.data(), cur_val->buff.size());
       // VLOG(3) << keys[cube_val_idx] << ":" << data_ptr[0] << ", " <<
       // data_ptr[1] << ", " << data_ptr[2] << ", " << data_ptr[3] << ", "
       // << data_ptr[4] << ", " << data_ptr[5] << ", " << data_ptr[6] << ", "
       // << data_ptr[7] << ", " << data_ptr[8];
       ++cube_key_found;
       ++cube_val_idx;
     }
@@ -191,10 +255,11 @@ int GeneralDistKVInferOp::inference() {
   bool cube_fail = (cube_key_found == 0);
   if (cube_fail) {
     LOG(WARNING) << "(logid=" << log_id << ") cube seek fail";
     // CopyBlobInfo(input_blob, output_blob);
     // return 0;
   }
   VLOG(2) << "(logid=" << log_id << ") cube key found: " << cube_key_found
           << " , cube key miss: " << cube_key_miss;
   VLOG(2) << "(logid=" << log_id << ") sparse tensor load success.";
   timeline.Pause();
   VLOG(2) << "dist kv, cube and datacopy time: " << timeline.ElapsedUS();
@@ -209,21 +274,21 @@ int GeneralDistKVInferOp::inference() {
   // call paddle inference here
   if (InferManager::instance().infer(
           engine_name().c_str(), &infer_in, out, batch_size)) {
     LOG(ERROR) << "(logid=" << log_id
                << ") Failed do infer in fluid model: " << engine_name();
     return -1;
   }
   int64_t end = timeline.TimeStampUS();
   if (cube_fail) {
     float *out_ptr = static_cast<float *>(out->at(0).data.data());
     out_ptr[0] = 0.0;
   }
   timeline.Pause();
   VLOG(2) << "dist kv, pure paddle infer time: " << timeline.ElapsedUS();
   CopyBlobInfo(input_blob, output_blob);
   AddBlobInfo(output_blob, start);
   AddBlobInfo(output_blob, end);
   return 0;
 }
 DEFINE_OP(GeneralDistKVInferOp);
...
-FILE(GLOB framework_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp)
+FILE(GLOB framework_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp ${CMAKE_CURRENT_LIST_DIR}/../../cube/cube-builder/src/seqfile_reader.cpp)
 LIST(APPEND pdserving_srcs ${framework_srcs})
 LIST(APPEND pclient_srcs ${framework_srcs})
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#include "core/predictor/framework/cache.h"
#include <dirent.h>
#include <sys/stat.h>
#include <cstring>
#include <fstream>
#include <string>
#include "core/cube/cube-builder/include/cube-builder/seqfile_reader.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
int CubeCache::clear() {
  for (auto it = _map_cache.begin(); it != _map_cache.end(); ++it) {
    if (it->second) {
      delete (it->second);
      it->second = nullptr;
    }
  }
  _map_cache.clear();
  return 0;
}

rec::mcube::CubeValue* CubeCache::get_data(uint64_t key) {
  auto it = _map_cache.find(key);
  if (it != _map_cache.end()) {
    return it->second;
  }
  return nullptr;
}

int CubeCache::reload_data(const std::string& cache_path) {
  DIR* dp = nullptr;
  struct dirent* dirp = nullptr;
  struct stat st;

  // clear cache data
  clear();

  // loading data from cache files
  if (stat(cache_path.c_str(), &st) < 0 || !S_ISDIR(st.st_mode)) {
    LOG(ERROR) << "invalid cache path " << cache_path;
    return -1;
  }
  if ((dp = opendir(cache_path.c_str())) == nullptr) {
    LOG(ERROR) << "opendir " << cache_path << " fail.";
    return -1;
  }
  while ((dirp = readdir(dp)) != nullptr) {
    // filtering by file type.
    if (dirp->d_type != DT_REG) {
      continue;
    }
    // Filter upper-level directories and hidden files
    if ((!strncmp(dirp->d_name, ".", 1)) || (!strncmp(dirp->d_name, "..", 2))) {
      continue;
    }
    // Match files whose name contains 'part-'
    if (std::string(dirp->d_name).find("part-") != std::string::npos) {
      SequenceFileRecordReader reader(dirp->d_name);
      if (reader.open() != 0) {
        LOG(ERROR) << "open file failed! " << dirp->d_name;
        continue;
      }
      if (reader.read_header() != 0) {
        LOG(ERROR) << "read header error! " << dirp->d_name;
        reader.close();
        continue;
      }
      Record record(reader.get_header());
      while (reader.next(&record) == 0) {
        uint64_t key =
            *reinterpret_cast<uint64_t*>(const_cast<char*>(record.key.data()));
        auto it_find = _map_cache.find(key);
        if (it_find != _map_cache.end()) {
          // duplicate key loaded, keep the first one
          LOG(WARNING) << "Load duplicate key:" << key
                       << " from file:" << dirp->d_name;
          continue;
        }
        rec::mcube::CubeValue* new_value = new rec::mcube::CubeValue();
        new_value->error = 0;
        new_value->buff = record.value;
        _map_cache.insert(std::make_pair(key, new_value));
      }
      LOG(WARNING) << "Load cube cache file " << dirp->d_name << " done.";
    }
  }
  // release the directory handle after scanning all cache files
  closedir(dp);
  LOG(WARNING) << "Load all cube cache files done";
  return 0;
}
} // namespace predictor
} // namespace paddle_serving
} // namespace baidu
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once

#include <sys/types.h>
#include <numeric>
#include <string>
#include <unordered_map>

#include "core/cube/cube-api/include/cube_api.h"

namespace baidu {
namespace paddle_serving {
namespace predictor {

// Large models that use sparse parameters may use a cube cache.
// When a cube cache exists, the model must be consistent with the
// version of the cube cache, so when the model is updated, the model
// and the cube cache must be reloaded at the same time.
// All cache data is loaded in one pass and never updated in place;
// a reload switches between two cube caches lock-free.
class CubeCache {
 public:
  CubeCache() {}
  ~CubeCache() { clear(); }

  // clear cache data.
  int clear();

  // get cache data by key
  rec::mcube::CubeValue* get_data(uint64_t key);

  // reload all cache files from cache_path
  int reload_data(const std::string& cache_path);

 private:
  // lock-free switching; key type is uint64_t, value type is CubeValue*
  std::unordered_map<uint64_t, rec::mcube::CubeValue*> _map_cache;
};
} // namespace predictor
} // namespace paddle_serving
} // namespace baidu
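The sketch below illustrates, outside of any serving op, how this class is meant to be driven: reload_data() scans a directory for SequenceFile shards whose names contain "part-" and builds the key-to-CubeValue map, and get_data() answers point lookups. This is an illustrative example rather than code from this commit; the directory path and key value are hypothetical.

#include "core/predictor/framework/cache.h"

using baidu::paddle_serving::predictor::CubeCache;

int demo_cube_cache() {
  CubeCache cache;
  // load every "part-*" SequenceFile under a (hypothetical) cache directory
  if (cache.reload_data("./your_model/cube_cache") != 0) {
    return -1;  // path missing or not a directory
  }
  // point lookup for one sparse feature key (hypothetical value);
  // nullptr means a miss, in which case the caller falls back to cube->seek()
  rec::mcube::CubeValue* val = cache.get_data(12345678901234ULL);
  if (val != nullptr && val->buff.size() > 0) {
    // val->buff holds the raw embedding bytes for this key
  }
  return 0;
}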
@@ -363,6 +363,15 @@ T* VersionedInferEngine::get_core(uint64_t version) {
   return NULL;
 }
+CubeCache* VersionedInferEngine::get_cube_cache() {
+  InferEngine* engine = default_engine();
+  if (!engine) {
+    LOG(WARNING) << "fail to get default engine";
+    return nullptr;
+  }
+  return engine->get_cube_cache();
+}
 int VersionedInferEngine::proc_initialize_impl(
     const configure::EngineDesc& conf, bool) {
   return -1;
@@ -502,6 +511,15 @@ T* InferManager::get_core(const char* model_name) {
   return NULL;
 }
+CubeCache* InferManager::get_cube_cache(const char* model_name) {
+  auto it = _map.find(model_name);
+  if (it == _map.end()) {
+    LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
+    return nullptr;
+  }
+  return it->second->get_cube_cache();
+}
 // Versioned inference interface
 int InferManager::infer(const char* model_name,
                         const void* in,
...
@@ -25,6 +25,7 @@
 #include <vector>
 #include "core/predictor/common/inner_common.h"
 #include "core/predictor/framework/bsf.h"
+#include "core/predictor/framework/cache.h"
 #include "core/predictor/framework/factory.h"
 #include "core/predictor/framework/infer_data.h"
 #include "core/predictor/framework/memory.h"
@@ -35,6 +36,7 @@ namespace predictor {
 using configure::ModelToolkitConf;
+// Auto mutex lock
 class AutoLock {
  public:
   explicit AutoLock(pthread_mutex_t& mutex) : _mut(mutex) {
@@ -46,6 +48,7 @@ class AutoLock {
   pthread_mutex_t& _mut;
 };
+// Global singleton mutex lock
 class GlobalCreateMutex {
  public:
   pthread_mutex_t& mutex() { return _mut; }
@@ -60,6 +63,7 @@ class GlobalCreateMutex {
   pthread_mutex_t _mut;
 };
+// InferEngine
 class InferEngine {
  public:
   virtual ~InferEngine() {}
@@ -90,11 +94,13 @@ class InferEngine {
                     void* out,
                     uint32_t batch_size = -1) = 0;
   virtual int task_infer_impl(const void* in, void* out) = 0;  // NOLINT
+  virtual CubeCache* get_cube_cache() = 0;
  protected:
   uint32_t _model_index;
   // end: framework inner call
 };
 typedef im::bsf::Task<paddle::PaddleTensor, paddle::PaddleTensor> TaskT;
 class ReloadableInferEngine : public InferEngine {
  public:
@@ -169,12 +175,12 @@ class ReloadableInferEngine : public InferEngine {
   uint64_t _version;
 };
-// Lock free switching two models
+// Lock free switching two models and cube caches
 template <typename EngineCore>
 struct ModelData {
   ModelData() : current_idx(1) {
-    cores[0] = NULL;
-    cores[1] = NULL;
+    cores[0] = nullptr;
+    cores[1] = nullptr;
   }
   ~ModelData() {
@@ -182,9 +188,12 @@ struct ModelData {
     delete cores[1];
   }
-  void* get() { return cores[current_idx]->get(); }
+  void* get_core() { return cores[current_idx]->get(); }
+  CubeCache* get_cache() { return &caches[current_idx]; }
   EngineCore* cores[2];
+  CubeCache caches[2];
   uint32_t current_idx;
 };
@@ -196,7 +205,7 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
   int proc_initialize(const configure::EngineDesc& conf, bool version) {
     THREAD_KEY_CREATE(&_skey, NULL);
     THREAD_MUTEX_INIT(&_mutex, NULL);
-    gpu_index = 0;
+    _gpu_index = 0;
     return ReloadableInferEngine::proc_initialize(conf, version);
   }
@@ -209,7 +218,7 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
     if (_reload_vec.empty()) {
       return 0;
     }
-    gpu_index = 0;
+    _gpu_index = 0;
     for (uint32_t ti = 0; ti < _reload_vec.size(); ++ti) {
       if (load_data(_reload_vec[ti], conf) != 0) {
         LOG(ERROR) << "Failed reload engine model: " << ti;
@@ -224,26 +233,41 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
   virtual int load_data(ModelData<EngineCore>* md,
                         const configure::EngineDesc& conf) {
     uint32_t next_idx = (md->current_idx + 1) % 2;
+    // reload engine core
     if (md->cores[next_idx]) {
       delete md->cores[next_idx];
     }
     md->cores[next_idx] = new (std::nothrow) EngineCore;
     // params.dump();
     size_t gpu_ids_num = conf.gpu_ids_size();
     im::bsf::AutoMutex lock(_mutex);
     int gpu_id = -1;
     if (gpu_ids_num > 0) {
-      gpu_id = conf.gpu_ids(gpu_index % gpu_ids_num);
+      gpu_id = conf.gpu_ids(_gpu_index % gpu_ids_num);
     }
     if (!md->cores[next_idx] ||
         md->cores[next_idx]->create(conf, gpu_id) != 0) {
       LOG(ERROR) << "Failed create model, path: " << conf.model_dir();
       return -1;
     }
-    gpu_index++;
+    _gpu_index++;
+    LOG(WARNING) << "Reload EngineCore[" << next_idx << "] finish.";
+    // reload cube cache
+    std::string model_path = conf.model_dir();
+    if (access(model_path.c_str(), F_OK) == 0) {
+      std::string cube_cache_path = model_path + "cube_cache";
+      int reload_cache_ret = md->caches[next_idx].reload_data(cube_cache_path);
+      LOG(WARNING) << "Reload cube cache[" << next_idx << "] finish.";
+    } else {
+      LOG(ERROR) << "model_path " << model_path
+                 << " does not exist. Ignore cube cache!";
+    }
     md->current_idx = next_idx;
+    LOG(WARNING)
+        << "Reload model and cube cache done. switching to current_idx["
+        << next_idx << "]";
     return 0;
   }
@@ -309,11 +333,25 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
     return md->cores[md->current_idx];
   }
+  CubeCache* get_cube_cache() {
+    ModelData<EngineCore>* md =
+        (ModelData<EngineCore>*)THREAD_GETSPECIFIC(_skey);
+    if (!md) {
+      LOG(ERROR) << "Failed get thread specific data";
+      return NULL;
+    }
+    return md->get_cache();
+  }
  protected:
   THREAD_KEY_T _skey;
   THREAD_MUTEX_T _mutex;
+  // vector of all model engines
   std::vector<ModelData<EngineCore>*> _reload_vec;
-  int gpu_index = 0;
+  // gpu card id
+  int _gpu_index = 0;
 };
 // Multiple EngineCore instances share the same model data
@@ -347,41 +385,42 @@ class CloneDBReloadableInferEngine
     im::bsf::AutoMutex lock(DBReloadableInferEngine<EngineCore>::_mutex);
     int gpu_id = -1;
     if (gpu_ids_num > 0) {
-      gpu_id = conf.gpu_ids(DBReloadableInferEngine<EngineCore>::gpu_index %
+      gpu_id = conf.gpu_ids(DBReloadableInferEngine<EngineCore>::_gpu_index %
                             gpu_ids_num);
     } else {
       gpu_ids_num = 1;
     }
-    // gpu_index will be set to be 0, when load() or proc_initial() is called.
-    // gpu_index < gpu_ids_num, means there are predictors still not create
-    // on some GPU card.
-    // so we need to create the predictor.
-    // gpu_index >= gpu_ids_num, means each GPU card has already create one.
-    // so we need to clone the predictor.
-    if (DBReloadableInferEngine<EngineCore>::gpu_index < gpu_ids_num) {
+    // _gpu_index will be set to be 0, when load() or proc_initial() is called.
+    // _gpu_index < gpu_ids_num, means there are predictors still not create
+    // on some GPU card.
+    // so we need to create the predictor.
+    // _gpu_index >= gpu_ids_num, means each GPU card has already create one.
+    // so we need to clone the predictor.
+    if (DBReloadableInferEngine<EngineCore>::_gpu_index < gpu_ids_num) {
       if (!md->cores[next_idx] ||
           md->cores[next_idx]->create(conf, gpu_id) != 0) {
         LOG(ERROR) << "Failed create model, path: " << conf.model_dir();
         return -1;
       }
-      DBReloadableInferEngine<EngineCore>::gpu_index++;
+      DBReloadableInferEngine<EngineCore>::_gpu_index++;
       md->current_idx = next_idx;
       if (_cloneTemplate.size() <
-          DBReloadableInferEngine<EngineCore>::gpu_index) {
+          DBReloadableInferEngine<EngineCore>::_gpu_index) {
         _cloneTemplate.push_back(md);
       } else {
-        _cloneTemplate[DBReloadableInferEngine<EngineCore>::gpu_index - 1] = md;
+        _cloneTemplate[DBReloadableInferEngine<EngineCore>::_gpu_index - 1] =
+            md;
       }
     } else {
-      int template_index = DBReloadableInferEngine<EngineCore>::gpu_index %
+      int template_index = DBReloadableInferEngine<EngineCore>::_gpu_index %
                            _cloneTemplate.size();
       if (!md->cores[next_idx] ||
-          md->cores[next_idx]->clone(_cloneTemplate[template_index]->get()) !=
-              0) {
+          md->cores[next_idx]->clone(
+              _cloneTemplate[template_index]->get_core()) != 0) {
         LOG(ERROR) << "Failed clone model from core";
         return -1;
       }
-      DBReloadableInferEngine<EngineCore>::gpu_index++;
+      DBReloadableInferEngine<EngineCore>::_gpu_index++;
       md->current_idx = next_idx;
       LOG(WARNING) << "core clone model succ, cur_idx[" << md->current_idx
                    << "].";
@@ -532,6 +571,10 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
   int task_infer_impl(const void* in, void* out) {  // NOLINT
     return infer_impl(in, out);
   }
+  CubeCache* get_cube_cache() {
+    return DBReloadableInferEngine<EngineCore>::get_cube_cache();
+  }
 };
 typedef FactoryPool<InferEngine> StaticInferFactory;
@@ -565,6 +608,8 @@ class VersionedInferEngine : public InferEngine {
   template <typename T>
   T* get_core();
+  CubeCache* get_cube_cache();
   // versioned inference interface
   int infer(const void* in, void* out, uint32_t batch_size, uint64_t version);
@@ -616,9 +661,13 @@ class InferManager {
             void* out,
             uint32_t batch_size = -1);
+  // get engine core
   template <typename T>
   T* get_core(const char* model_name);
+  // get cube cache
+  CubeCache* get_cube_cache(const char* model_name);
   // Versioned inference interface
   int infer(const char* model_name,
             const void* in,
@@ -626,9 +675,11 @@ class InferManager {
             uint32_t batch_size,
             uint64_t version);
+  // Versioned get engine core
   template <typename T>
   T* get_core(const char* model_name, uint64_t version);
+  // query model version
   int query_version(const std::string& model, uint64_t& version);
  private:
...
@@ -165,18 +165,18 @@ int Resource::initialize(const std::string& path, const std::string& file) {
     rec::mcube::CubeAPI* cube = rec::mcube::CubeAPI::instance();
     std::string cube_config_fullpath = "./" + resource_conf.cube_config_path() +
                                        "/" + resource_conf.cube_config_file();
-    this->cube_config_fullpath = cube_config_fullpath;
-    this->cube_quant_bits = resource_conf.has_cube_quant_bits()
+    this->_cube_config_fullpath = cube_config_fullpath;
+    this->_cube_quant_bits = resource_conf.has_cube_quant_bits()
                                 ? resource_conf.cube_quant_bits()
                                 : 0;
-    if (this->cube_quant_bits != 0 && this->cube_quant_bits != 8) {
+    if (this->_cube_quant_bits != 0 && this->_cube_quant_bits != 8) {
       LOG(ERROR) << "Cube quant bits illegal! should be 0 or 8.";
       return -1;
     }
-    if (this->cube_quant_bits == 0) {
+    if (this->_cube_quant_bits == 0) {
       LOG(INFO) << "cube quant mode OFF";
     } else {
-      LOG(INFO) << "cube quant mode ON, quant bits: " << this->cube_quant_bits;
+      LOG(INFO) << "cube quant mode ON, quant bits: " << this->_cube_quant_bits;
     }
   }
@@ -187,10 +187,10 @@ int Resource::initialize(const std::string& path, const std::string& file) {
 // model config
 int Resource::general_model_initialize(const std::string& path,
                                        const std::string& file) {
-  if (this->cube_config_fullpath.size() != 0) {
-    LOG(INFO) << "init cube by config file : " << this->cube_config_fullpath;
+  if (this->_cube_config_fullpath.size() != 0) {
+    LOG(INFO) << "init cube by config file : " << this->_cube_config_fullpath;
     rec::mcube::CubeAPI* cube = rec::mcube::CubeAPI::instance();
-    int ret = cube->init(this->cube_config_fullpath.c_str());
+    int ret = cube->init(this->_cube_config_fullpath.c_str());
     if (ret != 0) {
       LOG(ERROR) << "cube init error";
       return -1;
@@ -315,7 +315,7 @@ int Resource::thread_clear() {
   }
   return 0;
 }
-size_t Resource::get_cube_quant_bits() { return this->cube_quant_bits; }
+size_t Resource::get_cube_quant_bits() { return this->_cube_quant_bits; }
 int Resource::reload() {
   if (FLAGS_enable_model_toolkit && InferManager::instance().reload() != 0) {
...
@@ -16,8 +16,10 @@
 #include <map>
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 #include "core/cube/cube-api/include/cube_api.h"
 #include "core/predictor/common/inner_common.h"
 #include "core/predictor/framework/infer.h"
@@ -27,6 +29,8 @@ namespace baidu {
 namespace paddle_serving {
 namespace predictor {
+// Paddle general model configuration, read the model configuration information
+// from the general_model_config.proto file
 class PaddleGeneralModelConfig {
  public:
   PaddleGeneralModelConfig() {}
@@ -34,23 +38,47 @@ class PaddleGeneralModelConfig {
   ~PaddleGeneralModelConfig() {}
  public:
+  // feed/fetch name and alias_name
   std::vector<std::string> _feed_name;
   std::vector<std::string> _feed_alias_name;
-  std::vector<int> _feed_type;      // 0 int64, 1 float
-  std::vector<bool> _is_lod_feed;   // true lod tensor
-  std::vector<bool> _is_lod_fetch;  // whether a fetch var is lod_tensor
-  std::vector<int> _capacity;       // capacity for each tensor
-  /*
-    feed_shape_ for feeded variable
-    feed_shape_[i][j] represents the jth dim for ith input Tensor
-    if is_lod_feed_[i] == False, feed_shape_[i][0] = -1
-  */
-  std::vector<std::vector<int>> _feed_shape;
   std::vector<std::string> _fetch_name;
   std::vector<std::string> _fetch_alias_name;
+  // Be consistent with model saving interface var type conversion
+  // (python/paddle serving client/io/__init__)
+  // int64 => 0;
+  // float32 => 1;
+  // int32 => 2;
+  // float64 => 3;
+  // int16 => 4;
+  // float16 => 5;
+  // bfloat16 => 6;
+  // uint8 => 7;
+  // int8 => 8;
+  // bool => 9;
+  // complex64 => 10,
+  // complex128 => 11;
+  std::vector<int> _feed_type;
+  // whether a feed or fetch var is lod_tensor.
+  std::vector<bool> _is_lod_feed;
+  std::vector<bool> _is_lod_fetch;
+  // capacity for each tensor
+  std::vector<int> _capacity;
+  // _feed_shape and _fetch_shape are used to represent the dimensional
+  // information of tensor.
+  // for examples, feed_shape_[i][j] represents the j(th) dim for i(th) input
+  // tensor.
+  // if is_lod_feed_[i] == False, feed_shape_[i][0] = -1
+  std::vector<std::vector<int>> _feed_shape;
   std::vector<std::vector<int>> _fetch_shape;
+  // fetch name -> index of fetch_name vector.
   std::map<std::string, int> _fetch_name_to_index;
+  // fetch alias name -> index of fetch_alias_name vector.
   std::map<std::string, int> _fetch_alias_name_to_index;
 };
@@ -73,33 +101,50 @@ class Resource {
     return ins;
   }
+  // initialize resource
   int initialize(const std::string& path, const std::string& file);
+  // loading all models configurations from prototxt
   int general_model_initialize(const std::string& path,
                                const std::string& file);
+  // initialize thread local data
   int thread_initialize();
+  // clear thread local data
   int thread_clear();
+  // reload resources
   int reload();
+  // finalize
   int finalize();
+  // get all model configs
   std::vector<std::shared_ptr<PaddleGeneralModelConfig>>
   get_general_model_config();
+  // print all configurations of all models
   void print_general_model_config(
       const std::shared_ptr<PaddleGeneralModelConfig>& config);
+  // get cube quantity bit size
   size_t get_cube_quant_bits();
  private:
   int thread_finalize() { return 0; }
+ private:
+  // configuration information of all models, loaded from prototxt files
   std::vector<std::shared_ptr<PaddleGeneralModelConfig>> _configs;
-  std::string cube_config_fullpath;
-  int cube_quant_bits;  // 0 if no empty
+  // full path of cube configuration file.
+  std::string _cube_config_fullpath;
+  // cube quantify bit size, support 0/8. set 0 if no quant.
+  size_t _cube_quant_bits;
+  // bthread local key
   THREAD_KEY_T _tls_bspec_key;
 };
...
@@ -94,7 +94,9 @@ const std::string getFileBySuffix(
   return fileName;
 }
-// Engine Base
+// Engine Core is the base class of inference engines; it can be derived for
+// the Paddle Inference engine or for inference engines of other machine
+// learning platforms
 class EngineCore {
  public:
   virtual ~EngineCore() {}
@@ -141,6 +143,11 @@ class EngineCore {
   virtual void* get() { return _predictor.get(); }
  protected:
+  // _predictor is a prediction instance of Paddle Inference.
+  // when inferring on the CPU, _predictor is bound to a model.
+  // when inferring on the GPU, _predictor is bound to a model and a GPU card.
+  // Therefore, when using GPU multi-card inference, you need to create
+  // multiple EngineCore instances.
   std::shared_ptr<Predictor> _predictor;
 };
...