diff --git a/core/configure/proto/server_configure.proto b/core/configure/proto/server_configure.proto
index 26dd947f3039921fd5b95a6957b9abd165a03971..13b9d39553b9219f0ab7f494f58ab0b7cfd3b7e8 100755
--- a/core/configure/proto/server_configure.proto
+++ b/core/configure/proto/server_configure.proto
@@ -75,11 +75,14 @@ message ResourceConf {
   repeated string model_toolkit_file = 2;
   repeated string general_model_path = 3;
   repeated string general_model_file = 4;
-  optional string cube_config_path = 5;
-  optional string cube_config_file = 6;
-  optional int32 cube_quant_bits = 7;  // set 0 if no quant.
-  optional string auth_product_name = 8;
-  optional string auth_container_id = 9;
+
+  optional string cube_config_path = 10;
+  optional string cube_config_file = 11;
+  optional int32 cube_quant_bits = 12;
+  optional string cube_cache_path = 13;
+
+  optional string auth_product_name = 20;
+  optional string auth_container_id = 21;
 };

 // DAG node depency info
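The renumbering retires tags 5–9 and groups the cube options at 10–13 and the auth options at 20–21, leaving room to extend either group without another renumber. Since `ResourceConf` uses proto2 `optional` fields, the generated C++ API carries `has_*()` guards; a minimal sketch of reading the new `cube_cache_path` field, assuming the repo's `baidu.paddle_serving.configure` package and an illustrative empty-string fallback:

```cpp
#include <string>
#include "core/configure/proto/server_configure.pb.h"

using baidu::paddle_serving::configure::ResourceConf;

// Sketch: pull the cube cache directory out of a parsed ResourceConf.
std::string resolve_cube_cache_path(const ResourceConf& conf) {
  // proto2 optional fields generate has_*() guards.
  if (conf.has_cube_cache_path()) {
    return conf.cube_cache_path();
  }
  // Field unset: return empty so the caller can skip cache loading.
  return "";
}
```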
diff --git a/core/general-server/op/general_dist_kv_infer_op.cpp b/core/general-server/op/general_dist_kv_infer_op.cpp
index a8f438c278a0b85ec69fe4780bd5595edfd1c514..870f045d43ccf38a73d53e048e1ee435950f8c36 100644
--- a/core/general-server/op/general_dist_kv_infer_op.cpp
+++ b/core/general-server/op/general_dist_kv_infer_op.cpp
@@ -20,6 +20,7 @@
 #include <unordered_map>
 #include <utility>
 #include "core/cube/cube-api/include/cube_api.h"
+#include "core/predictor/framework/cache.h"
 #include "core/predictor/framework/infer.h"
 #include "core/predictor/framework/memory.h"
 #include "core/predictor/framework/resource.h"
@@ -36,10 +37,11 @@
 using baidu::paddle_serving::predictor::general_model::Response;
 using baidu::paddle_serving::predictor::general_model::Request;
 using baidu::paddle_serving::predictor::InferManager;
 using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
+using baidu::paddle_serving::predictor::CubeCache;

 // DistKV Infer Op: seek cube and then call paddle inference
 // op seq: general_reader-> dist_kv_infer -> general_response
-int GeneralDistKVInferOp::inference() {
+int GeneralDistKVInferOp::inference() {
   VLOG(2) << "Going to run inference";
   const std::vector<std::string> pre_node_names = pre_names();
   if (pre_node_names.size() != 1) {
@@ -60,8 +62,8 @@ int GeneralDistKVInferOp::inference() {
   GeneralBlob *output_blob = mutable_data<GeneralBlob>();
   if (!output_blob) {
-    LOG(ERROR) << "(logid=" << log_id << ") output_blob is nullptr,error";
-    return -1;
+    LOG(ERROR) << "(logid=" << log_id << ") output_blob is nullptr, error";
+    return -1;
   }
   output_blob->SetLogId(log_id);
@@ -76,18 +78,24 @@ int GeneralDistKVInferOp::inference() {
   TensorVector *out = &output_blob->tensor_vector;
   std::vector<uint64_t> keys;
   std::vector<uint64_t> unique_keys;
-  std::unordered_map<uint64_t, rec::mcube::CubeValue *> key_map;
+  std::unordered_map<uint64_t, rec::mcube::CubeValue *> key_map;
   std::vector<rec::mcube::CubeValue> values;
-  int sparse_count = 0;  // sparse inputs counts, sparse would seek cube
-  int dense_count = 0;  // dense inputs counts, dense would directly call paddle infer
+  // sparse input count; sparse inputs seek cube
+  int sparse_count = 0;
+  // dense input count; dense inputs go straight to paddle infer
+  int dense_count = 0;
   std::vector<std::pair<int64_t *, size_t>> dataptr_size_pairs;
   size_t key_len = 0;
+
   for (size_t i = 0; i < in->size(); ++i) {
     if (in->at(i).dtype != paddle::PaddleDType::INT64) {
+      // dense input type is not int64
       ++dense_count;
       continue;
     }
+    // sparse input type is int64
     ++sparse_count;
+
     size_t elem_num = 1;
     for (size_t s = 0; s < in->at(i).shape.size(); ++s) {
       elem_num *= in->at(i).shape[s];
@@ -107,34 +115,80 @@ int GeneralDistKVInferOp::inference() {
     key_idx += dataptr_size_pairs[i].second;
   }
+
+  // filter duplicate keys
   int unique_keys_count = 0;
   for (size_t i = 0; i < keys.size(); ++i) {
     if (key_map.find(keys[i]) == key_map.end()) {
-      key_map[keys[i]] = nullptr;
-      unique_keys[unique_keys_count++] = keys[i];
+      key_map[keys[i]] = nullptr;
+      unique_keys[unique_keys_count++] = keys[i];
     }
   }
   unique_keys.resize(unique_keys_count);
-  VLOG(1) << "(logid=" << log_id << ") cube number of keys to look up: " << key_len << " uniq keys: "<< unique_keys_count;
+  VLOG(1) << "(logid=" << log_id
+          << ") cube number of keys to look up: " << key_len
+          << " uniq keys: " << unique_keys_count;
+
+  // filter out keys that hit the cube cache
+  size_t hit_counts = 0;
+  int64_t seek_cache_start = timeline.TimeStampUS();
+  CubeCache *p_cube_cache =
+      InferManager::instance().get_cube_cache(engine_name().c_str());
+  if (p_cube_cache != nullptr) {
+    for (size_t i = 0; i < unique_keys_count; ++i) {
+      rec::mcube::CubeValue *hit_val = p_cube_cache->get_data(unique_keys[i]);
+      if (hit_val) {
+        // LOG(WARNING) << "Hit one cache. key:" << unique_keys[i];
+        key_map[unique_keys[i]] = hit_val;
+        if (hit_counts % 100 == 0) {
+          LOG(WARNING) << "hit cache! key:" << unique_keys[i]
+                       << " value:" << hit_val->buff;
+        }
+        unique_keys[i] = 0;
+        ++hit_counts;
+      }
+    }
+  } else {
+    LOG(WARNING) << "get cube cache fail. model: " << engine_name();
+  }
+  // clear unique keys which hit caches
+  if (hit_counts > 0) {
+    for (auto it = unique_keys.begin(); it < unique_keys.end();) {
+      if (*it == 0) {
+        it = unique_keys.erase(it);
+        --unique_keys_count;
+      } else {
+        ++it;
+      }
+    }
+  }
+  int64_t seek_cache_end = timeline.TimeStampUS();
+  VLOG(2) << "cache hit " << hit_counts
+          << " keys in cube cache, last unique_keys:" << unique_keys.size()
+          << " , seek_time:" << seek_cache_end - seek_cache_start;
+
+  // seek sparse params
   rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance();
   std::vector<std::string> table_names = cube->get_table_names();
   if (table_names.size() == 0) {
     LOG(ERROR) << "cube init error or cube config not given.";
     return -1;
   }
-
   int64_t seek_start = timeline.TimeStampUS();
   int ret = cube->seek(table_names[0], unique_keys, &values);
   int64_t seek_end = timeline.TimeStampUS();
-  VLOG(2) << "(logid=" << log_id << ") cube seek status: " << ret << " seek_time: " << seek_end - seek_start;
-  for (size_t i = 0; i < unique_keys.size(); ++i) {
-    key_map[unique_keys[i]] = &values[i];
+  VLOG(2) << "(logid=" << log_id << ") cube seek status: " << ret
+          << " , unique_key: " << unique_keys.size()
+          << " , seek_time: " << seek_end - seek_start;
+
+  for (size_t i = 0; i < unique_keys.size(); ++i) {
+    key_map[unique_keys[i]] = &values[i];
   }
   if (values.size() != keys.size() || values[0].buff.size() == 0) {
     LOG(ERROR) << "cube value return null";
   }
-  //size_t EMBEDDING_SIZE = values[0].buff.size() / sizeof(float);
-  size_t EMBEDDING_SIZE = (values[0].buff.size() - 10) / sizeof(float);
+  // size_t EMBEDDING_SIZE = values[0].buff.size() / sizeof(float);
+  // size_t EMBEDDING_SIZE = (values[0].buff.size() - 10) / sizeof(float);
+  size_t EMBEDDING_SIZE = 9;
   TensorVector sparse_out;
   sparse_out.resize(sparse_count);
   TensorVector dense_out;
@@ -145,10 +199,10 @@ int GeneralDistKVInferOp::inference() {
   std::unordered_map<int, int> in_out_map;
   baidu::paddle_serving::predictor::Resource &resource =
       baidu::paddle_serving::predictor::Resource::instance();
-  std::shared_ptr<PaddleGeneralModelConfig> model_config = resource.get_general_model_config().front();
+  std::shared_ptr<PaddleGeneralModelConfig> model_config =
+      resource.get_general_model_config().front();
   int cube_key_found = 0;
-  int cube_key_miss = 0;
-
+  int cube_key_miss = 0;
   for (size_t i = 0; i < in->size(); ++i) {
     if (in->at(i).dtype != paddle::PaddleDType::INT64) {
       dense_out[dense_idx] = in->at(i);
@@ -163,26 +217,35 @@ int GeneralDistKVInferOp::inference() {
           sparse_out[sparse_idx].lod[x].begin());
     }
     sparse_out[sparse_idx].dtype = paddle::PaddleDType::FLOAT32;
-    sparse_out[sparse_idx].shape.push_back(sparse_out[sparse_idx].lod[0].back());
+    sparse_out[sparse_idx].shape.push_back(
+        sparse_out[sparse_idx].lod[0].back());
     sparse_out[sparse_idx].shape.push_back(EMBEDDING_SIZE);
     sparse_out[sparse_idx].name = model_config->_feed_name[i];
     sparse_out[sparse_idx].data.Resize(sparse_out[sparse_idx].lod[0].back() *
                                        EMBEDDING_SIZE * sizeof(float));
     float *dst_ptr = static_cast<float *>(sparse_out[sparse_idx].data.data());
+    if (!dst_ptr) {
+      VLOG(2) << "dst_ptr is null. sparse_idx:" << sparse_idx;
+      continue;
+    }
     for (int x = 0; x < sparse_out[sparse_idx].lod[0].back(); ++x) {
       float *data_ptr = dst_ptr + x * EMBEDDING_SIZE;
       uint64_t cur_key = keys[cube_val_idx];
-      rec::mcube::CubeValue* cur_val = key_map[cur_key];
+      rec::mcube::CubeValue *cur_val = key_map[cur_key];
       if (cur_val->buff.size() == 0) {
-        memset(data_ptr, (float)0.0, sizeof(float) * EMBEDDING_SIZE);
-        VLOG(3) << "(logid=" << log_id << ") cube key not found: " << keys[cube_val_idx];
-        ++cube_key_miss;
-        ++cube_val_idx;
-        continue;
+        memset(data_ptr, (float)0.0, sizeof(float) * EMBEDDING_SIZE);
+        ++cube_key_miss;
+        ++cube_val_idx;
+        continue;
       }
-      VLOG(2) << "(logid=" << log_id << ") key: " << keys[cube_val_idx] << " , cube value len:" << cur_val->buff.size();
-      memcpy(data_ptr, cur_val->buff.data() + 10, sizeof(float) * EMBEDDING_SIZE);
-      //VLOG(3) << keys[cube_val_idx] << ":" << data_ptr[0] << ", " << data_ptr[1] << ", " << data_ptr[2];
+      memcpy(data_ptr, cur_val->buff.data() + 10, cur_val->buff.size() - 10);
+      // VLOG(3) << keys[cube_val_idx] << ":" << data_ptr[0] << ", " <<
+      // data_ptr[1] << ", " << data_ptr[2];
       ++cube_key_found;
       ++cube_val_idx;
     }
@@ ... @@ int GeneralDistKVInferOp::inference() {
-  if (cube_fail) {
-    float *out_ptr = static_cast<float *>(out->at(0).data.data());
-    out_ptr[0] = 0.0;
+  if (cube_fail) {
+    float *out_ptr = static_cast<float *>(out->at(0).data.data());
+    out_ptr[0] = 0.0;
   }
   timeline.Pause();
-  VLOG(2) << "dist kv, pure paddle infer time: " << timeline.ElapsedUS();
+  VLOG(2) << "dist kv, pure paddle infer time: " << timeline.ElapsedUS();
   CopyBlobInfo(input_blob, output_blob);
   AddBlobInfo(output_blob, start);
   AddBlobInfo(output_blob, end);
-  return 0;
-
+  return 0;
 }
 DEFINE_OP(GeneralDistKVInferOp);
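The op now resolves keys in three stages: deduplicate, satisfy what it can from the in-process `CubeCache`, and send only the remaining misses to the remote cube service. (Note the patch also pins `EMBEDDING_SIZE` to 9 floats rather than deriving it from the value length, and skips a 10-byte header when copying each value.) A condensed, self-contained sketch of the dedup-plus-cache step; all names here are local to the example, not the op's:

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical value type standing in for rec::mcube::CubeValue.
struct Value {
  std::string buff;
};

// Hypothetical in-process cache; the real op calls CubeCache::get_data().
std::unordered_map<uint64_t, Value> g_cache;

// Deduplicate keys and resolve what the cache already holds.
// Returns the unique misses that still need a remote cube->seek().
// key_map ends up mapping every unique key to its value, or to nullptr
// for a miss that gets patched in after the remote seek, matching the
// op's key_map bookkeeping.
std::vector<uint64_t> filter_keys(
    const std::vector<uint64_t>& keys,
    std::unordered_map<uint64_t, Value*>* key_map) {
  std::vector<uint64_t> misses;
  for (uint64_t k : keys) {
    if (key_map->count(k)) continue;  // duplicate key, already handled
    auto it = g_cache.find(k);
    Value* v = (it == g_cache.end()) ? nullptr : &it->second;
    (*key_map)[k] = v;
    if (v == nullptr) misses.push_back(k);
  }
  return misses;
}
```

The op itself marks hits by overwriting the key with the sentinel 0 and erasing those slots from `unique_keys` afterwards; collecting the misses directly, as above, avoids the repeated `vector::erase` calls and the implicit assumption that 0 is never a valid key.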
diff --git a/core/predictor/framework/CMakeLists.txt b/core/predictor/framework/CMakeLists.txt
index 641ba7efbad9c97497cd2ef9372fa08391f6769c..2b33dfed0ae1130b7044aa835164e591385fd5b5 100644
--- a/core/predictor/framework/CMakeLists.txt
+++ b/core/predictor/framework/CMakeLists.txt
@@ -1,3 +1,3 @@
-FILE(GLOB framework_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp)
+FILE(GLOB framework_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp ${CMAKE_CURRENT_LIST_DIR}/../../cube/cube-builder/src/seqfile_reader.cpp)
 LIST(APPEND pdserving_srcs ${framework_srcs})
 LIST(APPEND pclient_srcs ${framework_srcs})
diff --git a/core/predictor/framework/cache.cpp b/core/predictor/framework/cache.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..8715b85a66eccb71469bca294de8d8488cb59288
--- /dev/null
+++ b/core/predictor/framework/cache.cpp
@@ -0,0 +1,115 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "core/predictor/framework/cache.h"
+#include <dirent.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <string>
+#include "core/cube/cube-builder/include/cube-builder/seqfile_reader.h"
+
+namespace baidu {
+namespace paddle_serving {
+namespace predictor {
+
+int CubeCache::clear() {
+  for (auto it = _map_cache.begin(); it != _map_cache.end(); ++it) {
+    if (it->second) {
+      delete (it->second);
+      it->second = nullptr;
+    }
+  }
+  _map_cache.clear();
+
+  return 0;
+}
+
+rec::mcube::CubeValue* CubeCache::get_data(uint64_t key) {
+  auto it = _map_cache.find(key);
+  if (it != _map_cache.end()) {
+    return it->second;
+  }
+  return nullptr;
+}
+
+int CubeCache::reload_data(const std::string& cache_path) {
+  LOG(INFO) << "cube cache is loading data, path: " << cache_path;
+  DIR* dp = nullptr;
+  struct dirent* dirp = nullptr;
+  struct stat st;
+
+  // clear cache data
+  clear();
+
+  // loading data from cache files
+  if (stat(cache_path.c_str(), &st) < 0 || !S_ISDIR(st.st_mode)) {
+    LOG(ERROR) << "invalid cache path " << cache_path;
+    return -1;
+  }
+  if ((dp = opendir(cache_path.c_str())) == nullptr) {
+    LOG(ERROR) << "opendir " << cache_path << " failed.";
+    return -1;
+  }
+  while ((dirp = readdir(dp)) != nullptr) {
+    // filter by file type: regular files only
+    if (dirp->d_type != DT_REG) {
+      continue;
+    }
+    // filter out '.'/'..' and hidden files
+    if ((!strncmp(dirp->d_name, ".", 1)) || (!strncmp(dirp->d_name, "..", 2))) {
+      continue;
+    }
+    // match files whose names contain 'part-'
+    if (std::string(dirp->d_name).find("part-") != std::string::npos) {
+      SequenceFileRecordReader reader(cache_path + "/" + dirp->d_name);
+
+      if (reader.open() != 0) {
+        LOG(ERROR) << "open file failed! " << dirp->d_name;
+        continue;
+      }
+      if (reader.read_header() != 0) {
+        LOG(ERROR) << "read header error! " << dirp->d_name;
+        reader.close();
+        continue;
+      }
+
+      Record record(reader.get_header());
+      while (reader.next(&record) == 0) {
+        uint64_t key =
+            *reinterpret_cast<uint64_t*>(const_cast<char*>(record.key.data()));
+
+        auto it_find = _map_cache.find(key);
+        if (it_find != _map_cache.end()) {
+          // duplicate key encountered while loading
+          LOG(WARNING) << "Load duplicate key:" << key
+                       << " from file:" << dirp->d_name;
+          continue;
+        }
+        rec::mcube::CubeValue* new_value = new rec::mcube::CubeValue();
+        new_value->error = 0;
+        new_value->buff.swap(record.value);
+        _map_cache.insert(std::make_pair(key, new_value));
+      }
+
+      LOG(WARNING) << "Load cube cache file " << dirp->d_name << " done.";
+    }
+  }
+  closedir(dp);
+  LOG(WARNING) << "Load all cube cache files done";
+  return 0;
+}
+
+}  // namespace predictor
+}  // namespace paddle_serving
+}  // namespace baidu
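`reload_data()` expects a directory of SequenceFile shards named `part-*` (the cube builder's output layout) whose record keys are raw 8-byte integers. A usage sketch with a hypothetical cache directory:

```cpp
#include <iostream>
#include "core/predictor/framework/cache.h"

using baidu::paddle_serving::predictor::CubeCache;

int main() {
  CubeCache cache;
  // Hypothetical path: <model_dir>/cube_cache is what the reload code in
  // infer.h derives from EngineDesc::model_dir().
  if (cache.reload_data("./serving_model/cube_cache") != 0) {
    std::cerr << "cache load failed, falling back to remote cube only\n";
    return 1;
  }
  if (auto* val = cache.get_data(12345)) {  // key type is uint64_t
    std::cout << "hit, value bytes: " << val->buff.size() << "\n";
  } else {
    std::cout << "miss, would seek remote cube\n";
  }
  return 0;
}
```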
diff --git a/core/predictor/framework/cache.h b/core/predictor/framework/cache.h
new file mode 100644
index 0000000000000000000000000000000000000000..1acc46e0539a33df530b77673809385711f6ea63
--- /dev/null
+++ b/core/predictor/framework/cache.h
@@ -0,0 +1,55 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+#include <stdint.h>
+#include <string.h>
+#include <string>
+#include <unordered_map>
+#include "core/cube/cube-api/include/cube_api.h"
+
+namespace baidu {
+namespace paddle_serving {
+namespace predictor {
+
+// Large models with sparse parameters may use a cube cache.
+// When a cube cache exists, the model must be consistent with the
+// version of the cube cache; therefore, when the model is updated,
+// the model and the cube cache must be reloaded at the same time.
+// All cached data is loaded in one shot and never updated in place,
+// which allows lock-free switching between two cube caches.
+class CubeCache {
+ public:
+  CubeCache() {}
+
+  ~CubeCache() { clear(); }
+
+  // clear cache data.
+  int clear();
+
+  // get cache data by key
+  rec::mcube::CubeValue* get_data(uint64_t key);
+
+  // reload all cache files from cache_path
+  int reload_data(const std::string& cache_path);
+
+ private:
+  // lock-free switched map; key type is uint64_t, value type is CubeValue*
+  std::unordered_map<uint64_t, rec::mcube::CubeValue*> _map_cache;
+};
+
+}  // namespace predictor
+}  // namespace paddle_serving
+}  // namespace baidu
diff --git a/core/predictor/framework/infer.cpp b/core/predictor/framework/infer.cpp
index a5f00833b7fb1d76894209f46bea2e289151ed10..e0c284df5b9427e8e60bc1cfa19941f20cf2be9f 100644
--- a/core/predictor/framework/infer.cpp
+++ b/core/predictor/framework/infer.cpp
@@ -364,6 +364,15 @@ T* VersionedInferEngine::get_core(uint64_t version) {
   return NULL;
 }

+CubeCache* VersionedInferEngine::get_cube_cache() {
+  InferEngine* engine = default_engine();
+  if (!engine) {
+    LOG(WARNING) << "fail to get default engine";
+    return nullptr;
+  }
+  return engine->get_cube_cache();
+}
+
 int VersionedInferEngine::proc_initialize_impl(
     const configure::EngineDesc& conf, bool) {
   return -1;
@@ -506,6 +515,15 @@ T* InferManager::get_core(const char* model_name) {
   return NULL;
 }

+CubeCache* InferManager::get_cube_cache(const char* model_name) {
+  auto it = _map.find(model_name);
+  if (it == _map.end()) {
+    LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
+    return nullptr;
+  }
+  return it->second->get_cube_cache();
+}
+
 // Versioned inference interface
 int InferManager::infer(const char* model_name,
                         const void* in,
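Cache lookup is routed the same way as inference: `InferManager` resolves the model name to its `VersionedInferEngine`, which forwards to the default engine's thread-local cache. A hedged usage sketch; the model name is hypothetical:

```cpp
#include "core/predictor/framework/cache.h"
#include "core/predictor/framework/infer.h"

using baidu::paddle_serving::predictor::CubeCache;
using baidu::paddle_serving::predictor::InferManager;

// Somewhere inside an op, after engines are initialized:
void probe_cache(uint64_t key) {
  CubeCache* cache = InferManager::instance().get_cube_cache("general_model");
  if (cache == nullptr) {
    // engine unknown or cache not loaded; fall back to remote cube
    return;
  }
  if (rec::mcube::CubeValue* v = cache->get_data(key)) {
    // use v->buff without any remote round trip
  }
}
```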
"; + return -1; + } size_t gpu_ids_num = conf.gpu_ids_size(); im::bsf::AutoMutex lock(_mutex); int gpu_id = -1; if (gpu_ids_num > 0) { - gpu_id = conf.gpu_ids(gpu_index % gpu_ids_num); + gpu_id = conf.gpu_ids(_gpu_index % gpu_ids_num); } + LOG(WARNING) << "Loading EngineCore[" << next_idx << "] ..."; if (!md->cores[next_idx] || md->cores[next_idx]->create(conf, gpu_id) != 0) { LOG(ERROR) << "Failed create model, path: " << conf.model_dir(); return -1; } - gpu_index++; + _gpu_index++; + LOG(WARNING) << "Loading EngineCore[" << next_idx << "] done."; + + // reload cube cache + if (nullptr == md->caches[next_idx]) { + md->caches[next_idx] = new (std::nothrow) CubeCache; + } + + if (nullptr == md->caches[next_idx]) { + LOG(ERROR) << "Allocating memory failed."; + return -1; + } + LOG(WARNING) << "Loading cube cache[" << next_idx << "] ..."; + std::string model_path = conf.model_dir(); + if (access(model_path.c_str(), F_OK) == 0) { + std::string cube_cache_path = model_path + "cube_cache"; + int reload_cache_ret = md->caches[next_idx]->reload_data(cube_cache_path); + LOG(WARNING) << "Loading cube cache[" << next_idx << "] done."; + } else { + LOG(ERROR) << "model_path " << model_path + << " is not exits. Ignore cube cache!"; + } + + // switch current_idx md->current_idx = next_idx; + LOG(WARNING) + << "Reload model and cube cache done. switching to current_idx[" + << next_idx << "]"; return 0; } @@ -311,11 +355,25 @@ class DBReloadableInferEngine : public ReloadableInferEngine { return md->cores[md->current_idx]; } + CubeCache* get_cube_cache() { + ModelData* md = + (ModelData*)THREAD_GETSPECIFIC(_skey); + if (!md) { + LOG(ERROR) << "Failed get thread specific data"; + return NULL; + } + return md->get_cache(); + } + protected: THREAD_KEY_T _skey; THREAD_MUTEX_T _mutex; + + // vector of all model engines std::vector*> _reload_vec; - int gpu_index = 0; + + // gpu card id + int _gpu_index = 0; }; // 多个EngineCore共用同一份模型数据 @@ -333,12 +391,20 @@ class CloneDBReloadableInferEngine virtual int load_data(ModelData* md, const configure::EngineDesc& conf) { + int tid = syscall(SYS_gettid); uint32_t next_idx = (md->current_idx + 1) % 2; if (md->cores[next_idx]) { delete md->cores[next_idx]; } md->cores[next_idx] = new (std::nothrow) EngineCore; + if (nullptr == md->caches[next_idx]) { + md->caches[next_idx] = new (std::nothrow) CubeCache; + } + if (nullptr == md->cores[next_idx] || nullptr == md->caches[next_idx]) { + LOG(ERROR) << "Allocating memory fail."; + return -1; + } // params.dump(); // gpu_ids_num > 0 is always true. // if use CPU, gpu_ids = [-1]. @@ -349,46 +415,70 @@ class CloneDBReloadableInferEngine im::bsf::AutoMutex lock(DBReloadableInferEngine::_mutex); int gpu_id = -1; if (gpu_ids_num > 0) { - gpu_id = conf.gpu_ids(DBReloadableInferEngine::gpu_index % + gpu_id = conf.gpu_ids(DBReloadableInferEngine::_gpu_index % gpu_ids_num); } else { gpu_ids_num = 1; } - // gpu_index will be set to be 0, when load() or proc_initial() is called. - // gpu_index < gpu_ids_num, means there are predictors still not create + + // _gpu_index will be set to be 0, when load() or proc_initial() is called. + // _gpu_index < gpu_ids_num, means there are predictors still not create // on some GPU card. // so we need to create the predictor. - // gpu_index >= gpu_ids_num, means each GPU card has already create one. + // _gpu_index >= gpu_ids_num, means each GPU card has already create one. // so we need to clone the predictor. 
@@ -333,12 +391,20 @@ class CloneDBReloadableInferEngine
   virtual int load_data(ModelData<EngineCore>* md,
                         const configure::EngineDesc& conf) {
+    int tid = syscall(SYS_gettid);
     uint32_t next_idx = (md->current_idx + 1) % 2;
     if (md->cores[next_idx]) {
       delete md->cores[next_idx];
     }
     md->cores[next_idx] = new (std::nothrow) EngineCore;
+    if (nullptr == md->caches[next_idx]) {
+      md->caches[next_idx] = new (std::nothrow) CubeCache;
+    }
+    if (nullptr == md->cores[next_idx] || nullptr == md->caches[next_idx]) {
+      LOG(ERROR) << "Allocating memory failed.";
+      return -1;
+    }
     // params.dump();
     // gpu_ids_num > 0 is always true.
     // if use CPU, gpu_ids = [-1].
@@ -349,46 +415,70 @@ class CloneDBReloadableInferEngine
     im::bsf::AutoMutex lock(DBReloadableInferEngine<EngineCore>::_mutex);
     int gpu_id = -1;
     if (gpu_ids_num > 0) {
-      gpu_id = conf.gpu_ids(DBReloadableInferEngine<EngineCore>::gpu_index %
+      gpu_id = conf.gpu_ids(DBReloadableInferEngine<EngineCore>::_gpu_index %
                             gpu_ids_num);
     } else {
       gpu_ids_num = 1;
     }
-    // gpu_index will be set to be 0, when load() or proc_initial() is called.
-    // gpu_index < gpu_ids_num, means there are predictors still not create
+
+    // _gpu_index is reset to 0 when load() or proc_initialize() is called.
+    // _gpu_index < gpu_ids_num means some GPU cards still have no predictor
     // on some GPU card.
     // so we need to create the predictor.
-    // gpu_index >= gpu_ids_num, means each GPU card has already create one.
+    // _gpu_index >= gpu_ids_num means each GPU card has already created one,
     // so we need to clone the predictor.
-    if (DBReloadableInferEngine<EngineCore>::gpu_index < gpu_ids_num) {
-      if (!md->cores[next_idx] ||
-          md->cores[next_idx]->create(conf, gpu_id) != 0) {
+    LOG(WARNING) << "tid:" << tid << " Loading clone model ...";
+    if (DBReloadableInferEngine<EngineCore>::_gpu_index < gpu_ids_num) {
+      // create cores
+      if (md->cores[next_idx]->create(conf, gpu_id) != 0) {
         LOG(ERROR) << "Failed create model, path: " << conf.model_dir();
         return -1;
       }
-      DBReloadableInferEngine<EngineCore>::gpu_index++;
-      md->current_idx = next_idx;
+      // create caches
+      std::string model_path = conf.model_dir();
+      if (access(model_path.c_str(), F_OK) == 0) {
+        std::string cube_cache_path = model_path + "cube_cache";
+        int reload_cache_ret =
+            md->caches[next_idx]->reload_data(cube_cache_path);
+        LOG(WARNING) << "create cube cache[" << next_idx
+                     << "] done. ret: " << reload_cache_ret;
+      } else {
+        LOG(WARNING) << "model_path " << model_path
+                     << " does not exist. Ignore cube cache!";
+      }
+
+      DBReloadableInferEngine<EngineCore>::_gpu_index++;
+      // md->current_idx = next_idx;
       if (_cloneTemplate.size() <
-          DBReloadableInferEngine<EngineCore>::gpu_index) {
+          DBReloadableInferEngine<EngineCore>::_gpu_index) {
         _cloneTemplate.push_back(md);
       } else {
-        _cloneTemplate[DBReloadableInferEngine<EngineCore>::gpu_index - 1] = md;
+        _cloneTemplate[DBReloadableInferEngine<EngineCore>::_gpu_index - 1] =
+            md;
       }
     } else {
-      int template_index = DBReloadableInferEngine<EngineCore>::gpu_index %
+      int template_index = DBReloadableInferEngine<EngineCore>::_gpu_index %
                            _cloneTemplate.size();
-      if (!md->cores[next_idx] ||
-          md->cores[next_idx]->clone(_cloneTemplate[template_index]->get()) !=
-              0) {
+
+      // clone cores
+      if (md->cores[next_idx]->clone(
+              _cloneTemplate[template_index]->get_core()) != 0) {
         LOG(ERROR) << "Failed clone model from core";
         return -1;
       }
-      DBReloadableInferEngine<EngineCore>::gpu_index++;
-      md->current_idx = next_idx;
-      LOG(WARNING) << "core clone model succ, cur_idx[" << md->current_idx
-                   << "].";
+      // clone caches
+      md->caches[next_idx] = _cloneTemplate[template_index]->get_cache();
+      LOG(WARNING) << "tid:" << tid << " clone caches done";
+
+      DBReloadableInferEngine<EngineCore>::_gpu_index++;
     }
+    // switch current_idx
+    md->current_idx = next_idx;
+    LOG(WARNING)
+        << "[" << tid
+        << "] Reload clone model and cube cache done. switching to"
+        << " current_idx[" << next_idx << "]";
     return 0;
   }
@@ -534,6 +624,10 @@ class FluidInferEngine : public CloneDBReloadableInferEngine {
   int task_infer_impl(const void* in, void* out) {  // NOLINT
     return infer_impl(in, out);
   }
+
+  CubeCache* get_cube_cache() {
+    return DBReloadableInferEngine<EngineCore>::get_cube_cache();
+  }
 };

 typedef FactoryPool<InferEngine> StaticInferFactory;
@@ -567,6 +661,8 @@ class VersionedInferEngine : public InferEngine {
   template <typename T>
   T* get_core();

+  CubeCache* get_cube_cache();
+
   // versioned inference interface
   int infer(const void* in, void* out, uint32_t batch_size, uint64_t version);
@@ -620,9 +716,13 @@ class InferManager {
             void* out,
             uint32_t batch_size = -1);

+  // get engine core
   template <typename T>
   T* get_core(const char* model_name);

+  // get cube cache
+  CubeCache* get_cube_cache(const char* model_name);
+
   // Versioned inference interface
   int infer(const char* model_name,
             const void* in,
@@ -630,9 +730,11 @@ class InferManager {
             uint32_t batch_size,
             uint64_t version);

+  // Versioned get engine core
   template <typename T>
   T* get_core(const char* model_name, uint64_t version);

+  // query model version
   int query_version(const std::string& model, uint64_t& version);

  private:
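The reload machinery hinges on `ModelData` holding two slots and flipping `current_idx` only after the inactive slot is fully rebuilt, so serving threads never observe a half-loaded core or cache. A stripped-down sketch of the pattern; the `std::atomic` is this example's addition, since the real code keeps `ModelData` in thread-local storage (via `THREAD_GETSPECIFIC`) rather than relying on fences:

```cpp
#include <atomic>
#include <cstdint>

// Minimal double-buffer sketch, assuming a single-writer reload thread
// and readers that only ever dereference the active slot.
template <typename Core, typename Cache>
struct TwoSlot {
  Core* cores[2] = {nullptr, nullptr};
  Cache* caches[2] = {nullptr, nullptr};
  std::atomic<uint32_t> current_idx{1};

  int reload(Core* new_core, Cache* new_cache) {
    uint32_t next = (current_idx.load() + 1) % 2;
    delete cores[next];   // retire the slot no reader uses anymore
    delete caches[next];
    cores[next] = new_core;
    caches[next] = new_cache;
    current_idx.store(next);  // one word flip publishes core and cache
    return 0;
  }

  Core* core() { return cores[current_idx.load()]; }
  Cache* cache() { return caches[current_idx.load()]; }
};
```

Deleting the inactive slot on reload is only safe because no request can still be reading it by the time the next reload comes around; the engine pairs an EngineCore with a CubeCache per slot precisely so the model and its cache version always flip together.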
diff --git a/core/predictor/framework/resource.cpp b/core/predictor/framework/resource.cpp
index 14ea880c3d3514717ae7c059e68ebda87f285bde..0f5539d18e1942ffde31714333fe0ce89a49ff6f 100755
--- a/core/predictor/framework/resource.cpp
+++ b/core/predictor/framework/resource.cpp
@@ -176,18 +176,18 @@ int Resource::initialize(const std::string& path, const std::string& file) {
     rec::mcube::CubeAPI* cube = rec::mcube::CubeAPI::instance();
     std::string cube_config_fullpath = "./" + resource_conf.cube_config_path() +
                                        "/" + resource_conf.cube_config_file();
-    this->cube_config_fullpath = cube_config_fullpath;
-    this->cube_quant_bits = resource_conf.has_cube_quant_bits()
-                                ? resource_conf.cube_quant_bits()
-                                : 0;
-    if (this->cube_quant_bits != 0 && this->cube_quant_bits != 8) {
+    this->_cube_config_fullpath = cube_config_fullpath;
+    this->_cube_quant_bits = resource_conf.has_cube_quant_bits()
+                                 ? resource_conf.cube_quant_bits()
+                                 : 0;
+    if (this->_cube_quant_bits != 0 && this->_cube_quant_bits != 8) {
       LOG(ERROR) << "Cube quant bits illegal! should be 0 or 8.";
       return -1;
     }
-    if (this->cube_quant_bits == 0) {
+    if (this->_cube_quant_bits == 0) {
       LOG(INFO) << "cube quant mode OFF";
     } else {
-      LOG(INFO) << "cube quant mode ON, quant bits: " << this->cube_quant_bits;
+      LOG(INFO) << "cube quant mode ON, quant bits: " << this->_cube_quant_bits;
     }
   }
@@ -198,10 +198,10 @@ int Resource::initialize(const std::string& path, const std::string& file) {
 // model config
 int Resource::general_model_initialize(const std::string& path,
                                        const std::string& file) {
-  if (this->cube_config_fullpath.size() != 0) {
-    LOG(INFO) << "init cube by config file : " << this->cube_config_fullpath;
+  if (this->_cube_config_fullpath.size() != 0) {
+    LOG(INFO) << "init cube by config file : " << this->_cube_config_fullpath;
     rec::mcube::CubeAPI* cube = rec::mcube::CubeAPI::instance();
-    int ret = cube->init(this->cube_config_fullpath.c_str());
+    int ret = cube->init(this->_cube_config_fullpath.c_str());
     if (ret != 0) {
       LOG(ERROR) << "cube init error";
       return -1;
@@ -326,7 +326,7 @@ int Resource::thread_clear() {
   }
   return 0;
 }
-size_t Resource::get_cube_quant_bits() { return this->cube_quant_bits; }
+size_t Resource::get_cube_quant_bits() { return this->_cube_quant_bits; }

 int Resource::reload() {
   if (FLAGS_enable_model_toolkit && InferManager::instance().reload() != 0) {
diff --git a/core/predictor/framework/resource.h b/core/predictor/framework/resource.h
index d8a114dab581b71182c1a510db16aa0d2e818b0a..3fd3d5030d72ebbac6ec229a56c645dcbaf31b92 100644
--- a/core/predictor/framework/resource.h
+++ b/core/predictor/framework/resource.h
@@ -16,8 +16,10 @@
 #include <map>
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <utility>
 #include <vector>
+
 #include "core/cube/cube-api/include/cube_api.h"
 #include "core/predictor/common/inner_common.h"
 #include "core/predictor/framework/infer.h"
@@ -27,6 +29,8 @@ namespace baidu {
 namespace paddle_serving {
 namespace predictor {

+// Paddle general model configuration, reads model configuration
+// information from the general_model_config.proto file
 class PaddleGeneralModelConfig {
  public:
   PaddleGeneralModelConfig() {}
@@ -34,23 +38,47 @@ class PaddleGeneralModelConfig {
   ~PaddleGeneralModelConfig() {}

  public:
+  // feed/fetch name and alias_name
   std::vector<std::string> _feed_name;
   std::vector<std::string> _feed_alias_name;
-  std::vector<int> _feed_type;  // 0 int64, 1 float
-  std::vector<bool> _is_lod_feed;  // true lod tensor
-  std::vector<bool> _is_lod_fetch;  // whether a fetch var is lod_tensor
-  std::vector<int> _capacity;  // capacity for each tensor
-  /*
-    feed_shape_ for feeded variable
-    feed_shape_[i][j] represents the jth dim for ith input Tensor
-    if is_lod_feed_[i] == False, feed_shape_[i][0] = -1
-  */
-  std::vector<std::vector<int>> _feed_shape;
-
   std::vector<std::string> _fetch_name;
   std::vector<std::string> _fetch_alias_name;
+
+  // Be consistent with the var type conversion in the model saving
+  // interface (python/paddle_serving_client/io/__init__.py):
+  //   int64 => 0;
+  //   float32 => 1;
+  //   int32 => 2;
+  //   float64 => 3;
+  //   int16 => 4;
+  //   float16 => 5;
+  //   bfloat16 => 6;
+  //   uint8 => 7;
+  //   int8 => 8;
+  //   bool => 9;
+  //   complex64 => 10;
+  //   complex128 => 11;
+  std::vector<int> _feed_type;
+
+  // whether a feed or fetch var is lod_tensor.
+  std::vector<bool> _is_lod_feed;
+  std::vector<bool> _is_lod_fetch;
+
+  // capacity for each tensor
+  std::vector<int> _capacity;
+
+  // _feed_shape and _fetch_shape represent the dimensional information
+  // of tensors.
+  // for example, feed_shape_[i][j] represents the j(th) dim of the i(th)
+  // input tensor.
+  // if is_lod_feed_[i] == False, feed_shape_[i][0] = -1
+  std::vector<std::vector<int>> _feed_shape;
   std::vector<std::vector<int>> _fetch_shape;
+
+  // fetch name -> index of the fetch_name vector.
   std::map<std::string, int> _fetch_name_to_index;
+
+  // fetch alias name -> index of the fetch_alias_name vector.
   std::map<std::string, int> _fetch_alias_name_to_index;
 };
@@ -73,33 +101,50 @@ class Resource {
     return ins;
   }

+  // initialize resource
   int initialize(const std::string& path, const std::string& file);

+  // load all model configurations from prototxt
   int general_model_initialize(const std::string& path,
                                const std::string& file);

+  // initialize thread local data
   int thread_initialize();

+  // clear thread local data
   int thread_clear();

+  // reload resources
   int reload();

+  // finalize
   int finalize();

+  // get all model configs
   std::vector<std::shared_ptr<PaddleGeneralModelConfig>>
   get_general_model_config();

+  // print all configurations of all models
   void print_general_model_config(
       const std::shared_ptr<PaddleGeneralModelConfig>& config);

+  // get cube quantization bit size
   size_t get_cube_quant_bits();

  private:
   int thread_finalize() { return 0; }

+ private:
+  // configuration information of all models, loaded from prototxt files
   std::vector<std::shared_ptr<PaddleGeneralModelConfig>> _configs;

-  std::string cube_config_fullpath;
-  int cube_quant_bits;  // 0 if no empty
+  // full path of the cube configuration file.
+  std::string _cube_config_fullpath;
+
+  // cube quantization bit size; supports 0 or 8. set 0 if no quant.
+  size_t _cube_quant_bits;
+
+  // bthread local key
   THREAD_KEY_T _tls_bspec_key;
 };
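The `_feed_type` codes mirror the dtype mapping applied when a model is saved from the Python client. A small helper sketch mapping each code to its element width; the serving ops themselves switch on `paddle::PaddleDType`, so this is just the comment above made executable:

```cpp
#include <cstddef>

// Byte width per element for the feed type codes listed above.
size_t feed_type_bytes(int feed_type) {
  switch (feed_type) {
    case 0: return 8;    // int64
    case 1: return 4;    // float32
    case 2: return 4;    // int32
    case 3: return 8;    // float64
    case 4: return 2;    // int16
    case 5: return 2;    // float16
    case 6: return 2;    // bfloat16
    case 7: return 1;    // uint8
    case 8: return 1;    // int8
    case 9: return 1;    // bool
    case 10: return 8;   // complex64 (two float32)
    case 11: return 16;  // complex128 (two float64)
    default: return 0;   // unknown code
  }
}
```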
diff --git a/paddle_inference/paddle/include/paddle_engine.h b/paddle_inference/paddle/include/paddle_engine.h
index 7eedc89c0ae045f51944b08a3a806a9439c02e2a..7cc8120f4eb818905c303b22a0b00d6b205bddb4 100644
--- a/paddle_inference/paddle/include/paddle_engine.h
+++ b/paddle_inference/paddle/include/paddle_engine.h
@@ -94,7 +94,9 @@ const std::string getFileBySuffix(
   return fileName;
 }

-// Engine Base
+// EngineCore is the base class of inference engines; engines for Paddle
+// Inference or for other machine learning platforms can be derived
+// from it
 class EngineCore {
  public:
   virtual ~EngineCore() {}
@@ -141,6 +143,11 @@ class EngineCore {
   virtual void* get() { return _predictor.get(); }

  protected:
+  // _predictor is a prediction instance of Paddle Inference.
+  // when inferring on the CPU, _predictor is bound to a model.
+  // when inferring on the GPU, _predictor is bound to a model and a GPU
+  // card. Therefore, GPU multi-card inference needs multiple EngineCore
+  // instances.
   std::shared_ptr<paddle_infer::Predictor> _predictor;
 };
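Because `_predictor` binds to one model on CPU and to one model plus one GPU card on GPU, multi-card serving needs one EngineCore per card, which is what `CloneDBReloadableInferEngine` arranges by creating a template predictor per card and cloning it for the remaining threads. A hedged sketch of the same idea directly against the Paddle Inference 2.x API; the model path and memory size are illustrative:

```cpp
#include <memory>
#include <string>
#include <vector>
#include "paddle_inference_api.h"  // paddle_infer::Config / Predictor

// One template predictor per GPU card; worker threads hold clones that
// share the weights instead of loading the model again.
std::vector<std::shared_ptr<paddle_infer::Predictor>> make_templates(
    const std::string& model_dir, const std::vector<int>& gpu_ids) {
  std::vector<std::shared_ptr<paddle_infer::Predictor>> templates;
  for (int gpu_id : gpu_ids) {
    paddle_infer::Config config;
    config.SetModel(model_dir);  // illustrative model path
    config.EnableUseGpu(/*memory_pool_init_size_mb=*/100, gpu_id);
    templates.push_back(paddle_infer::CreatePredictor(config));
  }
  return templates;
}

// Per-thread predictor bound to the same card as its template.
std::unique_ptr<paddle_infer::Predictor> make_worker_predictor(
    const std::shared_ptr<paddle_infer::Predictor>& tmpl) {
  return tmpl->Clone();
}
```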