diff --git a/core/general-server/op/general_reader_op.cpp b/core/general-server/op/general_reader_op.cpp
old mode 100755
new mode 100644
index e98f84d06c7bae773097296e5bbf800929ffbb6e..17a1aaa602422062e4426ea25876e73fcb21202a
--- a/core/general-server/op/general_reader_op.cpp
+++ b/core/general-server/op/general_reader_op.cpp
@@ -47,17 +47,14 @@ int conf_check(const Request *req,
   for (int i = 0; i < var_num; ++i) {
     const Tensor &tensor = req->insts(0).tensor_array(i);
-    if (model_config->_feed_type[i] !=
-        tensor.elem_type()) {
+    if (model_config->_feed_type[i] != tensor.elem_type()) {
       LOG(ERROR) << "feed type not match.";
       return -1;
     }
-    if (model_config->_feed_shape[i].size() ==
-        tensor.shape_size()) {
+    if (model_config->_feed_shape[i].size() == tensor.shape_size()) {
       for (int j = 0; j < model_config->_feed_shape[i].size(); ++j) {
         tensor.shape(j);
-        if (model_config->_feed_shape[i][j] !=
-            tensor.shape(j)) {
+        if (model_config->_feed_shape[i][j] != tensor.shape(j)) {
           LOG(ERROR) << "feed shape not match.";
           return -1;
         }
@@ -73,6 +70,11 @@ int GeneralReaderOp::inference() {
   // read request from client
   const Request *req = dynamic_cast<const Request *>(get_request_message());
+  if (!req) {
+    LOG(ERROR) << "Failed get request message";
+    return -1;
+  }
+
   uint64_t log_id = req->log_id();
   int input_var_num = 0;
   std::vector<int64_t> elem_type;
@@ -80,14 +82,18 @@ int GeneralReaderOp::inference() {
   std::vector<int64_t> databuf_size;
   GeneralBlob *res = mutable_data<GeneralBlob>();
-  TensorVector *out = &(res->tensor_vector);
-
-  res->SetLogId(log_id);
   if (!res) {
-    LOG(ERROR) << "(logid=" << log_id
-               << ") Failed get op tls reader object output";
+    LOG(ERROR) << "(logid=" << log_id << ") Failed get GeneralBlob";
+    return -1;
   }
+  TensorVector *out = &(res->tensor_vector);
+  if (!out) {
+    LOG(ERROR) << "(logid=" << log_id << ") Failed get tensor_vector of res";
+    return -1;
+  }
+
+  res->SetLogId(log_id);
   Timer timeline;
   int64_t start = timeline.TimeStampUS();
   int var_num = req->insts(0).tensor_array_size();
@@ -99,7 +105,7 @@ int GeneralReaderOp::inference() {
       baidu::paddle_serving::predictor::Resource::instance();
   VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
-  //get the first InferOP's model_config as ReaderOp's model_config by default.
+  // get the first InferOP's model_config as ReaderOp's model_config by default.
   std::shared_ptr<PaddleGeneralModelConfig> model_config =
       resource.get_general_model_config().front();
@@ -140,10 +146,11 @@ int GeneralReaderOp::inference() {
       lod_tensor.dtype = paddle::PaddleDType::INT32;
       data_len = tensor.int_data_size();
     } else if (elem_type[i] == P_STRING) {
-      //use paddle::PaddleDType::UINT8 as for String.
+      // use paddle::PaddleDType::UINT8 as for String.
       elem_size[i] = sizeof(uint8_t);
       lod_tensor.dtype = paddle::PaddleDType::UINT8;
-      //this is for vector<String>, cause the databuf_size != vector<String>.size()*sizeof(char);
+      // this is for vector<String>, cause the databuf_size !=
+      // vector<String>.size()*sizeof(char);
       for (int idx = 0; idx < tensor.data_size(); idx++) {
         data_len += tensor.data()[idx].length();
       }
@@ -157,34 +164,32 @@ int GeneralReaderOp::inference() {
       for (int k = 0; k < tensor.lod_size(); ++k) {
         lod_tensor.lod[0].push_back(tensor.lod(k));
       }
+      VLOG(2) << "(logid=" << log_id << ") var[" << i
+              << "] has lod_tensor and len=" << out->at(i).lod[0].back();
     }
     for (int k = 0; k < tensor.shape_size(); ++k) {
       int dim = tensor.shape(k);
-      VLOG(2) << "(logid=" << log_id << ") shape for var[" << i
-              << "]: " << dim;
+      VLOG(2) << "(logid=" << log_id << ") shape for var[" << i << "]: " << dim;
       lod_tensor.shape.push_back(dim);
     }
     lod_tensor.name = model_config->_feed_name[i];
     out->push_back(lod_tensor);
-
     VLOG(2) << "(logid=" << log_id << ") tensor size for var[" << i
             << "]: " << data_len;
     databuf_size[i] = data_len * elem_size[i];
     out->at(i).data.Resize(data_len * elem_size[i]);
-    VLOG(2) << "(logid=" << log_id << ") var[" << i
-            << "] is lod_tensor and len=" << out->at(i).lod[0].back();
-
+
     if (elem_type[i] == P_INT64) {
       int64_t *dst_ptr = static_cast<int64_t *>(out->at(i).data.data());
       VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
              << "] is " << tensor.int64_data(0);
       if (!dst_ptr) {
         LOG(ERROR) << "dst_ptr is nullptr";
-        return -1;
+        return -1;
       }
-      memcpy(dst_ptr, tensor.int64_data().data(),databuf_size[i]);
+      memcpy(dst_ptr, tensor.int64_data().data(), databuf_size[i]);
       /*
       int elem_num = tensor.int64_data_size();
       for (int k = 0; k < elem_num; ++k) {
@@ -197,9 +202,9 @@ int GeneralReaderOp::inference() {
               << "] is " << tensor.float_data(0);
       if (!dst_ptr) {
         LOG(ERROR) << "dst_ptr is nullptr";
-        return -1;
+        return -1;
       }
-      memcpy(dst_ptr, tensor.float_data().data(),databuf_size[i]);
+      memcpy(dst_ptr, tensor.float_data().data(), databuf_size[i]);
       /*int elem_num = tensor.float_data_size();
       for (int k = 0; k < elem_num; ++k) {
         dst_ptr[k] = tensor.float_data(k);
@@ -210,9 +215,9 @@ int GeneralReaderOp::inference() {
               << "] is " << tensor.int_data(0);
       if (!dst_ptr) {
         LOG(ERROR) << "dst_ptr is nullptr";
-        return -1;
+        return -1;
       }
-      memcpy(dst_ptr, tensor.int_data().data(),databuf_size[i]);
+      memcpy(dst_ptr, tensor.int_data().data(), databuf_size[i]);
       /*
       int elem_num = tensor.int_data_size();
       for (int k = 0; k < elem_num; ++k) {
@@ -225,7 +230,7 @@ int GeneralReaderOp::inference() {
               << "] is " << tensor.data(0);
       if (!dst_ptr) {
         LOG(ERROR) << "dst_ptr is nullptr";
-        return -1;
+        return -1;
       }
       int elem_num = tensor.data_size();
       for (int k = 0; k < elem_num; ++k) {
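
Note: in the P_STRING branch above, databuf_size is the sum of the string
lengths rather than tensor.data_size() * sizeof(element), which is why the
buffer has to be sized before the copy loop. A minimal standalone sketch of
the same flattening idea (illustrative only, not part of the patch):

    #include <cstdint>
    #include <cstring>
    #include <string>
    #include <vector>

    // Flatten a batch of strings into one contiguous uint8 buffer, using the
    // same sizing rule the reader op applies to P_STRING feeds.
    std::vector<uint8_t> flatten_strings(const std::vector<std::string>& batch) {
      size_t data_len = 0;
      for (const std::string& s : batch) data_len += s.length();
      std::vector<uint8_t> databuf(data_len * sizeof(uint8_t));
      size_t offset = 0;
      for (const std::string& s : batch) {
        std::memcpy(databuf.data() + offset, s.data(), s.length());
        offset += s.length();
      }
      return databuf;
    }
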
diff --git a/core/predictor/framework/infer.cpp b/core/predictor/framework/infer.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..e11861426fe3c1c1cea39811d66bb4feffdd8b9e
--- /dev/null
+++ b/core/predictor/framework/infer.cpp
@@ -0,0 +1,525 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "core/predictor/framework/infer.h"
+
+namespace baidu {
+namespace paddle_serving {
+namespace predictor {
+
+int ReloadableInferEngine::proc_initialize_impl(
+    const configure::EngineDesc& conf, bool version) {
+  _reload_tag_file = conf.reloadable_meta();
+  _reload_type = conf.reloadable_type();
+  _model_dir = conf.model_dir();
+  _infer_thread_num = conf.runtime_thread_num();
+  _infer_batch_size = conf.batch_infer_size();
+  _infer_batch_align = conf.enable_batch_align();
+
+  _conf = conf;
+
+  if (!check_need_reload() || load(conf) != 0) {
+    LOG(ERROR) << "Failed load model_data_path" << _model_dir;
+    return -1;
+  }
+
+  if (parse_version_info(conf, version) != 0) {
+    LOG(ERROR) << "Failed parse version info";
+    return -1;
+  }
+
+  LOG(WARNING) << "Succ load model:" << _model_dir;
+  return 0;
+}
+
+int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf,
+                                           bool version) {
+  if (proc_initialize_impl(conf, version) != 0) {
+    LOG(ERROR) << "Failed proc initialize impl";
+    return -1;
+  }
+
+  // init bsf framework
+  if (_infer_thread_num <= 0) {
+    return 0;
+  }
+
+  im::bsf::TaskExecutor<TaskT>::instance()->set_thread_init_fn(
+      boost::bind(&InferEngine::thrd_initialize_impl, this));
+  im::bsf::TaskExecutor<TaskT>::instance()->set_thread_reset_fn(
+      boost::bind(&InferEngine::thrd_clear_impl, this));
+  im::bsf::TaskExecutor<TaskT>::instance()->set_thread_callback_fn(
+      boost::bind(&InferEngine::task_infer_impl, this, _1, _2));
+  im::bsf::TaskExecutor<TaskT>::instance()->set_batch_size(_infer_batch_size);
+  im::bsf::TaskExecutor<TaskT>::instance()->set_batch_align(
+      _infer_batch_align);
+  if (im::bsf::TaskExecutor<TaskT>::instance()->start(_infer_thread_num) !=
+      0) {
+    LOG(ERROR) << "Failed start bsf executor, threads:" << _infer_thread_num;
+    return -1;
+  }
+
+  LOG(WARNING) << "Enable batch schedule framework, thread_num:"
+               << _infer_thread_num << ", batch_size:" << _infer_batch_size
+               << ", enable_batch_align:" << _infer_batch_align;
+  return 0;
+}
+
+int ReloadableInferEngine::infer(const void* in,
+                                 void* out,
+                                 uint32_t batch_size) {
+  if (_infer_thread_num <= 0) {
+    return infer_impl(in, out, batch_size);
+  }
+
+  im::bsf::TaskManager<Tensor, Tensor> task_manager;
+  task_manager.schedule(*(reinterpret_cast<const BatchTensor*>(in)),
+                        *(reinterpret_cast<BatchTensor*>(out)));
+  task_manager.wait();
+  return 0;
+}
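
Note: infer() has two dispatch paths. With runtime_thread_num <= 0 the
calling thread runs infer_impl() directly; otherwise the request is handed to
the shared bsf executor, which merges queued requests into batches of up to
batch_infer_size. A toy, self-contained illustration of that merge step
(names and types are illustrative, not the real bsf API):

    #include <condition_variable>
    #include <mutex>
    #include <queue>
    #include <vector>

    // Producers enqueue single tasks; a worker drains up to `batch_size`
    // of them and runs them as one batch.
    template <typename Task>
    class BatchQueue {
     public:
      explicit BatchQueue(size_t batch_size) : batch_size_(batch_size) {}

      void push(Task t) {
        {
          std::lock_guard<std::mutex> lk(mu_);
          q_.push(std::move(t));
        }
        cv_.notify_one();
      }

      // Blocks until at least one task is ready, then drains up to batch_size.
      std::vector<Task> pop_batch() {
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [this] { return !q_.empty(); });
        std::vector<Task> batch;
        while (!q_.empty() && batch.size() < batch_size_) {
          batch.push_back(std::move(q_.front()));
          q_.pop();
        }
        return batch;
      }

     private:
      size_t batch_size_;
      std::mutex mu_;
      std::condition_variable cv_;
      std::queue<Task> q_;
    };
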
+
+int ReloadableInferEngine::thrd_initialize() {
+  if (_infer_thread_num > 0) {
+    return 0;
+  }
+  return thrd_initialize_impl();
+}
+
+int ReloadableInferEngine::thrd_clear() {
+  if (_infer_thread_num > 0) {
+    return 0;
+  }
+  return thrd_clear_impl();
+}
+
+int ReloadableInferEngine::proc_finalize() {
+  if (proc_finalize_impl() != 0) {
+    LOG(ERROR) << "Failed proc finalize impl";
+    return -1;
+  }
+
+  if (_infer_thread_num > 0) {
+    im::bsf::TaskExecutor<TaskT>::instance()->stop();
+  }
+  return 0;
+}
+
+int ReloadableInferEngine::reload() {
+  if (check_need_reload()) {
+    LOG(WARNING) << "begin reload model[" << _model_dir << "].";
+    return load(_conf);
+  }
+  return 0;
+}
+
+int ReloadableInferEngine::parse_version_info(
+    const configure::EngineDesc& config, bool version) {
+  _version = uint64_t(-1);
+  return 0;
+}
+
+bool ReloadableInferEngine::check_need_reload() {
+  if (_reload_type == "timestamp_ne") {
+    return check_timestamp_ne();
+  } else if (_reload_type == "timestamp_gt") {
+    return check_timestamp_gt();
+  } else if (_reload_type == "md5sum") {
+    return check_md5sum();
+  } else if (_reload_type == "revision") {
+    return check_revision();
+  } else if (_reload_type == "none") {
+    return false;
+  }
+
+  LOG(ERROR) << "Not support reload type: " << _reload_type;
+  return false;
+}
+
+bool ReloadableInferEngine::check_timestamp_ne() {
+  struct stat st;
+  if (stat(_reload_tag_file.c_str(), &st) != 0) {
+    LOG(ERROR) << "Failed stat config file:" << _reload_tag_file;
+    return false;
+  }
+
+  if ((st.st_mode & S_IFREG) && st.st_mtime != _last_record.timestamp) {
+    _last_record.timestamp = st.st_mtime;
+    return true;
+  }
+
+  return false;
+}
+
+bool ReloadableInferEngine::check_timestamp_gt() {
+  struct stat st;
+  if (stat(_reload_tag_file.c_str(), &st) != 0) {
+    LOG(ERROR) << "Failed stat config file:" << _reload_tag_file;
+    return false;
+  }
+
+  if ((st.st_mode & S_IFREG) && st.st_mtime > _last_record.timestamp) {
+    _last_record.timestamp = st.st_mtime;
+    return true;
+  }
+
+  return false;
+}
+
+int VersionedInferEngine::proc_initialize(const configure::EngineDesc& conf) {
+  if (proc_initialize(conf, false) != 0) {
+    LOG(ERROR) << "Failed proc initialize engine: " << conf.name().c_str();
+    return -1;
+  }
+
+  LOG(WARNING) << "Succ proc initialize engine: " << conf.name().c_str();
+  return 0;
+}
+
+int VersionedInferEngine::proc_initialize(const configure::EngineDesc& conf,
+                                          bool version) {
+  std::string engine_type = conf.type();
+  InferEngine* engine =
+      StaticInferFactory::instance().generate_object(engine_type);
+  if (!engine) {
+    LOG(ERROR) << "Failed generate engine with type:" << engine_type;
+    return -1;
+  }
+#ifndef BCLOUD
+  VLOG(2) << "FLAGS_logtostderr " << FLAGS_logtostderr;
+  int tmp = FLAGS_logtostderr;
+  if (engine->proc_initialize(conf, version) != 0) {
+    LOG(ERROR) << "Failed initialize engine, type:" << engine_type;
+    return -1;
+  }
+  VLOG(2) << "FLAGS_logtostderr " << FLAGS_logtostderr;
+  FLAGS_logtostderr = tmp;
+#else
+  if (engine->proc_initialize(conf, version) != 0) {
+    LOG(ERROR) << "Failed initialize engine, type:" << engine_type;
+    return -1;
+  }
+#endif
+  auto r = _versions.insert(std::make_pair(engine->version(), engine));
+  if (!r.second) {
+    LOG(ERROR) << "Failed insert item: " << engine->version()
+               << ", type: " << engine_type;
+    return -1;
+  }
+  LOG(WARNING) << "Succ proc initialize version engine: " << engine->version();
+  return 0;
+}
+
+int VersionedInferEngine::proc_finalize() {
+  for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
+    if (iter->second->proc_finalize() != 0) {
+      LOG(ERROR) << "Failed proc finalize version engine: " << iter->first;
+    }
+    LOG(WARNING) << "Succ proc finalize version engine: " << iter->first;
+  }
+  return 0;
+}
+
+int VersionedInferEngine::thrd_initialize() {
+  for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
+    if (iter->second->thrd_initialize() != 0) {
+      LOG(ERROR) << "Failed thrd initialize version engine: " << iter->first;
+      return -1;
+    }
+    LOG(WARNING) << "Succ thrd initialize version engine: " << iter->first;
+  }
+  return 0;
+}
+
+int VersionedInferEngine::thrd_clear() {
+  for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
+    if (iter->second->thrd_clear() != 0) {
+      LOG(ERROR) << "Failed thrd clear version engine: " << iter->first;
+      return -1;
+    }
+  }
+  return 0;
+}
+
+int VersionedInferEngine::thrd_finalize() {
+  for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
+    if (iter->second->thrd_finalize() != 0) {
+      LOG(ERROR) << "Failed thrd finalize version engine: " << iter->first;
+      return -1;
+    }
+    LOG(WARNING) << "Succ thrd finalize version engine: " << iter->first;
+  }
+  return 0;
+}
+
+int VersionedInferEngine::reload() {
+  for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
+    if (iter->second->reload() != 0) {
+      LOG(ERROR) << "Failed reload version engine: " << iter->first;
+      return -1;
+    }
+    LOG(WARNING) << "Succ reload version engine: " << iter->first;
+  }
+  return 0;
+}
+
+uint64_t VersionedInferEngine::version() const {
+  InferEngine* engine = default_engine();
+  if (engine) {
+    return engine->version();
+  } else {
+    return uint64_t(-1);
+  }
+}
+
+// inference interface
+InferEngine* VersionedInferEngine::default_engine() const {
+  if (_versions.size() != 1) {
+    LOG(ERROR) << "Ambiguous default engine version:" << _versions.size();
+    return NULL;
+  }
+
+  return _versions.begin()->second;
+}
+
+int VersionedInferEngine::infer(const void* in,
+                                void* out,
+                                uint32_t batch_size) {
+  InferEngine* engine = default_engine();
+  if (!engine) {
+    LOG(WARNING) << "fail to get default engine";
+    return -1;
+  }
+  return engine->infer(in, out, batch_size);
+}
+
+// versioned inference interface
+int VersionedInferEngine::infer(const void* in,
+                                void* out,
+                                uint32_t batch_size,
+                                uint64_t version) {
+  auto iter = _versions.find(version);
+  if (iter == _versions.end()) {
+    LOG(ERROR) << "Not found version engine: " << version;
+    return -1;
+  }
+
+  return iter->second->infer(in, out, batch_size);
+}
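
Note: default_engine() deliberately refuses to guess when more than one
version is loaded, so callers must then use the versioned infer() overload.
A compact illustration of that selection rule (types simplified to keep the
sketch self-contained):

    #include <cstdint>
    #include <map>

    struct Engine { /* ... */ };

    // Returns the only engine if exactly one version exists, else nullptr.
    const Engine* pick_default(const std::map<uint64_t, Engine>& versions) {
      if (versions.size() != 1) return nullptr;  // ambiguous: force explicit pick
      return &versions.begin()->second;
    }

    const Engine* pick_version(const std::map<uint64_t, Engine>& versions,
                               uint64_t version) {
      auto it = versions.find(version);
      return it == versions.end() ? nullptr : &it->second;
    }
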
+
+template <typename T>
+T* VersionedInferEngine::get_core() {
+  InferEngine* engine = default_engine();
+  if (!engine) {
+    LOG(WARNING) << "fail to get core";
+    return NULL;
+  }
+  auto db_engine = dynamic_cast<DBReloadableInferEngine<T>*>(engine);
+  if (db_engine) {
+    return db_engine->get_core();
+  }
+  LOG(WARNING) << "fail to get core";
+  return NULL;
+}
+
+template <typename T>
+T* VersionedInferEngine::get_core(uint64_t version) {
+  auto iter = _versions.find(version);
+  if (iter == _versions.end()) {
+    LOG(ERROR) << "Not found version engine: " << version;
+    return NULL;
+  }
+
+  auto db_engine = dynamic_cast<DBReloadableInferEngine<T>*>(iter->second);
+  if (db_engine) {
+    return db_engine->get_core();
+  }
+  LOG(WARNING) << "fail to get core for " << version;
+  return NULL;
+}
+
+int VersionedInferEngine::proc_initialize_impl(
+    const configure::EngineDesc& conf, bool) {
+  return -1;
+}
+
+int VersionedInferEngine::thrd_initialize_impl() { return -1; }
+int VersionedInferEngine::thrd_finalize_impl() { return -1; }
+int VersionedInferEngine::thrd_clear_impl() { return -1; }
+int VersionedInferEngine::proc_finalize_impl() { return -1; }
+int VersionedInferEngine::infer_impl(const void* in,
+                                     void* out,
+                                     uint32_t batch_size) {
+  return -1;
+}
+int VersionedInferEngine::task_infer_impl(const BatchTensor& in,
+                                          BatchTensor& out) {  // NOLINT
+  return -1;
+}
+
+int InferManager::proc_initialize(const char* path, const char* file) {
+  ModelToolkitConf model_toolkit_conf;
+  if (configure::read_proto_conf(path, file, &model_toolkit_conf) != 0) {
+    LOG(ERROR) << "failed load infer config, path: " << path << "/" << file;
+    return -1;
+  }
+  size_t engine_num = model_toolkit_conf.engines_size();
+  for (size_t ei = 0; ei < engine_num; ++ei) {
+    LOG(INFO) << "model_toolkit_conf.engines(" << ei
+              << ").name: " << model_toolkit_conf.engines(ei).name();
+    std::string engine_name = model_toolkit_conf.engines(ei).name();
+    VersionedInferEngine* engine = new (std::nothrow) VersionedInferEngine();
+    if (!engine) {
+      LOG(ERROR) << "Failed generate versioned engine: " << engine_name;
+      return -1;
+    }
+    if (engine->proc_initialize(model_toolkit_conf.engines(ei)) != 0) {
+      LOG(ERROR) << "Failed initialize version engine, name:" << engine_name;
+      return -1;
+    }
+    auto r = _map.insert(std::make_pair(engine_name, engine));
+    if (!r.second) {
+      LOG(ERROR) << "Failed insert item: " << engine_name;
+      return -1;
+    }
+    LOG(WARNING) << "Succ proc initialize engine: " << engine_name;
+  }
+  return 0;
+}
+
+int InferManager::thrd_initialize() {
+  for (auto it = _map.begin(); it != _map.end(); ++it) {
+    if (it->second->thrd_initialize() != 0) {
+      LOG(ERROR) << "Failed thrd initialize engine, name: " << it->first;
+      return -1;
+    }
+    LOG(WARNING) << "Succ thrd initialize engine, name: " << it->first;
+  }
+  return 0;
+}
+
+int InferManager::thrd_clear() {
+  for (auto it = _map.begin(); it != _map.end(); ++it) {
+    if (it->second->thrd_clear() != 0) {
+      LOG(ERROR) << "Failed thrd clear engine, name: " << it->first;
+      return -1;
+    }
+  }
+  return 0;
+}
+
+int InferManager::reload() {
+  for (auto it = _map.begin(); it != _map.end(); ++it) {
+    if (it->second->reload() != 0) {
+      LOG(ERROR) << "Failed reload engine, name: " << it->first;
+      return -1;
+    }
+  }
+  return 0;
+}
+
+int InferManager::thrd_finalize() {
+  for (auto it = _map.begin(); it != _map.end(); ++it) {
+    if (it->second->thrd_finalize() != 0) {
+      LOG(ERROR) << "Failed thrd finalize engine, name: " << it->first;
+      return -1;
+    }
+    LOG(WARNING) << "Succ thrd finalize engine, name: " << it->first;
+  }
+  return 0;
+}
+
+int InferManager::proc_finalize() {
+  for (auto it = _map.begin(); it != _map.end(); ++it) {
+    if (it->second->proc_finalize() != 0) {
+      LOG(ERROR) << "Failed proc finalize engine, name: " << it->first;
+      return -1;
+    }
+    LOG(WARNING) << "Succ proc finalize engine, name: " << it->first;
+  }
+  _map.clear();
+  return 0;
+}
+
+// Inference interface
+int InferManager::infer(const char* model_name,
+                        const void* in,
+                        void* out,
+                        uint32_t batch_size) {
+  auto it = _map.find(model_name);
+  if (it == _map.end()) {
+    LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
+    return -1;
+  }
+  return it->second->infer(in, out, batch_size);
+}
+
+template <typename T>
+T* InferManager::get_core(const char* model_name) {
+  auto it = _map.find(model_name);
+  if (it == _map.end()) {
+    LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
+    return NULL;
+  }
+  auto infer_engine =
+      dynamic_cast<DBReloadableInferEngine<T>*>(it->second->default_engine());
+  if (infer_engine) {
+    return infer_engine->get_core();
+  }
+  LOG(WARNING) << "fail to get core for " << model_name;
+  return NULL;
+}
+
+// Versioned inference interface
+int InferManager::infer(const char* model_name,
+                        const void* in,
+                        void* out,
+                        uint32_t batch_size,
+                        uint64_t version) {
+  auto it = _map.find(model_name);
+  if (it == _map.end()) {
+    LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
+    return -1;
+  }
+  return it->second->infer(in, out, batch_size, version);
+}
+
+template <typename T>
+T* InferManager::get_core(const char* model_name, uint64_t version) {
+  auto it = _map.find(model_name);
+  if (it == _map.end()) {
+    LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
+    return NULL;
+  }
+  return it->second->get_core<T>(version);
+}
+
+int InferManager::query_version(const std::string& model, uint64_t& version) {
+  auto it = _map.find(model);
+  if (it == _map.end()) {
+    LOG(WARNING) << "Cannot find engine in map, model name:" << model;
+    return -1;
+  }
+  auto infer_engine = it->second->default_engine();
+  if (!infer_engine) {
+    LOG(WARNING) << "Cannot get default engine for model:" << model;
+    return -1;
+  }
+  version = infer_engine->version();
+  LOG(INFO) << "Succ get version: " << version << " for model: " << model;
+  return 0;
+}
+
+}  // namespace predictor
+}  // namespace paddle_serving
+}  // namespace baidu
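
Note: InferManager is the process-wide entry point: engines are registered by
name at proc_initialize time, and requests are routed by model name (and
optionally version). A hypothetical call site, with error handling trimmed to
the essentials; the in/out payloads follow whatever TensorVector convention
the calling op uses:

    #include "core/predictor/framework/infer.h"

    using baidu::paddle_serving::predictor::InferManager;

    int run_once(const void* in, void* out) {
      // "general_model" must match an engine name from the model toolkit conf.
      if (InferManager::instance().infer("general_model", in, out, 1) != 0) {
        return -1;
      }
      uint64_t version = 0;
      if (InferManager::instance().query_version("general_model", version) == 0) {
        // version now holds the default engine's version
      }
      return 0;
    }
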
version << " for model: " << model; + return 0; +} + +} // namespace predictor +} // namespace paddle_serving +} // namespace baidu diff --git a/core/predictor/framework/infer.h b/core/predictor/framework/infer.h old mode 100755 new mode 100644 index b56ed5806b34e0d6611b081ace7187fa4a9683c2..6113dc8eff60814af62ad145a334db666629f080 --- a/core/predictor/framework/infer.h +++ b/core/predictor/framework/infer.h @@ -13,14 +13,14 @@ // limitations under the License. #pragma once +#include #include #include #include -#include +#include #include #include #include -#include #include "core/predictor/common/inner_common.h" #include "core/predictor/framework/bsf.h" #include "core/predictor/framework/factory.h" @@ -84,10 +84,10 @@ class InferEngine { virtual int thrd_clear_impl() = 0; virtual int proc_finalize_impl() = 0; virtual int infer_impl(const void* in, - void* out, - uint32_t batch_size = -1) = 0; + void* out, + uint32_t batch_size = -1) = 0; virtual int task_infer_impl(const BatchTensor& in, - BatchTensor& out) = 0; // NOLINT + BatchTensor& out) = 0; // NOLINT // end: framework inner call }; @@ -96,193 +96,77 @@ class ReloadableInferEngine : public InferEngine { public: virtual ~ReloadableInferEngine() {} - union last_check_status { - time_t last_timestamp; - uint64_t last_md5sum; - uint64_t last_revision; + // Reloadable record + union ReloadableRecord { + time_t timestamp; + uint64_t md5sum; + uint64_t revision; }; virtual int load(const configure::EngineDesc& conf) = 0; typedef im::bsf::Task TaskT; - int proc_initialize_impl(const configure::EngineDesc& conf, bool version) { - _reload_tag_file = conf.reloadable_meta(); - _reload_mode_tag = conf.reloadable_type(); - _model_data_path = conf.model_dir(); - _infer_thread_num = conf.runtime_thread_num(); - _infer_batch_size = conf.batch_infer_size(); - _infer_batch_align = conf.enable_batch_align(); - - _conf = conf; - - if (!check_need_reload() || load(conf) != 0) { - LOG(ERROR) << "Failed load model_data_path" << _model_data_path; - return -1; - } - - if (parse_version_info(conf, version) != 0) { - LOG(ERROR) << "Failed parse version info"; - return -1; - } - - LOG(WARNING) << "Succ load model_data_path" << _model_data_path; - return 0; - } - - int proc_initialize(const configure::EngineDesc& conf, bool version) { - if (proc_initialize_impl(conf, version) != 0) { - LOG(ERROR) << "Failed proc initialize impl"; - return -1; - } - - // init bsf framework - if (_infer_thread_num <= 0) { - return 0; - } - - // init bsf framework - im::bsf::TaskExecutor::instance()->set_thread_init_fn( - boost::bind(&InferEngine::thrd_initialize_impl, this)); - im::bsf::TaskExecutor::instance()->set_thread_reset_fn( - boost::bind(&InferEngine::thrd_clear_impl, this)); - im::bsf::TaskExecutor::instance()->set_thread_callback_fn( - boost::bind(&InferEngine::task_infer_impl, this, _1, _2)); - im::bsf::TaskExecutor::instance()->set_batch_size(_infer_batch_size); - im::bsf::TaskExecutor::instance()->set_batch_align( - _infer_batch_align); - if (im::bsf::TaskExecutor::instance()->start(_infer_thread_num) != - 0) { - LOG(ERROR) << "Failed start bsf executor, threads:" << _infer_thread_num; - return -1; - } - - LOG(WARNING) << "Enable batch schedule framework, thread_num:" - << _infer_thread_num << ", batch_size:" << _infer_batch_size - << ", enable_batch_align:" << _infer_batch_align; - return 0; - } + int proc_initialize_impl(const configure::EngineDesc& conf, bool version); - int infer(const void* in, void* out, uint32_t batch_size = -1) { - if (_infer_thread_num <= 0) 
@@ -323,12 +207,10 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
     }
 
     LOG(WARNING) << "Succ load engine, path: " << conf.model_dir();
-
     return 0;
   }
 
-  int load_data(ModelData<EngineCore>* md,
-                const configure::EngineDesc& conf) {
+  int load_data(ModelData<EngineCore>* md, const configure::EngineDesc& conf) {
     uint32_t next_idx = (md->current_idx + 1) % 2;
     if (md->cores[next_idx]) {
       delete md->cores[next_idx];
@@ -354,11 +236,11 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
 
     ModelData<EngineCore>* md = new (std::nothrow) ModelData<EngineCore>;
     if (!md || load_data(md, _conf) != 0) {
-      LOG(ERROR) << "Failed create thread data from "
-                 << _conf.model_dir();
+      LOG(ERROR) << "Failed create thread data from " << _conf.model_dir();
       return -1;
     }
 
+    LOG(ERROR) << "THREAD_SETSPECIFIC _skey = md";
     THREAD_SETSPECIFIC(_skey, md);
     im::bsf::AutoMutex lock(_mutex);
     _reload_vec.push_back(md);
@@ -396,8 +278,6 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
   THREAD_KEY_T _skey;
   THREAD_MUTEX_T _mutex;
   std::vector<ModelData<EngineCore>*> _reload_vec;
-
- private:
 };
 
 // Multiple EngineCore instances share the same model data
@@ -442,7 +322,6 @@ class CloneDBReloadableInferEngine
   }
 
   LOG(WARNING) << "Succ load clone model, path[" << conf.model_dir() << "]";
-
   return 0;
 }
@@ -516,10 +395,10 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
     // Set the lod and shape information of InputData first.
     // Then copy data from cpu to the core.
     const TensorVector* tensorVector_in_pointer =
-            reinterpret_cast<const TensorVector*>(in);
-    for (int i=0; i < tensorVector_in_pointer->size(); ++i) {
+        reinterpret_cast<const TensorVector*>(in);
+    for (int i = 0; i < tensorVector_in_pointer->size(); ++i) {
       auto lod_tensor_in =
-              core->GetInputHandle((*tensorVector_in_pointer)[i].name);
+          core->GetInputHandle((*tensorVector_in_pointer)[i].name);
       lod_tensor_in->SetLoD((*tensorVector_in_pointer)[i].lod);
       lod_tensor_in->Reshape((*tensorVector_in_pointer)[i].shape);
       void* origin_data = (*tensorVector_in_pointer)[i].data.data();
@@ -531,11 +410,11 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
         float* data = static_cast<float*>(origin_data);
         lod_tensor_in->CopyFromCpu(data);
       } else if ((*tensorVector_in_pointer)[i].dtype ==
-                  paddle::PaddleDType::INT64) {
+                 paddle::PaddleDType::INT64) {
         int64_t* data = static_cast<int64_t*>(origin_data);
         lod_tensor_in->CopyFromCpu(data);
       } else if ((*tensorVector_in_pointer)[i].dtype ==
-                  paddle::PaddleDType::INT32) {
+                 paddle::PaddleDType::INT32) {
         int32_t* data = static_cast<int32_t*>(origin_data);
         lod_tensor_in->CopyFromCpu(data);
       }
@@ -543,8 +422,8 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
     // After the input data is passed in,
     // call 'core->Run()' to perform the prediction process.
     if (!core->Run()) {
-      LOG(ERROR) << "Failed run fluid family core";
-      return -1;
+      LOG(ERROR) << "Failed run fluid family core";
+      return -1;
     }
     // In order to get the results,
     // first, call the 'core->GetOutputNames()' to get the name of output
@@ -558,7 +437,7 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
     char* databuf_char = NULL;
     size_t databuf_size = 0;
     TensorVector* tensorVector_out_pointer =
-            reinterpret_cast<TensorVector*>(out);
+        reinterpret_cast<TensorVector*>(out);
     if (!tensorVector_out_pointer) {
       LOG(ERROR) << "tensorVector_out_pointer is nullptr,error";
       return -1;
@@ -567,38 +446,38 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
     // then copy data to cpu from the core.
     // The pointer type of data_out must be one of
     // float *,int64_t*,int32_t* instead void*.
-    for (int i=0; i < outnames.size(); ++i) {
+    for (int i = 0; i < outnames.size(); ++i) {
       auto lod_tensor_out = core->GetOutputHandle(outnames[i]);
       output_shape = lod_tensor_out->shape();
       out_num = std::accumulate(
           output_shape.begin(), output_shape.end(), 1, std::multiplies<int>());
       dataType = lod_tensor_out->type();
       if (dataType == paddle::PaddleDType::FLOAT32) {
-        databuf_size = out_num*sizeof(float);
+        databuf_size = out_num * sizeof(float);
         databuf_data = MempoolWrapper::instance().malloc(databuf_size);
         if (!databuf_data) {
-          LOG(ERROR) << "Malloc failed, size: " << databuf_size;
-          return -1;
+          LOG(ERROR) << "Malloc failed, size: " << databuf_size;
+          return -1;
         }
         float* data_out = reinterpret_cast<float*>(databuf_data);
         lod_tensor_out->CopyToCpu(data_out);
         databuf_char = reinterpret_cast<char*>(data_out);
       } else if (dataType == paddle::PaddleDType::INT64) {
-        databuf_size = out_num*sizeof(int64_t);
+        databuf_size = out_num * sizeof(int64_t);
         databuf_data = MempoolWrapper::instance().malloc(databuf_size);
         if (!databuf_data) {
-          LOG(ERROR) << "Malloc failed, size: " << databuf_size;
-          return -1;
+          LOG(ERROR) << "Malloc failed, size: " << databuf_size;
+          return -1;
        }
        int64_t* data_out = reinterpret_cast<int64_t*>(databuf_data);
        lod_tensor_out->CopyToCpu(data_out);
        databuf_char = reinterpret_cast<char*>(data_out);
      } else if (dataType == paddle::PaddleDType::INT32) {
-        databuf_size = out_num*sizeof(int32_t);
+        databuf_size = out_num * sizeof(int32_t);
        databuf_data = MempoolWrapper::instance().malloc(databuf_size);
        if (!databuf_data) {
-          LOG(ERROR) << "Malloc failed, size: " << databuf_size;
-          return -1;
+          LOG(ERROR) << "Malloc failed, size: " << databuf_size;
+          return -1;
        }
        int32_t* data_out = reinterpret_cast<int32_t*>(databuf_data);
        lod_tensor_out->CopyToCpu(data_out);
@@ -614,7 +493,7 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
       tensor_out.dtype = paddle::PaddleDType(dataType);
       tensor_out.shape.assign(output_shape.begin(), output_shape.end());
       std::vector<std::vector<size_t>> out_lod = lod_tensor_out->lod();
-      for (int li=0; li < out_lod.size(); ++li) {
+      for (int li = 0; li < out_lod.size(); ++li) {
         std::vector<size_t> lod_element;
         lod_element.assign(out_lod[li].begin(), out_lod[li].end());
         tensor_out.lod.push_back(lod_element);
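
Note: the output loop above sizes each CPU buffer as the product of the
output's shape dims times sizeof(dtype), then drains it with CopyToCpu; the
per-dtype branches differ only in the pointer type. A condensed sketch of one
branch, reusing the same handle API as the surrounding code (FLOAT32 only,
plain new[] instead of the server's MempoolWrapper):

    #include <functional>
    #include <numeric>
    #include <string>
    #include <vector>

    // Assumes `core` exposes GetOutputHandle() and the handle exposes
    // shape()/CopyToCpu(), as in the code above.
    template <typename Core>
    float* fetch_float_output(Core* core, const std::string& name, size_t* n) {
      auto handle = core->GetOutputHandle(name);
      std::vector<int> shape = handle->shape();
      *n = std::accumulate(shape.begin(), shape.end(), 1, std::multiplies<int>());
      float* buf = new float[*n];  // caller owns; server code pools this memory
      handle->CopyToCpu(buf);
      return buf;
    }
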
@@ -638,187 +517,49 @@ class VersionedInferEngine : public InferEngine {
   VersionedInferEngine() { _versions.clear(); }
   ~VersionedInferEngine() {}
 
-  int proc_initialize(const configure::EngineDesc& conf) {
-    if (proc_initialize(conf, false) != 0) {
-      LOG(ERROR) << "Failed proc initialize engine: " << conf.name().c_str();
-      return -1;
-    }
-
-    LOG(WARNING) << "Succ proc initialize engine: " << conf.name().c_str();
-    return 0;
-  }
+  int proc_initialize(const configure::EngineDesc& conf);
 
-  int proc_initialize(const configure::EngineDesc& conf, bool version) {
-    std::string engine_type = conf.type();
-    InferEngine* engine =
-        StaticInferFactory::instance().generate_object(engine_type);
-    if (!engine) {
-      LOG(ERROR) << "Failed generate engine with type:" << engine_type;
-      return -1;
-    }
-#ifndef BCLOUD
-    VLOG(2) << "FLAGS_logtostderr " << FLAGS_logtostderr;
-    int tmp = FLAGS_logtostderr;
-    if (engine->proc_initialize(conf, version) != 0) {
-      LOG(ERROR) << "Failed initialize engine, type:" << engine_type;
-      return -1;
-    }
-    VLOG(2) << "FLAGS_logtostderr " << FLAGS_logtostderr;
-    FLAGS_logtostderr = tmp;
-#else
-    if (engine->proc_initialize(conf, version) != 0) {
-      LOG(ERROR) << "Failed initialize engine, type:" << engine_type;
-      return -1;
-    }
-#endif
-    auto r = _versions.insert(std::make_pair(engine->version(), engine));
-    if (!r.second) {
-      LOG(ERROR) << "Failed insert item: " << engine->version()
-                 << ", type: " << engine_type;
-      return -1;
-    }
-    LOG(WARNING) << "Succ proc initialize version engine: "
-                 << engine->version();
-    return 0;
-  }
+  int proc_initialize(const configure::EngineDesc& conf, bool version);
 
-  int proc_finalize() {
-    for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
-      if (iter->second->proc_finalize() != 0) {
-        LOG(ERROR) << "Failed proc finalize version engine: " << iter->first;
-      }
-      LOG(WARNING) << "Succ proc finalize version engine: " << iter->first;
-    }
-    return 0;
-  }
+  int proc_finalize();
 
-  int thrd_initialize() {
-    for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
-      if (iter->second->thrd_initialize() != 0) {
-        LOG(ERROR) << "Failed thrd initialize version engine: " << iter->first;
-        return -1;
-      }
-      LOG(WARNING) << "Succ thrd initialize version engine: " << iter->first;
-    }
-    return 0;
-  }
+  int thrd_initialize();
 
-  int thrd_clear() {
-    for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
-      if (iter->second->thrd_clear() != 0) {
-        LOG(ERROR) << "Failed thrd clear version engine: " << iter->first;
-        return -1;
-      }
-    }
-    return 0;
-  }
+  int thrd_clear();
 
-  int thrd_finalize() {
-    for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
-      if (iter->second->thrd_finalize() != 0) {
-        LOG(ERROR) << "Failed thrd finalize version engine: " << iter->first;
-        return -1;
-      }
-      LOG(WARNING) << "Succ thrd finalize version engine: " << iter->first;
-    }
-    return 0;
-  }
+  int thrd_finalize();
 
-  int reload() {
-    for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
-      if (iter->second->reload() != 0) {
-        LOG(ERROR) << "Failed reload version engine: " << iter->first;
-        return -1;
-      }
-      LOG(WARNING) << "Succ reload version engine: " << iter->first;
-    }
-    return 0;
-  }
+  int reload();
 
-  uint64_t version() const {
-    InferEngine* engine = default_engine();
-    if (engine) {
-      return engine->version();
-    } else {
-      return uint64_t(-1);
-    }
-  }
+  uint64_t version() const;
 
   // inference interface
-  InferEngine* default_engine() const {
-    if (_versions.size() != 1) {
-      LOG(ERROR) << "Ambiguous default engine version:" << _versions.size();
-      return NULL;
-    }
-
-    return _versions.begin()->second;
-  }
+  InferEngine* default_engine() const;
 
-  int infer(const void* in, void* out, uint32_t batch_size) {
-    InferEngine* engine = default_engine();
-    if (!engine) {
-      LOG(WARNING) << "fail to get default engine";
-      return -1;
-    }
-    return engine->infer(in, out, batch_size);
-  }
+  int infer(const void* in, void* out, uint32_t batch_size);
 
   template <typename T>
-  T* get_core() {
-    InferEngine* engine = default_engine();
-    if (!engine) {
-      LOG(WARNING) << "fail to get core";
-      return NULL;
-    }
-    auto db_engine = dynamic_cast<DBReloadableInferEngine<T>*>(engine);
-    if (db_engine) {
-      return db_engine->get_core();
-    }
-    LOG(WARNING) << "fail to get core";
-    return NULL;
-  }
+  T* get_core();
 
   // versioned inference interface
-  int infer(const void* in, void* out, uint32_t batch_size, uint64_t version) {
-    auto iter = _versions.find(version);
-    if (iter == _versions.end()) {
-      LOG(ERROR) << "Not found version engine: " << version;
-      return -1;
-    }
-
-    return iter->second->infer(in, out, batch_size);
-  }
+  int infer(const void* in, void* out, uint32_t batch_size, uint64_t version);
 
   template <typename T>
-  T* get_core(uint64_t version) {
-    auto iter = _versions.find(version);
-    if (iter == _versions.end()) {
-      LOG(ERROR) << "Not found version engine: " << version;
-      return NULL;
-    }
+  T* get_core(uint64_t version);
 
-    auto db_engine = dynamic_cast<DBReloadableInferEngine<T>*>(iter->second);
-    if (db_engine) {
-      return db_engine->get_core();
-    }
-    LOG(WARNING) << "fail to get core for " << version;
-    return NULL;
-  }
+  int proc_initialize_impl(const configure::EngineDesc& conf, bool);
 
-  // --
-  int proc_initialize_impl(const configure::EngineDesc& conf, bool) {
-    return -1;
-  }
-  int thrd_initialize_impl() { return -1; }
-  int thrd_finalize_impl() { return -1; }
-  int thrd_clear_impl() { return -1; }
-  int proc_finalize_impl() { return -1; }
-  int infer_impl(const void* in, void* out, uint32_t batch_size = -1) {
-    return -1;
-  }
-  int task_infer_impl(const BatchTensor& in, BatchTensor& out) {  // NOLINT
-    return -1;
-  }  // NOLINT
+  int thrd_initialize_impl();
+
+  int thrd_finalize_impl();
+
+  int thrd_clear_impl();
+
+  int proc_finalize_impl();
+
+  int infer_impl(const void* in, void* out, uint32_t batch_size = -1);
+
+  int task_infer_impl(const BatchTensor& in, BatchTensor& out);
 
  private:
   boost::unordered_map<uint64_t, InferEngine*> _versions;
 };
@@ -831,158 +572,38 @@ class InferManager {
     return ins;
   }
 
-  int proc_initialize(const char* path, const char* file) {
-    ModelToolkitConf model_toolkit_conf;
-    if (configure::read_proto_conf(path, file, &model_toolkit_conf) != 0) {
-      LOG(ERROR) << "failed load infer config, path: " << path << "/" << file;
-      return -1;
-    }
-    size_t engine_num = model_toolkit_conf.engines_size();
-    for (size_t ei = 0; ei < engine_num; ++ei) {
-      LOG(INFO) << "model_toolkit_conf.engines(" << ei
-                << ").name: " << model_toolkit_conf.engines(ei).name();
-      std::string engine_name = model_toolkit_conf.engines(ei).name();
-      VersionedInferEngine* engine = new (std::nothrow) VersionedInferEngine();
-      if (!engine) {
-        LOG(ERROR) << "Failed generate versioned engine: " << engine_name;
-        return -1;
-      }
-      if (engine->proc_initialize(model_toolkit_conf.engines(ei)) != 0) {
-        LOG(ERROR) << "Failed initialize version engine, name:" << engine_name;
-        return -1;
-      }
-      auto r = _map.insert(std::make_pair(engine_name, engine));
-      if (!r.second) {
-        LOG(ERROR) << "Failed insert item: " << engine_name;
-        return -1;
-      }
-      LOG(WARNING) << "Succ proc initialize engine: " << engine_name;
-    }
-    return 0;
-  }
+  int proc_initialize(const char* path, const char* file);
 
-  int thrd_initialize() {
-    for (auto it = _map.begin(); it != _map.end(); ++it) {
-      if (it->second->thrd_initialize() != 0) {
-        LOG(ERROR) << "Failed thrd initialize engine, name: " << it->first;
-        return -1;
-      }
-      LOG(WARNING) << "Succ thrd initialize engine, name: " << it->first;
-    }
-    return 0;
-  }
+  int thrd_initialize();
 
-  int thrd_clear() {
-    for (auto it = _map.begin(); it != _map.end(); ++it) {
-      if (it->second->thrd_clear() != 0) {
-        LOG(ERROR) << "Failed thrd clear engine, name: " << it->first;
-        return -1;
-      }
-    }
-    return 0;
-  }
+  int thrd_clear();
 
-  int reload() {
-    for (auto it = _map.begin(); it != _map.end(); ++it) {
-      if (it->second->reload() != 0) {
-        LOG(ERROR) << "Failed reload engine, name: " << it->first;
-        return -1;
-      }
-    }
-    return 0;
-  }
+  int reload();
 
-  int thrd_finalize() {
-    for (auto it = _map.begin(); it != _map.end(); ++it) {
-      if (it->second->thrd_finalize() != 0) {
-        LOG(ERROR) << "Failed thrd finalize engine, name: " << it->first;
-        return -1;
-      }
-      LOG(WARNING) << "Succ thrd finalize engine, name: " << it->first;
-    }
-    return 0;
-  }
+  int thrd_finalize();
 
-  int proc_finalize() {
-    for (auto it = _map.begin(); it != _map.end(); ++it) {
-      if (it->second->proc_finalize() != 0) {
-        LOG(ERROR) << "Failed proc finalize engine, name: " << it->first;
-        return -1;
-      }
-      LOG(WARNING) << "Succ proc finalize engine, name: " << it->first;
-    }
-    _map.clear();
-    return 0;
-  }
+  int proc_finalize();
 
   // Inference interface
   int infer(const char* model_name,
             const void* in,
             void* out,
-            uint32_t batch_size = -1) {
-    auto it = _map.find(model_name);
-    if (it == _map.end()) {
-      LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
-      return -1;
-    }
-    return it->second->infer(in, out, batch_size);
-  }
+            uint32_t batch_size = -1);
 
   template <typename T>
-  T* get_core(const char* model_name) {
-    auto it = _map.find(model_name);
-    if (it == _map.end()) {
-      LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
-      return NULL;
-    }
-    auto infer_engine =
-        dynamic_cast<DBReloadableInferEngine<T>*>(it->second->default_engine());
-    if (infer_engine) {
-      return infer_engine->get_core();
-    }
-    LOG(WARNING) << "fail to get core for " << model_name;
-    return NULL;
-  }
+  T* get_core(const char* model_name);
 
   // Versioned inference interface
   int infer(const char* model_name,
             const void* in,
             void* out,
             uint32_t batch_size,
-            uint64_t version) {
-    auto it = _map.find(model_name);
-    if (it == _map.end()) {
-      LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
-      return -1;
-    }
-    return it->second->infer(in, out, batch_size, version);
-  }
+            uint64_t version);
 
   template <typename T>
-  T* get_core(const char* model_name, uint64_t version) {
-    auto it = _map.find(model_name);
-    if (it == _map.end()) {
-      LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
-      return NULL;
-    }
-    return it->second->get_core<T>(version);
-  }
+  T* get_core(const char* model_name, uint64_t version);
 
-  int query_version(const std::string& model, uint64_t& version) {  // NOLINT
-    auto it = _map.find(model);
-    if (it == _map.end()) {
-      LOG(WARNING) << "Cannot find engine in map, model name:" << model;
-      return -1;
-    }
-    auto infer_engine = it->second->default_engine();
-    if (!infer_engine) {
-      LOG(WARNING) << "Cannot get default engine for model:" << model;
-      return -1;
-    }
-    version = infer_engine->version();
-    LOG(INFO) << "Succ get version: " << version << " for model: " << model;
-    return 0;
-  }
+  int query_version(const std::string& model, uint64_t& version);
 
  private:
   boost::unordered_map<std::string, VersionedInferEngine*> _map;
get core"; - return NULL; - } - auto db_engine = dynamic_cast*>(engine); - if (db_engine) { - return db_engine->get_core(); - } - LOG(WARNING) << "fail to get core"; - return NULL; - } + T* get_core(); // versioned inference interface - int infer(const void* in, void* out, uint32_t batch_size, uint64_t version) { - auto iter = _versions.find(version); - if (iter == _versions.end()) { - LOG(ERROR) << "Not found version engine: " << version; - return -1; - } - - return iter->second->infer(in, out, batch_size); - } + int infer(const void* in, void* out, uint32_t batch_size, uint64_t version); template - T* get_core(uint64_t version) { - auto iter = _versions.find(version); - if (iter == _versions.end()) { - LOG(ERROR) << "Not found version engine: " << version; - return NULL; - } + T* get_core(uint64_t version); - auto db_engine = dynamic_cast*>(iter->second); - if (db_engine) { - return db_engine->get_core(); - } - LOG(WARNING) << "fail to get core for " << version; - return NULL; - } + int proc_initialize_impl(const configure::EngineDesc& conf, bool); - // -- - int proc_initialize_impl(const configure::EngineDesc& conf, bool) { - return -1; - } - int thrd_initialize_impl() { return -1; } - int thrd_finalize_impl() { return -1; } - int thrd_clear_impl() { return -1; } - int proc_finalize_impl() { return -1; } - int infer_impl(const void* in, void* out, uint32_t batch_size = -1) { - return -1; - } - int task_infer_impl(const BatchTensor& in, BatchTensor& out) { // NOLINT - return -1; - } // NOLINT + int thrd_initialize_impl(); + + int thrd_finalize_impl(); + + int thrd_clear_impl(); + + int proc_finalize_impl(); + + int infer_impl(const void* in, void* out, uint32_t batch_size = -1); + + int task_infer_impl(const BatchTensor& in, BatchTensor& out); private: boost::unordered_map _versions; @@ -831,158 +572,38 @@ class InferManager { return ins; } - int proc_initialize(const char* path, const char* file) { - ModelToolkitConf model_toolkit_conf; - if (configure::read_proto_conf(path, file, &model_toolkit_conf) != 0) { - LOG(ERROR) << "failed load infer config, path: " << path << "/" << file; - return -1; - } - size_t engine_num = model_toolkit_conf.engines_size(); - for (size_t ei = 0; ei < engine_num; ++ei) { - LOG(INFO) << "model_toolkit_conf.engines(" << ei - << ").name: " << model_toolkit_conf.engines(ei).name(); - std::string engine_name = model_toolkit_conf.engines(ei).name(); - VersionedInferEngine* engine = new (std::nothrow) VersionedInferEngine(); - if (!engine) { - LOG(ERROR) << "Failed generate versioned engine: " << engine_name; - return -1; - } - if (engine->proc_initialize(model_toolkit_conf.engines(ei)) != 0) { - LOG(ERROR) << "Failed initialize version engine, name:" << engine_name; - return -1; - } - auto r = _map.insert(std::make_pair(engine_name, engine)); - if (!r.second) { - LOG(ERROR) << "Failed insert item: " << engine_name; - return -1; - } - LOG(WARNING) << "Succ proc initialize engine: " << engine_name; - } - return 0; - } + int proc_initialize(const char* path, const char* file); - int thrd_initialize() { - for (auto it = _map.begin(); it != _map.end(); ++it) { - if (it->second->thrd_initialize() != 0) { - LOG(ERROR) << "Failed thrd initialize engine, name: " << it->first; - return -1; - } - LOG(WARNING) << "Succ thrd initialize engine, name: " << it->first; - } - return 0; - } + int thrd_initialize(); - int thrd_clear() { - for (auto it = _map.begin(); it != _map.end(); ++it) { - if (it->second->thrd_clear() != 0) { - LOG(ERROR) << "Failed thrd clear engine, name: " 
 
@@ -96,8 +83,6 @@ void Resource::print_general_model_config(
   }
 }
 
-int DynamicResource::clear() { return 0; }
-
 int Resource::initialize(const std::string& path, const std::string& file) {
   ResourceConf resource_conf;
   if (configure::read_proto_conf(path, file, &resource_conf) != 0) {
@@ -150,29 +135,25 @@ int Resource::initialize(const std::string& path, const std::string& file) {
 
   if (FLAGS_enable_model_toolkit) {
     size_t model_toolkit_num = resource_conf.model_toolkit_path_size();
-    for (size_t mi=0; mi < model_toolkit_num; ++mi) {
+    for (size_t mi = 0; mi < model_toolkit_num; ++mi) {
       std::string model_toolkit_path = resource_conf.model_toolkit_path(mi);
       std::string model_toolkit_file = resource_conf.model_toolkit_file(mi);
       if (InferManager::instance().proc_initialize(
               model_toolkit_path.c_str(), model_toolkit_file.c_str()) != 0) {
         LOG(ERROR) << "failed proc initialize modeltoolkit, config: "
-            << model_toolkit_path << "/" << model_toolkit_file;
+                   << model_toolkit_path << "/" << model_toolkit_file;
         return -1;
       }
       if (KVManager::instance().proc_initialize(
              model_toolkit_path.c_str(), model_toolkit_file.c_str()) != 0) {
         LOG(ERROR) << "Failed proc initialize kvmanager, config: "
-            << model_toolkit_path << "/" << model_toolkit_file;
+                   << model_toolkit_path << "/" << model_toolkit_file;
       }
     }
   }
 
-  if (THREAD_KEY_CREATE(&_tls_bspec_key, dynamic_resource_deleter) != 0) {
-    LOG(ERROR) << "unable to create tls_bthread_key of thrd_data";
-    return -1;
-  }
   // init rocksDB or cube instance
   if (resource_conf.has_cube_config_file() &&
       resource_conf.has_cube_config_path()) {
@@ -225,18 +206,16 @@ int Resource::general_model_initialize(const std::string& path,
     return -1;
   }
   size_t general_model_num = resource_conf.general_model_path_size();
-  for (size_t gi=0; gi < general_model_num; ++gi) {
-
-
+  for (size_t gi = 0; gi < general_model_num; ++gi) {
     std::string general_model_path = resource_conf.general_model_path(gi);
     std::string general_model_file = resource_conf.general_model_file(gi);
     GeneralModelConfig model_config;
     if (configure::read_proto_conf(general_model_path.c_str(),
-                                  general_model_file.c_str(),
-                                  &model_config) != 0) {
-      LOG(ERROR) << "Failed initialize model config from: " << general_model_path
-                 << "/" << general_model_file;
+                                   general_model_file.c_str(),
+                                   &model_config) != 0) {
+      LOG(ERROR) << "Failed initialize model config from: "
+                 << general_model_path << "/" << general_model_file;
       return -1;
     }
     auto _config = std::make_shared<PaddleGeneralModelConfig>();
@@ -249,7 +228,7 @@ int Resource::general_model_initialize(const std::string& path,
     _config->_is_lod_feed.resize(feed_var_num);
     _config->_capacity.resize(feed_var_num);
     _config->_feed_shape.resize(feed_var_num);
-    for (int i=0; i < feed_var_num; ++i) {
+    for (int i = 0; i < feed_var_num; ++i) {
      _config->_feed_name[i] = model_config.feed_var(i).name();
      _config->_feed_alias_name[i] = model_config.feed_var(i).alias_name();
      VLOG(2) << "feed var[" << i << "]: " << _config->_feed_name[i];
@@ -265,7 +244,7 @@ int Resource::general_model_initialize(const std::string& path,
         VLOG(2) << "var[" << i << "] is tensor";
         _config->_capacity[i] = 1;
         _config->_is_lod_feed[i] = false;
-        for (int j=0; j < model_config.feed_var(i).shape_size(); ++j) {
+        for (int j = 0; j < model_config.feed_var(i).shape_size(); ++j) {
           int32_t dim = model_config.feed_var(i).shape(j);
           VLOG(2) << "var[" << i << "].shape[" << i << "]: " << dim;
           _config->_feed_shape[i].push_back(dim);
@@ -279,7 +258,7 @@ int Resource::general_model_initialize(const std::string& path,
     _config->_fetch_name.resize(fetch_var_num);
     _config->_fetch_alias_name.resize(fetch_var_num);
     _config->_fetch_shape.resize(fetch_var_num);
-    for (int i=0; i < fetch_var_num; ++i) {
+    for (int i = 0; i < fetch_var_num; ++i) {
       _config->_fetch_name[i] = model_config.fetch_var(i).name();
       _config->_fetch_alias_name[i] = model_config.fetch_var(i).alias_name();
       _config->_fetch_name_to_index[_config->_fetch_name[i]] = i;
@@ -290,7 +269,7 @@ int Resource::general_model_initialize(const std::string& path,
         _config->_is_lod_fetch[i] = true;
       } else {
         _config->_is_lod_fetch[i] = false;
-        for (int j=0; j < model_config.fetch_var(i).shape_size(); ++j) {
+        for (int j = 0; j < model_config.fetch_var(i).shape_size(); ++j) {
           int dim = model_config.fetch_var(i).shape(j);
           _config->_fetch_shape[i].push_back(dim);
         }
@@ -316,36 +295,6 @@ int Resource::thread_initialize() {
     return -1;
   }
 
-  DynamicResource* p_dynamic_resource =
-      reinterpret_cast<DynamicResource*>(THREAD_GETSPECIFIC(_tls_bspec_key));
-  if (p_dynamic_resource == NULL) {
-    p_dynamic_resource = new (std::nothrow) DynamicResource;
-    if (p_dynamic_resource == NULL) {
-      LOG(ERROR) << "failed to create tls DynamicResource";
-      return -1;
-    }
-    if (p_dynamic_resource->initialize() != 0) {
-      LOG(ERROR) << "DynamicResource initialize failed.";
-      delete p_dynamic_resource;
-      p_dynamic_resource = NULL;
-      return -1;
-    }
-
-    if (THREAD_SETSPECIFIC(_tls_bspec_key, p_dynamic_resource) != 0) {
-      LOG(ERROR) << "unable to set tls DynamicResource";
-      delete p_dynamic_resource;
-      p_dynamic_resource = NULL;
-      return -1;
-    }
-  }
-#if 0
-  LOG(INFO) << "Successfully thread initialized dynamic resource";
-#else
-  LOG(INFO) << bthread_self()
-            << ": Successfully thread initialized dynamic resource "
-            << p_dynamic_resource;
-
-#endif
   return 0;
 }
 
@@ -362,26 +311,6 @@ int Resource::thread_clear() {
     LOG(ERROR) << "Failed thrd clear infer manager";
     return -1;
   }
-
-  DynamicResource* p_dynamic_resource =
-      reinterpret_cast<DynamicResource*>(THREAD_GETSPECIFIC(_tls_bspec_key));
-  if (p_dynamic_resource == NULL) {
-#if 0
-    LOG(ERROR) << "tls dynamic resource shouldn't be null after "
-               << "thread_initialize";
-#else
-    LOG(ERROR)
-        << bthread_self()
-        << ": tls dynamic resource shouldn't be null after thread_initialize";
-#endif
-    return -1;
-  }
-  if (p_dynamic_resource->clear() != 0) {
-    LOG(ERROR) << "Failed to invoke dynamic resource clear";
-    return -1;
-  }
-
-  // ...
   return 0;
 }
 
 size_t Resource::get_cube_quant_bits() { return this->cube_quant_bits; }
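
Note: this change deletes the per-thread DynamicResource that was stored
under a bthread/pthread TLS key and freed by dynamic_resource_deleter; the
struct had become an empty shell, so the key creation, the setter, and the
clear path all go away. For reference, the TLS pattern being removed looks
like this (plain pthread, simplified; names are illustrative):

    #include <pthread.h>

    struct ThreadData { int scratch; };

    static pthread_key_t g_key;

    static void thread_data_deleter(void* d) {
      delete static_cast<ThreadData*>(d);  // runs automatically at thread exit
    }

    void proc_init() { pthread_key_create(&g_key, thread_data_deleter); }

    ThreadData* thread_data() {
      void* p = pthread_getspecific(g_key);
      if (!p) {
        p = new ThreadData();
        pthread_setspecific(g_key, p);
      }
      return static_cast<ThreadData*>(p);
    }
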
diff --git a/core/predictor/framework/resource.h b/core/predictor/framework/resource.h
old mode 100755
new mode 100644
index 48a578f9f75488504415aaf449f5b62fc7fc1cb9..e144120e5a67bc2a43433cb3857331e9d1a465cf
--- a/core/predictor/framework/resource.h
+++ b/core/predictor/framework/resource.h
@@ -54,15 +54,6 @@ class PaddleGeneralModelConfig {
 };
 
 class BaseRdDict;
-struct DynamicResource {
-  DynamicResource();
-
-  ~DynamicResource();
-
-  int initialize();
-
-  int clear();
-};
 
 class Resource {
  public:
@@ -94,20 +85,17 @@ class Resource {
 
   int finalize();
 
-  std::vector<std::shared_ptr<PaddleGeneralModelConfig> > get_general_model_config();
+  std::vector<std::shared_ptr<PaddleGeneralModelConfig>>
+  get_general_model_config();
 
   void print_general_model_config(
       const std::shared_ptr<PaddleGeneralModelConfig>& config);
 
-  DynamicResource* get_dynamic_resource() {
-    return reinterpret_cast<DynamicResource*>(
-        THREAD_GETSPECIFIC(_tls_bspec_key));
-  }
   size_t get_cube_quant_bits();
 
  private:
   int thread_finalize() { return 0; }
-  std::vector<std::shared_ptr<PaddleGeneralModelConfig> > _configs;
+  std::vector<std::shared_ptr<PaddleGeneralModelConfig>> _configs;
   std::string cube_config_fullpath;
   int cube_quant_bits;  // 0 if no quantization