未验证 提交 0f523516 编写于 作者: T TeslaZhao 提交者: GitHub

Merge branch 'develop' into develop

......@@ -75,11 +75,14 @@ message ResourceConf {
repeated string model_toolkit_file = 2;
repeated string general_model_path = 3;
repeated string general_model_file = 4;
optional string cube_config_path = 5;
optional string cube_config_file = 6;
optional int32 cube_quant_bits = 7; // set 0 if no quant.
optional string auth_product_name = 8;
optional string auth_container_id = 9;
optional string cube_config_path = 10;
optional string cube_config_file = 11;
optional int32 cube_quant_bits = 12;
optional string cube_cache_path = 13;
optional string auth_product_name = 20;
optional string auth_container_id = 21;
};
// DAG node depency info
......
......@@ -20,6 +20,7 @@
#include <unordered_map>
#include <utility>
#include "core/cube/cube-api/include/cube_api.h"
#include "core/predictor/framework/cache.h"
#include "core/predictor/framework/infer.h"
#include "core/predictor/framework/memory.h"
#include "core/predictor/framework/resource.h"
......@@ -36,10 +37,11 @@ using baidu::paddle_serving::predictor::general_model::Response;
using baidu::paddle_serving::predictor::general_model::Request;
using baidu::paddle_serving::predictor::InferManager;
using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
using baidu::paddle_serving::predictor::CubeCache;
// DistKV Infer Op: seek cube and then call paddle inference
// op seq: general_reader-> dist_kv_infer -> general_response
int GeneralDistKVInferOp::inference() {
int GeneralDistKVInferOp::inference() {
VLOG(2) << "Going to run inference";
const std::vector<std::string> pre_node_names = pre_names();
if (pre_node_names.size() != 1) {
......@@ -60,8 +62,8 @@ int GeneralDistKVInferOp::inference() {
GeneralBlob *output_blob = mutable_data<GeneralBlob>();
if (!output_blob) {
LOG(ERROR) << "(logid=" << log_id << ") output_blob is nullptr,error";
return -1;
LOG(ERROR) << "(logid=" << log_id << ") output_blob is nullptr,error";
return -1;
}
output_blob->SetLogId(log_id);
......@@ -76,18 +78,24 @@ int GeneralDistKVInferOp::inference() {
TensorVector *out = &output_blob->tensor_vector;
std::vector<uint64_t> keys;
std::vector<uint64_t> unique_keys;
std::unordered_map<uint64_t, rec::mcube::CubeValue*> key_map;
std::unordered_map<uint64_t, rec::mcube::CubeValue *> key_map;
std::vector<rec::mcube::CubeValue> values;
int sparse_count = 0; // sparse inputs counts, sparse would seek cube
int dense_count = 0; // dense inputs counts, dense would directly call paddle infer
// sparse inputs counts, sparse would seek cube
int sparse_count = 0;
// dense inputs counts, dense would directly call paddle infer
int dense_count = 0;
std::vector<std::pair<int64_t *, size_t>> dataptr_size_pairs;
size_t key_len = 0;
for (size_t i = 0; i < in->size(); ++i) {
if (in->at(i).dtype != paddle::PaddleDType::INT64) {
// dense input type is not int64
++dense_count;
continue;
}
// sparse input type is int64
++sparse_count;
size_t elem_num = 1;
for (size_t s = 0; s < in->at(i).shape.size(); ++s) {
elem_num *= in->at(i).shape[s];
......@@ -107,34 +115,80 @@ int GeneralDistKVInferOp::inference() {
key_idx += dataptr_size_pairs[i].second;
}
// filter dumplicate keys
int unique_keys_count = 0;
for (size_t i = 0; i < keys.size(); ++i) {
if (key_map.find(keys[i]) == key_map.end()) {
key_map[keys[i]] = nullptr;
unique_keys[unique_keys_count++] = keys[i];
key_map[keys[i]] = nullptr;
unique_keys[unique_keys_count++] = keys[i];
}
}
unique_keys.resize(unique_keys_count);
VLOG(1) << "(logid=" << log_id << ") cube number of keys to look up: " << key_len << " uniq keys: "<< unique_keys_count;
VLOG(1) << "(logid=" << log_id
<< ") cube number of keys to look up: " << key_len
<< " uniq keys: " << unique_keys_count;
// fitler cache keys
size_t hit_counts = 0;
int64_t seek_cache_start = timeline.TimeStampUS();
CubeCache *p_cube_cache =
InferManager::instance().get_cube_cache(engine_name().c_str());
if (p_cube_cache != nullptr) {
for (size_t i = 0; i < unique_keys_count; ++i) {
rec::mcube::CubeValue *hit_val = p_cube_cache->get_data(unique_keys[i]);
if (hit_val) {
// LOG(WARNING) << "Hit one cache. key:" << unique_keys[i];
key_map[unique_keys[i]] = hit_val;
if (hit_counts % 100 == 0) {
LOG(WARNING) << "hit cache! key:" << unique_keys[i]
<< " value:" << hit_val->buff;
}
unique_keys[i] = 0;
++hit_counts;
}
}
} else {
LOG(WARNING) << "get cube cache fail. model: " << engine_name();
}
// clear unique keys which hit caches
if (hit_counts > 0) {
for (auto it = unique_keys.begin(); it < unique_keys.end();) {
if (*it == 0) {
it = unique_keys.erase(it);
--unique_keys_count;
} else {
++it;
}
}
}
int64_t seek_cache_end = timeline.TimeStampUS();
VLOG(2) << "cache hit " << hit_counts
<< " keys in cube cache, last unique_keys:" << unique_keys.size()
<< " , seek_time:" << seek_cache_end - seek_cache_start;
// seek sparse params
rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance();
std::vector<std::string> table_names = cube->get_table_names();
if (table_names.size() == 0) {
LOG(ERROR) << "cube init error or cube config not given.";
return -1;
}
int64_t seek_start = timeline.TimeStampUS();
int ret = cube->seek(table_names[0], unique_keys, &values);
int64_t seek_end = timeline.TimeStampUS();
VLOG(2) << "(logid=" << log_id << ") cube seek status: " << ret << " seek_time: " << seek_end - seek_start;
for (size_t i = 0; i < unique_keys.size(); ++i) {
key_map[unique_keys[i]] = &values[i];
VLOG(2) << "(logid=" << log_id << ") cube seek status: " << ret
<< " , unique_key: " << unique_keys.size()
<< " , seek_time: " << seek_end - seek_start;
for (size_t i = 0; i < unique_keys.size(); ++i) {
key_map[unique_keys[i]] = &values[i];
}
if (values.size() != keys.size() || values[0].buff.size() == 0) {
LOG(ERROR) << "cube value return null";
}
//size_t EMBEDDING_SIZE = values[0].buff.size() / sizeof(float);
size_t EMBEDDING_SIZE = (values[0].buff.size() - 10) / sizeof(float);
// size_t EMBEDDING_SIZE = values[0].buff.size() / sizeof(float);
// size_t EMBEDDING_SIZE = (values[0].buff.size() - 10) / sizeof(float);
size_t EMBEDDING_SIZE = 9;
TensorVector sparse_out;
sparse_out.resize(sparse_count);
TensorVector dense_out;
......@@ -145,10 +199,10 @@ int GeneralDistKVInferOp::inference() {
std::unordered_map<int, int> in_out_map;
baidu::paddle_serving::predictor::Resource &resource =
baidu::paddle_serving::predictor::Resource::instance();
std::shared_ptr<PaddleGeneralModelConfig> model_config = resource.get_general_model_config().front();
std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config().front();
int cube_key_found = 0;
int cube_key_miss = 0;
int cube_key_miss = 0;
for (size_t i = 0; i < in->size(); ++i) {
if (in->at(i).dtype != paddle::PaddleDType::INT64) {
dense_out[dense_idx] = in->at(i);
......@@ -163,26 +217,35 @@ int GeneralDistKVInferOp::inference() {
sparse_out[sparse_idx].lod[x].begin());
}
sparse_out[sparse_idx].dtype = paddle::PaddleDType::FLOAT32;
sparse_out[sparse_idx].shape.push_back(sparse_out[sparse_idx].lod[0].back());
sparse_out[sparse_idx].shape.push_back(
sparse_out[sparse_idx].lod[0].back());
sparse_out[sparse_idx].shape.push_back(EMBEDDING_SIZE);
sparse_out[sparse_idx].name = model_config->_feed_name[i];
sparse_out[sparse_idx].data.Resize(sparse_out[sparse_idx].lod[0].back() *
EMBEDDING_SIZE * sizeof(float));
float *dst_ptr = static_cast<float *>(sparse_out[sparse_idx].data.data());
if (!dst_ptr) {
VLOG(2) << "dst_ptr is null. sparse_idx:" << sparse_idx;
continue;
}
for (int x = 0; x < sparse_out[sparse_idx].lod[0].back(); ++x) {
float *data_ptr = dst_ptr + x * EMBEDDING_SIZE;
uint64_t cur_key = keys[cube_val_idx];
rec::mcube::CubeValue* cur_val = key_map[cur_key];
rec::mcube::CubeValue *cur_val = key_map[cur_key];
if (cur_val->buff.size() == 0) {
memset(data_ptr, (float)0.0, sizeof(float) * EMBEDDING_SIZE);
VLOG(3) << "(logid=" << log_id << ") cube key not found: " << keys[cube_val_idx];
++cube_key_miss;
++cube_val_idx;
continue;
memset(data_ptr, (float)0.0, sizeof(float) * EMBEDDING_SIZE);
++cube_key_miss;
++cube_val_idx;
continue;
}
VLOG(2) << "(logid=" << log_id << ") key: " << keys[cube_val_idx] << " , cube value len:" << cur_val->buff.size();
memcpy(data_ptr, cur_val->buff.data() + 10, sizeof(float) * EMBEDDING_SIZE);
//VLOG(3) << keys[cube_val_idx] << ":" << data_ptr[0] << ", " << data_ptr[1] << ", " <<data_ptr[2] << ", " <<data_ptr[3] << ", " <<data_ptr[4] << ", " <<data_ptr[5] << ", " <<data_ptr[6] << ", " <<data_ptr[7] << ", " <<data_ptr[8];
// The data generated by pslib has 10 bytes of information to be filtered
// out
memcpy(data_ptr, cur_val->buff.data() + 10, cur_val->buff.size() - 10);
// VLOG(3) << keys[cube_val_idx] << ":" << data_ptr[0] << ", " <<
// data_ptr[1] << ", " <<data_ptr[2] << ", " <<data_ptr[3] << ", "
// <<data_ptr[4] << ", " <<data_ptr[5] << ", " <<data_ptr[6] << ", "
// <<data_ptr[7] << ", " <<data_ptr[8];
++cube_key_found;
++cube_val_idx;
}
......@@ -191,10 +254,9 @@ int GeneralDistKVInferOp::inference() {
bool cube_fail = (cube_key_found == 0);
if (cube_fail) {
LOG(WARNING) << "(logid=" << log_id << ") cube seek fail";
//CopyBlobInfo(input_blob, output_blob);
//return 0;
}
VLOG(2) << "(logid=" << log_id << ") cube key found: " << cube_key_found << " , cube key miss: " << cube_key_miss;
VLOG(2) << "(logid=" << log_id << ") cube key found: " << cube_key_found
<< " , cube key miss: " << cube_key_miss;
VLOG(2) << "(logid=" << log_id << ") sparse tensor load success.";
timeline.Pause();
VLOG(2) << "dist kv, cube and datacopy time: " << timeline.ElapsedUS();
......@@ -209,21 +271,21 @@ int GeneralDistKVInferOp::inference() {
// call paddle inference here
if (InferManager::instance().infer(
engine_name().c_str(), &infer_in, out, batch_size)) {
LOG(ERROR) << "(logid=" << log_id << ") Failed do infer in fluid model: " << engine_name();
LOG(ERROR) << "(logid=" << log_id
<< ") Failed do infer in fluid model: " << engine_name();
return -1;
}
int64_t end = timeline.TimeStampUS();
if (cube_fail) {
float *out_ptr = static_cast<float*>(out->at(0).data.data());
out_ptr[0] = 0.0;
if (cube_fail) {
float *out_ptr = static_cast<float *>(out->at(0).data.data());
out_ptr[0] = 0.0;
}
timeline.Pause();
VLOG(2) << "dist kv, pure paddle infer time: " << timeline.ElapsedUS();
VLOG(2) << "dist kv, pure paddle infer time: " << timeline.ElapsedUS();
CopyBlobInfo(input_blob, output_blob);
AddBlobInfo(output_blob, start);
AddBlobInfo(output_blob, end);
return 0;
return 0;
}
DEFINE_OP(GeneralDistKVInferOp);
......
FILE(GLOB framework_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp)
FILE(GLOB framework_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp ${CMAKE_CURRENT_LIST_DIR}/../../cube/cube-builder/src/seqfile_reader.cpp)
LIST(APPEND pdserving_srcs ${framework_srcs})
LIST(APPEND pclient_srcs ${framework_srcs})
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#include "core/predictor/framework/cache.h"
#include <dirent.h>
#include <sys/stat.h>
#include <fstream>
#include <string>
#include <utility>
#include "core/cube/cube-builder/include/cube-builder/seqfile_reader.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
int CubeCache::clear() {
for (auto it = _map_cache.begin(); it != _map_cache.end(); ++it) {
if (it->second) {
delete (it->second);
it->second = nullptr;
}
}
_map_cache.clear();
return 0;
}
rec::mcube::CubeValue* CubeCache::get_data(uint64_t key) {
auto it = _map_cache.find(key);
if (it != _map_cache.end()) {
return it->second;
}
return nullptr;
}
int CubeCache::reload_data(const std::string& cache_path) {
LOG(INFO) << "cube cache is loading data, path: " << cache_path;
DIR* dp = nullptr;
struct dirent* dirp = nullptr;
struct stat st;
// clear cache data
clear();
// loading data from cache files
if (stat(cache_path.c_str(), &st) < 0 || !S_ISDIR(st.st_mode)) {
LOG(ERROR) << "invalid cache path " << cache_path;
return -1;
}
if ((dp = opendir(cache_path.c_str())) == nullptr) {
LOG(ERROR) << "opendir " << cache_path << " fail.";
return -1;
}
while ((dirp = readdir(dp)) != nullptr) {
// filtering by file type.
if (dirp->d_type != DT_REG) {
continue;
}
// Filter upper-level directories and hidden files
if ((!strncmp(dirp->d_name, ".", 1)) || (!strncmp(dirp->d_name, "..", 2))) {
continue;
}
// Match the file whose name prefix is ​​'part-'
if (std::string(dirp->d_name).find("part-") != std::string::npos) {
SequenceFileRecordReader reader(cache_path + "/" + dirp->d_name);
if (reader.open() != 0) {
LOG(ERROR) << "open file failed! " << dirp->d_name;
continue;
}
if (reader.read_header() != 0) {
LOG(ERROR) << "read header error! " << dirp->d_name;
reader.close();
continue;
}
Record record(reader.get_header());
while (reader.next(&record) == 0) {
uint64_t key =
*reinterpret_cast<uint64_t*>(const_cast<char*>(record.key.data()));
auto it_find = _map_cache.find(key);
if (it_find != _map_cache.end()) {
// load dumplicate key
LOG(WARNING) << "Load dumplicate key:" << key
<< " from file:" << dirp->d_name;
continue;
}
rec::mcube::CubeValue* new_value = new rec::mcube::CubeValue();
new_value->error = 0;
new_value->buff.swap(record.value);
_map_cache.insert(std::make_pair(key, new_value));
}
LOG(WARNING) << "Load cube cache file " << dirp->d_name << " done.";
}
LOG(WARNING) << "Load all cube cache files done";
}
return 0;
}
} // namespace predictor
} // namespace paddle_serving
} // namespace baidu
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <sys/types.h>
#include <numeric>
#include <string>
#include <unordered_map>
#include "core/cube/cube-api/include/cube_api.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
// Large models that use sparse parameters may use cube cache.
// When the cube cache exists, the model is required to be
// consistent with the version of the cube cache. Therefore,
// when the model is updated, the model and the cube cache are
// required to be reloaded at the same time.
// Load all cached data at once without updating, it's lock free
// switching two cube cache.
class CubeCache {
public:
CubeCache() {}
~CubeCache() { clear(); }
// clear cache data.
int clear();
// get cache data by key
rec::mcube::CubeValue* get_data(uint64_t key);
// reload all cache files from cache_path
int reload_data(const std::string& cache_path);
private:
// switching free lock, key type is uint64_t, value type is CubeValue*
std::unordered_map<uint64_t, rec::mcube::CubeValue*> _map_cache;
};
} // namespace predictor
} // namespace paddle_serving
} // namespace baidu
......@@ -364,6 +364,15 @@ T* VersionedInferEngine::get_core(uint64_t version) {
return NULL;
}
CubeCache* VersionedInferEngine::get_cube_cache() {
InferEngine* engine = default_engine();
if (!engine) {
LOG(WARNING) << "fail to get default engine";
return nullptr;
}
return engine->get_cube_cache();
}
int VersionedInferEngine::proc_initialize_impl(
const configure::EngineDesc& conf, bool) {
return -1;
......@@ -506,6 +515,15 @@ T* InferManager::get_core(const char* model_name) {
return NULL;
}
CubeCache* InferManager::get_cube_cache(const char* model_name) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return nullptr;
}
return it->second->get_cube_cache();
}
// Versioned inference interface
int InferManager::infer(const char* model_name,
const void* in,
......
......@@ -15,6 +15,7 @@
#pragma once
#include <pthread.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>
#include <functional>
......@@ -25,6 +26,7 @@
#include <vector>
#include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/bsf.h"
#include "core/predictor/framework/cache.h"
#include "core/predictor/framework/factory.h"
#include "core/predictor/framework/infer_data.h"
#include "core/predictor/framework/memory.h"
......@@ -35,6 +37,7 @@ namespace predictor {
using configure::ModelToolkitConf;
// Auto mutex lock
class AutoLock {
public:
explicit AutoLock(pthread_mutex_t& mutex) : _mut(mutex) {
......@@ -46,6 +49,7 @@ class AutoLock {
pthread_mutex_t& _mut;
};
// Gloabl singleton mutex lock
class GlobalCreateMutex {
public:
pthread_mutex_t& mutex() { return _mut; }
......@@ -60,6 +64,7 @@ class GlobalCreateMutex {
pthread_mutex_t _mut;
};
// InferEngine
class InferEngine {
public:
virtual ~InferEngine() {}
......@@ -90,11 +95,13 @@ class InferEngine {
void* out,
uint32_t batch_size = -1) = 0;
virtual int task_infer_impl(const void* in, void* out) = 0; // NOLINT
virtual CubeCache* get_cube_cache() = 0;
protected:
uint32_t _model_index;
// end: framework inner call
};
typedef im::bsf::Task<paddle::PaddleTensor, paddle::PaddleTensor> TaskT;
class ReloadableInferEngine : public InferEngine {
public:
......@@ -171,22 +178,29 @@ class ReloadableInferEngine : public InferEngine {
uint64_t _version;
};
// Lock free switching two models
// Lock free switching two models and cube caches
template <typename EngineCore>
struct ModelData {
ModelData() : current_idx(1) {
cores[0] = NULL;
cores[1] = NULL;
cores[0] = nullptr;
cores[1] = nullptr;
caches[0] = nullptr;
caches[1] = nullptr;
}
~ModelData() {
delete cores[0];
delete cores[1];
delete caches[0];
delete caches[1];
}
void* get() { return cores[current_idx]->get(); }
void* get_core() { return cores[current_idx]->get(); }
CubeCache* get_cache() { return caches[current_idx]; }
EngineCore* cores[2];
CubeCache* caches[2];
uint32_t current_idx;
};
......@@ -198,7 +212,7 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
int proc_initialize(const configure::EngineDesc& conf, bool version) {
THREAD_KEY_CREATE(&_skey, NULL);
THREAD_MUTEX_INIT(&_mutex, NULL);
gpu_index = 0;
_gpu_index = 0;
return ReloadableInferEngine::proc_initialize(conf, version);
}
......@@ -211,7 +225,7 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
if (_reload_vec.empty()) {
return 0;
}
gpu_index = 0;
_gpu_index = 0;
for (uint32_t ti = 0; ti < _reload_vec.size(); ++ti) {
if (load_data(_reload_vec[ti], conf) != 0) {
LOG(ERROR) << "Failed reload engine model: " << ti;
......@@ -226,26 +240,56 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
virtual int load_data(ModelData<EngineCore>* md,
const configure::EngineDesc& conf) {
uint32_t next_idx = (md->current_idx + 1) % 2;
// reload engine core
if (md->cores[next_idx]) {
delete md->cores[next_idx];
}
md->cores[next_idx] = new (std::nothrow) EngineCore;
// params.dump();
if (nullptr == md->cores[next_idx]) {
LOG(ERROR) << "Allocating memory failed. ";
return -1;
}
size_t gpu_ids_num = conf.gpu_ids_size();
im::bsf::AutoMutex lock(_mutex);
int gpu_id = -1;
if (gpu_ids_num > 0) {
gpu_id = conf.gpu_ids(gpu_index % gpu_ids_num);
gpu_id = conf.gpu_ids(_gpu_index % gpu_ids_num);
}
LOG(WARNING) << "Loading EngineCore[" << next_idx << "] ...";
if (!md->cores[next_idx] ||
md->cores[next_idx]->create(conf, gpu_id) != 0) {
LOG(ERROR) << "Failed create model, path: " << conf.model_dir();
return -1;
}
gpu_index++;
_gpu_index++;
LOG(WARNING) << "Loading EngineCore[" << next_idx << "] done.";
// reload cube cache
if (nullptr == md->caches[next_idx]) {
md->caches[next_idx] = new (std::nothrow) CubeCache;
}
if (nullptr == md->caches[next_idx]) {
LOG(ERROR) << "Allocating memory failed.";
return -1;
}
LOG(WARNING) << "Loading cube cache[" << next_idx << "] ...";
std::string model_path = conf.model_dir();
if (access(model_path.c_str(), F_OK) == 0) {
std::string cube_cache_path = model_path + "cube_cache";
int reload_cache_ret = md->caches[next_idx]->reload_data(cube_cache_path);
LOG(WARNING) << "Loading cube cache[" << next_idx << "] done.";
} else {
LOG(ERROR) << "model_path " << model_path
<< " is not exits. Ignore cube cache!";
}
// switch current_idx
md->current_idx = next_idx;
LOG(WARNING)
<< "Reload model and cube cache done. switching to current_idx["
<< next_idx << "]";
return 0;
}
......@@ -311,11 +355,25 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
return md->cores[md->current_idx];
}
CubeCache* get_cube_cache() {
ModelData<EngineCore>* md =
(ModelData<EngineCore>*)THREAD_GETSPECIFIC(_skey);
if (!md) {
LOG(ERROR) << "Failed get thread specific data";
return NULL;
}
return md->get_cache();
}
protected:
THREAD_KEY_T _skey;
THREAD_MUTEX_T _mutex;
// vector of all model engines
std::vector<ModelData<EngineCore>*> _reload_vec;
int gpu_index = 0;
// gpu card id
int _gpu_index = 0;
};
// 多个EngineCore共用同一份模型数据
......@@ -333,12 +391,20 @@ class CloneDBReloadableInferEngine
virtual int load_data(ModelData<EngineCore>* md,
const configure::EngineDesc& conf) {
int tid = syscall(SYS_gettid);
uint32_t next_idx = (md->current_idx + 1) % 2;
if (md->cores[next_idx]) {
delete md->cores[next_idx];
}
md->cores[next_idx] = new (std::nothrow) EngineCore;
if (nullptr == md->caches[next_idx]) {
md->caches[next_idx] = new (std::nothrow) CubeCache;
}
if (nullptr == md->cores[next_idx] || nullptr == md->caches[next_idx]) {
LOG(ERROR) << "Allocating memory fail.";
return -1;
}
// params.dump();
// gpu_ids_num > 0 is always true.
// if use CPU, gpu_ids = [-1].
......@@ -349,46 +415,70 @@ class CloneDBReloadableInferEngine
im::bsf::AutoMutex lock(DBReloadableInferEngine<EngineCore>::_mutex);
int gpu_id = -1;
if (gpu_ids_num > 0) {
gpu_id = conf.gpu_ids(DBReloadableInferEngine<EngineCore>::gpu_index %
gpu_id = conf.gpu_ids(DBReloadableInferEngine<EngineCore>::_gpu_index %
gpu_ids_num);
} else {
gpu_ids_num = 1;
}
// gpu_index will be set to be 0, when load() or proc_initial() is called.
// gpu_index < gpu_ids_num, means there are predictors still not create
// _gpu_index will be set to be 0, when load() or proc_initial() is called.
// _gpu_index < gpu_ids_num, means there are predictors still not create
// on some GPU card.
// so we need to create the predictor.
// gpu_index >= gpu_ids_num, means each GPU card has already create one.
// _gpu_index >= gpu_ids_num, means each GPU card has already create one.
// so we need to clone the predictor.
if (DBReloadableInferEngine<EngineCore>::gpu_index < gpu_ids_num) {
if (!md->cores[next_idx] ||
md->cores[next_idx]->create(conf, gpu_id) != 0) {
LOG(WARNING) << "tid:" << tid << " Loading clone model ...";
if (DBReloadableInferEngine<EngineCore>::_gpu_index < gpu_ids_num) {
// create cores
if (md->cores[next_idx]->create(conf, gpu_id) != 0) {
LOG(ERROR) << "Failed create model, path: " << conf.model_dir();
return -1;
}
DBReloadableInferEngine<EngineCore>::gpu_index++;
md->current_idx = next_idx;
// create caches
std::string model_path = conf.model_dir();
if (access(model_path.c_str(), F_OK) == 0) {
std::string cube_cache_path = model_path + "cube_cache";
int reload_cache_ret =
md->caches[next_idx]->reload_data(cube_cache_path);
LOG(WARNING) << "create cube cache[" << next_idx << "] done.";
} else {
LOG(WARNING) << "model_path " << model_path
<< " is not exits. Ignore cube cache!";
}
DBReloadableInferEngine<EngineCore>::_gpu_index++;
// md->current_idx = next_idx;
if (_cloneTemplate.size() <
DBReloadableInferEngine<EngineCore>::gpu_index) {
DBReloadableInferEngine<EngineCore>::_gpu_index) {
_cloneTemplate.push_back(md);
} else {
_cloneTemplate[DBReloadableInferEngine<EngineCore>::gpu_index - 1] = md;
_cloneTemplate[DBReloadableInferEngine<EngineCore>::_gpu_index - 1] =
md;
}
} else {
int template_index = DBReloadableInferEngine<EngineCore>::gpu_index %
int template_index = DBReloadableInferEngine<EngineCore>::_gpu_index %
_cloneTemplate.size();
if (!md->cores[next_idx] ||
md->cores[next_idx]->clone(_cloneTemplate[template_index]->get()) !=
0) {
// clone cores
if (md->cores[next_idx]->clone(
_cloneTemplate[template_index]->get_core()) != 0) {
LOG(ERROR) << "Failed clone model from core";
return -1;
}
DBReloadableInferEngine<EngineCore>::gpu_index++;
md->current_idx = next_idx;
LOG(WARNING) << "core clone model succ, cur_idx[" << md->current_idx
<< "].";
// clone caches
md->caches[next_idx] = _cloneTemplate[template_index]->get_cache();
LOG(WARNING) << "tid:" << tid << " clone caches done";
DBReloadableInferEngine<EngineCore>::_gpu_index++;
}
// switch current_idx
md->current_idx = next_idx;
LOG(WARNING)
<< "[" << tid
<< "] Reload clone model and cube cache done. switching to current_idx["
<< next_idx << "]";
return 0;
}
......@@ -534,6 +624,10 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
int task_infer_impl(const void* in, void* out) { // NOLINT
return infer_impl(in, out);
}
CubeCache* get_cube_cache() {
return DBReloadableInferEngine<EngineCore>::get_cube_cache();
}
};
typedef FactoryPool<InferEngine> StaticInferFactory;
......@@ -567,6 +661,8 @@ class VersionedInferEngine : public InferEngine {
template <typename T>
T* get_core();
CubeCache* get_cube_cache();
// versioned inference interface
int infer(const void* in, void* out, uint32_t batch_size, uint64_t version);
......@@ -620,9 +716,13 @@ class InferManager {
void* out,
uint32_t batch_size = -1);
// get engine core
template <typename T>
T* get_core(const char* model_name);
// get cube cache
CubeCache* get_cube_cache(const char* model_name);
// Versioned inference interface
int infer(const char* model_name,
const void* in,
......@@ -630,9 +730,11 @@ class InferManager {
uint32_t batch_size,
uint64_t version);
// Versioned get engine core
template <typename T>
T* get_core(const char* model_name, uint64_t version);
// query model version
int query_version(const std::string& model, uint64_t& version);
private:
......
......@@ -176,18 +176,18 @@ int Resource::initialize(const std::string& path, const std::string& file) {
rec::mcube::CubeAPI* cube = rec::mcube::CubeAPI::instance();
std::string cube_config_fullpath = "./" + resource_conf.cube_config_path() +
"/" + resource_conf.cube_config_file();
this->cube_config_fullpath = cube_config_fullpath;
this->cube_quant_bits = resource_conf.has_cube_quant_bits()
? resource_conf.cube_quant_bits()
: 0;
if (this->cube_quant_bits != 0 && this->cube_quant_bits != 8) {
this->_cube_config_fullpath = cube_config_fullpath;
this->_cube_quant_bits = resource_conf.has_cube_quant_bits()
? resource_conf.cube_quant_bits()
: 0;
if (this->_cube_quant_bits != 0 && this->_cube_quant_bits != 8) {
LOG(ERROR) << "Cube quant bits illegal! should be 0 or 8.";
return -1;
}
if (this->cube_quant_bits == 0) {
if (this->_cube_quant_bits == 0) {
LOG(INFO) << "cube quant mode OFF";
} else {
LOG(INFO) << "cube quant mode ON, quant bits: " << this->cube_quant_bits;
LOG(INFO) << "cube quant mode ON, quant bits: " << this->_cube_quant_bits;
}
}
......@@ -198,10 +198,10 @@ int Resource::initialize(const std::string& path, const std::string& file) {
// model config
int Resource::general_model_initialize(const std::string& path,
const std::string& file) {
if (this->cube_config_fullpath.size() != 0) {
LOG(INFO) << "init cube by config file : " << this->cube_config_fullpath;
if (this->_cube_config_fullpath.size() != 0) {
LOG(INFO) << "init cube by config file : " << this->_cube_config_fullpath;
rec::mcube::CubeAPI* cube = rec::mcube::CubeAPI::instance();
int ret = cube->init(this->cube_config_fullpath.c_str());
int ret = cube->init(this->_cube_config_fullpath.c_str());
if (ret != 0) {
LOG(ERROR) << "cube init error";
return -1;
......@@ -326,7 +326,7 @@ int Resource::thread_clear() {
}
return 0;
}
size_t Resource::get_cube_quant_bits() { return this->cube_quant_bits; }
size_t Resource::get_cube_quant_bits() { return this->_cube_quant_bits; }
int Resource::reload() {
if (FLAGS_enable_model_toolkit && InferManager::instance().reload() != 0) {
......
......@@ -16,8 +16,10 @@
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "core/cube/cube-api/include/cube_api.h"
#include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/infer.h"
......@@ -27,6 +29,8 @@ namespace baidu {
namespace paddle_serving {
namespace predictor {
// Paddle general model configuration, read the model configuration information
// from the general_model_config.proto file
class PaddleGeneralModelConfig {
public:
PaddleGeneralModelConfig() {}
......@@ -34,23 +38,47 @@ class PaddleGeneralModelConfig {
~PaddleGeneralModelConfig() {}
public:
// feed/fetch name and alias_name
std::vector<std::string> _feed_name;
std::vector<std::string> _feed_alias_name;
std::vector<int> _feed_type; // 0 int64, 1 float
std::vector<bool> _is_lod_feed; // true lod tensor
std::vector<bool> _is_lod_fetch; // whether a fetch var is lod_tensor
std::vector<int> _capacity; // capacity for each tensor
/*
feed_shape_ for feeded variable
feed_shape_[i][j] represents the jth dim for ith input Tensor
if is_lod_feed_[i] == False, feed_shape_[i][0] = -1
*/
std::vector<std::vector<int>> _feed_shape;
std::vector<std::string> _fetch_name;
std::vector<std::string> _fetch_alias_name;
// Be consistent with model saving interface var type conversion
// (python/paddle serving client/io/__init__)
// int64 => 0;
// float32 => 1;
// int32 => 2;
// float64 => 3;
// int16 => 4;
// float16 => 5;
// bfloat16 => 6;
// uint8 => 7;
// int8 => 8;
// bool => 9;
// complex64 => 10,
// complex128 => 11;
std::vector<int> _feed_type;
// whether a feed or fetch var is lod_tensor.
std::vector<bool> _is_lod_feed;
std::vector<bool> _is_lod_fetch;
// capacity for each tensor
std::vector<int> _capacity;
// _feed_shape and _fetch_shape are used to represent the dimensional
// information of tensor.
// for examples, feed_shape_[i][j] represents the j(th) dim for i(th) input
// tensor.
// if is_lod_feed_[i] == False, feed_shape_[i][0] = -1
std::vector<std::vector<int>> _feed_shape;
std::vector<std::vector<int>> _fetch_shape;
// fetch name -> index of fetch_name vector.
std::map<std::string, int> _fetch_name_to_index;
// fetch alias name -> index of fetch_alias_name vector.
std::map<std::string, int> _fetch_alias_name_to_index;
};
......@@ -73,33 +101,50 @@ class Resource {
return ins;
}
// initialize resource
int initialize(const std::string& path, const std::string& file);
// loading all models configurations from prototxt
int general_model_initialize(const std::string& path,
const std::string& file);
// initialize thread local data
int thread_initialize();
// clear thread local data
int thread_clear();
// reload resources
int reload();
// finalize
int finalize();
// get all model configs
std::vector<std::shared_ptr<PaddleGeneralModelConfig>>
get_general_model_config();
// print all configurations of all models
void print_general_model_config(
const std::shared_ptr<PaddleGeneralModelConfig>& config);
// get cube quantity bit size
size_t get_cube_quant_bits();
private:
int thread_finalize() { return 0; }
private:
// configuration infermation of all models, loading from prototxt files
std::vector<std::shared_ptr<PaddleGeneralModelConfig>> _configs;
std::string cube_config_fullpath;
int cube_quant_bits; // 0 if no empty
// full path of cube configuration file.
std::string _cube_config_fullpath;
// cube quantify bit size, support 0/8. set 0 if no quant.
size_t _cube_quant_bits;
// bthread local key
THREAD_KEY_T _tls_bspec_key;
};
......
......@@ -94,7 +94,9 @@ const std::string getFileBySuffix(
return fileName;
}
// Engine Base
// Engine Core is the base class of inference engines, which can be derived from
// paddle Inference Engine, or inference engines of other machine learning
// platforms
class EngineCore {
public:
virtual ~EngineCore() {}
......@@ -141,6 +143,11 @@ class EngineCore {
virtual void* get() { return _predictor.get(); }
protected:
// _predictor is a prediction instance of Paddle Inference.
// when inferring on the CPU, _predictor is bound to a model.
// when inferring on the GPU, _predictor is bound to a model and a GPU card.
// Therefore, when using GPU multi-card inference, you need to create multiple
// EngineCore.
std::shared_ptr<Predictor> _predictor;
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册