Unverified commit c22e0552 authored by: T TeslaZhao, committed by: GitHub

Merge branch 'develop' into add-version

......@@ -47,17 +47,14 @@ int conf_check(const Request *req,
for (int i = 0; i < var_num; ++i) {
const Tensor &tensor = req->insts(0).tensor_array(i);
if (model_config->_feed_type[i] !=
tensor.elem_type()) {
if (model_config->_feed_type[i] != tensor.elem_type()) {
LOG(ERROR) << "feed type not match.";
return -1;
}
if (model_config->_feed_shape[i].size() ==
tensor.shape_size()) {
if (model_config->_feed_shape[i].size() == tensor.shape_size()) {
for (int j = 0; j < model_config->_feed_shape[i].size(); ++j) {
tensor.shape(j);
if (model_config->_feed_shape[i][j] !=
tensor.shape(j)) {
if (model_config->_feed_shape[i][j] != tensor.shape(j)) {
LOG(ERROR) << "feed shape not match.";
return -1;
}
......@@ -73,6 +70,11 @@ int conf_check(const Request *req,
int GeneralReaderOp::inference() {
// read request from client
const Request *req = dynamic_cast<const Request *>(get_request_message());
if (!req) {
LOG(ERROR) << "Failed get request message";
return -1;
}
uint64_t log_id = req->log_id();
int input_var_num = 0;
std::vector<int64_t> elem_type;
......@@ -80,14 +82,18 @@ int GeneralReaderOp::inference() {
std::vector<int64_t> databuf_size;
GeneralBlob *res = mutable_data<GeneralBlob>();
TensorVector *out = &(res->tensor_vector);
res->SetLogId(log_id);
if (!res) {
LOG(ERROR) << "(logid=" << log_id
<< ") Failed get op tls reader object output";
LOG(ERROR) << "(logid=" << log_id << ") Failed get GeneralBlob";
return -1;
}
TensorVector *out = &(res->tensor_vector);
if (!out) {
LOG(ERROR) << "(logid=" << log_id << ") Failed get tensor_vector of res";
return -1;
}
res->SetLogId(log_id);
Timer timeline;
int64_t start = timeline.TimeStampUS();
int var_num = req->insts(0).tensor_array_size();
......@@ -99,7 +105,7 @@ int GeneralReaderOp::inference() {
baidu::paddle_serving::predictor::Resource::instance();
VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
//get the first InferOP's model_config as ReaderOp's model_config by default.
// get the first InferOP's model_config as ReaderOp's model_config by default.
std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config().front();
......@@ -140,10 +146,11 @@ int GeneralReaderOp::inference() {
lod_tensor.dtype = paddle::PaddleDType::INT32;
data_len = tensor.int_data_size();
} else if (elem_type[i] == P_STRING) {
//use paddle::PaddleDType::UINT8 as for String.
// use paddle::PaddleDType::UINT8 as for String.
elem_size[i] = sizeof(uint8_t);
lod_tensor.dtype = paddle::PaddleDType::UINT8;
//this is for vector<String>, cause the databuf_size != vector<String>.size()*sizeof(char);
// this is for vector<String>, cause the databuf_size !=
// vector<String>.size()*sizeof(char);
for (int idx = 0; idx < tensor.data_size(); idx++) {
data_len += tensor.data()[idx].length();
}
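
The comment above is the key point of the P_STRING branch: string elements are variable length, so the buffer size cannot be computed as element_count * sizeof(element); the op instead sums the byte length of every string. A minimal standalone sketch of that packing rule, with hypothetical names (pack_strings, total_len) that are not taken from the diff:

#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Pack variable-length strings into one contiguous uint8_t buffer.
// Buffer length is the sum of all string lengths (same accumulation as
// data_len above), not count * sizeof(some fixed element).
static std::vector<uint8_t> pack_strings(const std::vector<std::string>& items) {
  size_t total_len = 0;
  for (const std::string& s : items) {
    total_len += s.length();
  }
  std::vector<uint8_t> buf(total_len);
  size_t offset = 0;
  for (const std::string& s : items) {
    std::memcpy(buf.data() + offset, s.data(), s.length());
    offset += s.length();
  }
  return buf;
}
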
......@@ -157,34 +164,32 @@ int GeneralReaderOp::inference() {
for (int k = 0; k < tensor.lod_size(); ++k) {
lod_tensor.lod[0].push_back(tensor.lod(k));
}
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] has lod_tensor and len=" << out->at(i).lod[0].back();
}
for (int k = 0; k < tensor.shape_size(); ++k) {
int dim = tensor.shape(k);
VLOG(2) << "(logid=" << log_id << ") shape for var[" << i
<< "]: " << dim;
VLOG(2) << "(logid=" << log_id << ") shape for var[" << i << "]: " << dim;
lod_tensor.shape.push_back(dim);
}
lod_tensor.name = model_config->_feed_name[i];
out->push_back(lod_tensor);
VLOG(2) << "(logid=" << log_id << ") tensor size for var[" << i
<< "]: " << data_len;
databuf_size[i] = data_len * elem_size[i];
out->at(i).data.Resize(data_len * elem_size[i]);
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is lod_tensor and len=" << out->at(i).lod[0].back();
if (elem_type[i] == P_INT64) {
int64_t *dst_ptr = static_cast<int64_t *>(out->at(i).data.data());
VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
<< "] is " << tensor.int64_data(0);
if (!dst_ptr) {
LOG(ERROR) << "dst_ptr is nullptr";
return -1;
return -1;
}
memcpy(dst_ptr, tensor.int64_data().data(),databuf_size[i]);
memcpy(dst_ptr, tensor.int64_data().data(), databuf_size[i]);
/*
int elem_num = tensor.int64_data_size();
for (int k = 0; k < elem_num; ++k) {
......@@ -197,9 +202,9 @@ int GeneralReaderOp::inference() {
<< "] is " << tensor.float_data(0);
if (!dst_ptr) {
LOG(ERROR) << "dst_ptr is nullptr";
return -1;
return -1;
}
memcpy(dst_ptr, tensor.float_data().data(),databuf_size[i]);
memcpy(dst_ptr, tensor.float_data().data(), databuf_size[i]);
/*int elem_num = tensor.float_data_size();
for (int k = 0; k < elem_num; ++k) {
dst_ptr[k] = tensor.float_data(k);
......@@ -210,9 +215,9 @@ int GeneralReaderOp::inference() {
<< "] is " << tensor.int_data(0);
if (!dst_ptr) {
LOG(ERROR) << "dst_ptr is nullptr";
return -1;
return -1;
}
memcpy(dst_ptr, tensor.int_data().data(),databuf_size[i]);
memcpy(dst_ptr, tensor.int_data().data(), databuf_size[i]);
/*
int elem_num = tensor.int_data_size();
for (int k = 0; k < elem_num; ++k) {
......@@ -225,7 +230,7 @@ int GeneralReaderOp::inference() {
<< "] is " << tensor.data(0);
if (!dst_ptr) {
LOG(ERROR) << "dst_ptr is nullptr";
return -1;
return -1;
}
int elem_num = tensor.data_size();
for (int k = 0; k < elem_num; ++k) {
......
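
For the fixed-width branches above (P_INT64, P_FLOAT32, P_INT32), the repeated protobuf field is already contiguous, so a single memcpy of databuf_size bytes fills the tensor, where databuf_size is simply data_len * elem_size. A small worked example of that arithmetic; the shape here is illustrative, not from any real request:

#include <cstdint>
#include <functional>
#include <iostream>
#include <numeric>
#include <vector>

int main() {
  // For a dense INT64 tensor of shape [2, 3], the element count matches
  // tensor.int64_data_size() in the op above.
  std::vector<int64_t> shape = {2, 3};
  int64_t data_len = std::accumulate(shape.begin(), shape.end(), int64_t{1},
                                     std::multiplies<int64_t>());
  int64_t elem_size = sizeof(int64_t);          // P_INT64 branch
  int64_t databuf_size = data_len * elem_size;  // 6 * 8 = 48 bytes
  std::cout << "databuf_size=" << databuf_size << std::endl;
  return 0;
}
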
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "core/predictor/framework/infer.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
int ReloadableInferEngine::proc_initialize_impl(
const configure::EngineDesc& conf, bool version) {
_reload_tag_file = conf.reloadable_meta();
_reload_type = conf.reloadable_type();
_model_dir = conf.model_dir();
_infer_thread_num = conf.runtime_thread_num();
_infer_batch_size = conf.batch_infer_size();
_infer_batch_align = conf.enable_batch_align();
_conf = conf;
if (!check_need_reload() || load(conf) != 0) {
LOG(ERROR) << "Failed load model_data_path" << _model_dir;
return -1;
}
if (parse_version_info(conf, version) != 0) {
LOG(ERROR) << "Failed parse version info";
return -1;
}
LOG(WARNING) << "Succ load model:" << _model_dir;
return 0;
}
int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf,
bool version) {
if (proc_initialize_impl(conf, version) != 0) {
LOG(ERROR) << "Failed proc initialize impl";
return -1;
}
// init bsf framework
if (_infer_thread_num <= 0) {
return 0;
}
// init bsf framework
im::bsf::TaskExecutor<TaskT>::instance()->set_thread_init_fn(
boost::bind(&InferEngine::thrd_initialize_impl, this));
im::bsf::TaskExecutor<TaskT>::instance()->set_thread_reset_fn(
boost::bind(&InferEngine::thrd_clear_impl, this));
im::bsf::TaskExecutor<TaskT>::instance()->set_thread_callback_fn(
boost::bind(&InferEngine::task_infer_impl, this, _1, _2));
im::bsf::TaskExecutor<TaskT>::instance()->set_batch_size(_infer_batch_size);
im::bsf::TaskExecutor<TaskT>::instance()->set_batch_align(_infer_batch_align);
if (im::bsf::TaskExecutor<TaskT>::instance()->start(_infer_thread_num) != 0) {
LOG(ERROR) << "Failed start bsf executor, threads:" << _infer_thread_num;
return -1;
}
LOG(WARNING) << "Enable batch schedule framework, thread_num:"
<< _infer_thread_num << ", batch_size:" << _infer_batch_size
<< ", enable_batch_align:" << _infer_batch_align;
return 0;
}
int ReloadableInferEngine::infer(const void* in,
void* out,
uint32_t batch_size) {
if (_infer_thread_num <= 0) {
return infer_impl(in, out, batch_size);
}
im::bsf::TaskManager<Tensor, Tensor> task_manager;
task_manager.schedule(*(reinterpret_cast<const BatchTensor*>(in)),
*(reinterpret_cast<BatchTensor*>(out)));
task_manager.wait();
return 0;
}
int ReloadableInferEngine::thrd_initialize() {
if (_infer_thread_num > 0) {
return 0;
}
return thrd_initialize_impl();
}
int ReloadableInferEngine::thrd_clear() {
if (_infer_thread_num > 0) {
return 0;
}
return thrd_clear_impl();
}
int ReloadableInferEngine::proc_finalize() {
if (proc_finalize_impl() != 0) {
LOG(ERROR) << "Failed proc finalize impl";
return -1;
}
if (_infer_thread_num > 0) {
im::bsf::TaskExecutor<TaskT>::instance()->stop();
}
return 0;
}
int ReloadableInferEngine::reload() {
if (check_need_reload()) {
LOG(WARNING) << "begin reload model[" << _model_dir << "].";
return load(_conf);
}
return 0;
}
int ReloadableInferEngine::parse_version_info(
const configure::EngineDesc& config, bool version) {
_version = uint64_t(-1);
return 0;
}
bool ReloadableInferEngine::check_need_reload() {
if (_reload_type == "timestamp_ne") {
return check_timestamp_ne();
} else if (_reload_type == "timestamp_gt") {
return check_timestamp_gt();
} else if (_reload_type == "md5sum") {
return check_md5sum();
} else if (_reload_type == "revision") {
return check_revision();
} else if (_reload_type == "none") {
return false;
}
LOG(ERROR) << "Not support reload type: " << _reload_type;
return false;
}
bool ReloadableInferEngine::check_timestamp_ne() {
struct stat st;
if (stat(_reload_tag_file.c_str(), &st) != 0) {
LOG(ERROR) << "Failed stat config file:" << _reload_tag_file;
return false;
}
if ((st.st_mode & S_IFREG) && st.st_mtime != _last_record.timestamp) {
_last_record.timestamp = st.st_mtime;
return true;
}
return false;
}
bool ReloadableInferEngine::check_timestamp_gt() {
struct stat st;
if (stat(_reload_tag_file.c_str(), &st) != 0) {
LOG(ERROR) << "Failed stat config file:" << _reload_tag_file;
return false;
}
if ((st.st_mode & S_IFREG) && st.st_mtime > _last_record.timestamp) {
_last_record.timestamp = st.st_mtime;
return true;
}
return false;
}
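
The two timestamp checks above differ only in the comparison: timestamp_ne reloads whenever the tag file's mtime changes, timestamp_gt only when it moves forward. A self-contained demo of the timestamp_ne rule; the tag file path and helper name are placeholders, not the engine's members:

#include <sys/stat.h>
#include <ctime>
#include <iostream>
#include <string>

// True when the regular file's mtime differs from *last_timestamp; records
// the new mtime, exactly like check_timestamp_ne() above.
static bool timestamp_changed(const std::string& tag_file, time_t* last_timestamp) {
  struct stat st;
  if (stat(tag_file.c_str(), &st) != 0) {
    std::cerr << "Failed stat config file:" << tag_file << std::endl;
    return false;
  }
  if ((st.st_mode & S_IFREG) && st.st_mtime != *last_timestamp) {
    *last_timestamp = st.st_mtime;
    return true;
  }
  return false;
}

int main() {
  time_t last = 0;
  // "reload_tag_file" is a hypothetical path used only for this demo.
  std::cout << "need reload: " << timestamp_changed("reload_tag_file", &last)
            << std::endl;
  return 0;
}
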
int VersionedInferEngine::proc_initialize(const configure::EngineDesc& conf) {
if (proc_initialize(conf, false) != 0) {
LOG(ERROR) << "Failed proc intialize engine: " << conf.name().c_str();
return -1;
}
LOG(WARNING) << "Succ proc initialize engine: " << conf.name().c_str();
return 0;
}
int VersionedInferEngine::proc_initialize(const configure::EngineDesc& conf,
bool version) {
std::string engine_type = conf.type();
InferEngine* engine =
StaticInferFactory::instance().generate_object(engine_type);
if (!engine) {
LOG(ERROR) << "Failed generate engine with type:" << engine_type;
return -1;
}
#ifndef BCLOUD
VLOG(2) << "FLAGS_logtostderr " << FLAGS_logtostderr;
int tmp = FLAGS_logtostderr;
if (engine->proc_initialize(conf, version) != 0) {
LOG(ERROR) << "Failed initialize engine, type:" << engine_type;
return -1;
}
VLOG(2) << "FLAGS_logtostderr " << FLAGS_logtostderr;
FLAGS_logtostderr = tmp;
#else
if (engine->proc_initialize(conf, version) != 0) {
LOG(ERROR) << "Failed initialize engine, type:" << engine_type;
return -1;
}
#endif
auto r = _versions.insert(std::make_pair(engine->version(), engine));
if (!r.second) {
LOG(ERROR) << "Failed insert item: " << engine->version()
<< ", type: " << engine_type;
return -1;
}
LOG(WARNING) << "Succ proc initialize version engine: " << engine->version();
return 0;
}
int VersionedInferEngine::proc_finalize() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->proc_finalize() != 0) {
LOG(ERROR) << "Failed proc finalize version engine: " << iter->first;
}
LOG(WARNING) << "Succ proc finalize version engine: " << iter->first;
}
return 0;
}
int VersionedInferEngine::thrd_initialize() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->thrd_initialize() != 0) {
LOG(ERROR) << "Failed thrd initialize version engine: " << iter->first;
return -1;
}
LOG(WARNING) << "Succ thrd initialize version engine: " << iter->first;
}
return 0;
}
int VersionedInferEngine::thrd_clear() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->thrd_clear() != 0) {
LOG(ERROR) << "Failed thrd clear version engine: " << iter->first;
return -1;
}
}
return 0;
}
int VersionedInferEngine::thrd_finalize() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->thrd_finalize() != 0) {
LOG(ERROR) << "Failed thrd finalize version engine: " << iter->first;
return -1;
}
LOG(WARNING) << "Succ thrd finalize version engine: " << iter->first;
}
return 0;
}
int VersionedInferEngine::reload() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->reload() != 0) {
LOG(ERROR) << "Failed reload version engine: " << iter->first;
return -1;
}
LOG(WARNING) << "Succ reload version engine: " << iter->first;
}
return 0;
}
uint64_t VersionedInferEngine::version() const {
InferEngine* engine = default_engine();
if (engine) {
return engine->version();
} else {
return uint64_t(-1);
}
}
// inference interface
InferEngine* VersionedInferEngine::default_engine() const {
if (_versions.size() != 1) {
LOG(ERROR) << "Ambiguous default engine version:" << _versions.size();
return NULL;
}
return _versions.begin()->second;
}
int VersionedInferEngine::infer(const void* in,
void* out,
uint32_t batch_size) {
InferEngine* engine = default_engine();
if (!engine) {
LOG(WARNING) << "fail to get default engine";
return -1;
}
return engine->infer(in, out, batch_size);
}
// versioned inference interface
int VersionedInferEngine::infer(const void* in,
void* out,
uint32_t batch_size,
uint64_t version) {
auto iter = _versions.find(version);
if (iter == _versions.end()) {
LOG(ERROR) << "Not found version engine: " << version;
return -1;
}
return iter->second->infer(in, out, batch_size);
}
template <typename T>
T* VersionedInferEngine::get_core() {
InferEngine* engine = default_engine();
if (!engine) {
LOG(WARNING) << "fail to get core";
return NULL;
}
auto db_engine = dynamic_cast<DBReloadableInferEngine<T>*>(engine);
if (db_engine) {
return db_engine->get_core();
}
LOG(WARNING) << "fail to get core";
return NULL;
}
template <typename T>
T* VersionedInferEngine::get_core(uint64_t version) {
auto iter = _versions.find(version);
if (iter == _versions.end()) {
LOG(ERROR) << "Not found version engine: " << version;
return NULL;
}
auto db_engine = dynamic_cast<DBReloadableInferEngine<T>*>(iter->second);
if (db_engine) {
return db_engine->get_core();
}
LOG(WARNING) << "fail to get core for " << version;
return NULL;
}
int VersionedInferEngine::proc_initialize_impl(
const configure::EngineDesc& conf, bool) {
return -1;
}
int VersionedInferEngine::thrd_initialize_impl() { return -1; }
int VersionedInferEngine::thrd_finalize_impl() { return -1; }
int VersionedInferEngine::thrd_clear_impl() { return -1; }
int VersionedInferEngine::proc_finalize_impl() { return -1; }
int VersionedInferEngine::infer_impl(const void* in,
void* out,
uint32_t batch_size) {
return -1;
}
int VersionedInferEngine::task_infer_impl(const BatchTensor& in,
BatchTensor& out) { // NOLINT
return -1;
}
int InferManager::proc_initialize(const char* path, const char* file) {
ModelToolkitConf model_toolkit_conf;
if (configure::read_proto_conf(path, file, &model_toolkit_conf) != 0) {
LOG(ERROR) << "failed load infer config, path: " << path << "/" << file;
return -1;
}
size_t engine_num = model_toolkit_conf.engines_size();
for (size_t ei = 0; ei < engine_num; ++ei) {
LOG(INFO) << "model_toolkit_conf.engines(" << ei
<< ").name: " << model_toolkit_conf.engines(ei).name();
std::string engine_name = model_toolkit_conf.engines(ei).name();
VersionedInferEngine* engine = new (std::nothrow) VersionedInferEngine();
if (!engine) {
LOG(ERROR) << "Failed generate versioned engine: " << engine_name;
return -1;
}
if (engine->proc_initialize(model_toolkit_conf.engines(ei)) != 0) {
LOG(ERROR) << "Failed initialize version engine, name:" << engine_name;
return -1;
}
auto r = _map.insert(std::make_pair(engine_name, engine));
if (!r.second) {
LOG(ERROR) << "Failed insert item: " << engine_name;
return -1;
}
LOG(WARNING) << "Succ proc initialize engine: " << engine_name;
}
return 0;
}
int InferManager::thrd_initialize() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->thrd_initialize() != 0) {
LOG(ERROR) << "Failed thrd initialize engine, name: " << it->first;
return -1;
}
LOG(WARNING) << "Succ thrd initialize engine, name: " << it->first;
}
return 0;
}
int InferManager::thrd_clear() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->thrd_clear() != 0) {
LOG(ERROR) << "Failed thrd clear engine, name: " << it->first;
return -1;
}
}
return 0;
}
int InferManager::reload() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->reload() != 0) {
LOG(ERROR) << "Failed reload engine, name: " << it->first;
return -1;
}
}
return 0;
}
int InferManager::thrd_finalize() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->thrd_finalize() != 0) {
LOG(ERROR) << "Failed thrd finalize engine, name: " << it->first;
return -1;
}
LOG(WARNING) << "Succ thrd finalize engine, name: " << it->first;
}
return 0;
}
int InferManager::proc_finalize() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->proc_finalize() != 0) {
LOG(ERROR) << "Failed proc finalize engine, name: " << it->first;
return -1;
}
LOG(WARNING) << "Succ proc finalize engine, name: " << it->first;
}
_map.clear();
return 0;
}
// Inference interface
int InferManager::infer(const char* model_name,
const void* in,
void* out,
uint32_t batch_size) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return -1;
}
return it->second->infer(in, out, batch_size);
}
template <typename T>
T* InferManager::get_core(const char* model_name) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return NULL;
}
auto infer_engine =
dynamic_cast<DBReloadableInferEngine<T>*>(it->second->default_engine());
if (infer_engine) {
return infer_engine->get_core();
}
LOG(WARNING) << "fail to get core for " << model_name;
return NULL;
}
// Versioned inference interface
int InferManager::infer(const char* model_name,
const void* in,
void* out,
uint32_t batch_size,
uint64_t version) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return -1;
}
return it->second->infer(in, out, batch_size, version);
}
template <typename T>
T* InferManager::get_core(const char* model_name, uint64_t version) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return NULL;
}
return it->second->get_core<T>(version);
}
int InferManager::query_version(const std::string& model, uint64_t& version) {
auto it = _map.find(model);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model;
return -1;
}
auto infer_engine = it->second->default_engine();
if (!infer_engine) {
LOG(WARNING) << "Cannot get default engine for model:" << model;
return -1;
}
version = infer_engine->version();
LOG(INFO) << "Succ get version: " << version << " for model: " << model;
return 0;
}
} // namespace predictor
} // namespace paddle_serving
} // namespace baidu
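
Taken together, the typical call path into this file is: process-level init from a model_toolkit config, per-thread init, then inference by model name (optionally by version). A compressed sketch of that order using only the interfaces above; the config path, file name, and model name are placeholders, in/out must point to the tensor containers the configured engine expects, and error handling is abbreviated:

#include "core/predictor/framework/infer.h"

using baidu::paddle_serving::predictor::InferManager;

int run_once(const void* in, void* out) {
  // Load every engine listed in the model_toolkit config (example path/file).
  if (InferManager::instance().proc_initialize("./conf", "model_toolkit.prototxt") != 0) {
    return -1;
  }
  // Per-thread setup, paired with thrd_clear() after each request.
  if (InferManager::instance().thrd_initialize() != 0) {
    return -1;
  }
  // Default-version inference by model name ("general_infer_0" is an example).
  if (InferManager::instance().infer("general_infer_0", in, out, 1) != 0) {
    return -1;
  }
  // Versioned inference: query the default version, then call it explicitly.
  uint64_t version = 0;
  if (InferManager::instance().query_version("general_infer_0", version) == 0) {
    InferManager::instance().infer("general_infer_0", in, out, 1, version);
  }
  InferManager::instance().thrd_clear();
  InferManager::instance().proc_finalize();
  return 0;
}
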
......@@ -13,14 +13,14 @@
// limitations under the License.
#pragma once
#include <pthread.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include <numeric>
#include <string>
#include <utility>
#include <vector>
#include <numeric>
#include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/bsf.h"
#include "core/predictor/framework/factory.h"
......@@ -84,10 +84,10 @@ class InferEngine {
virtual int thrd_clear_impl() = 0;
virtual int proc_finalize_impl() = 0;
virtual int infer_impl(const void* in,
void* out,
uint32_t batch_size = -1) = 0;
void* out,
uint32_t batch_size = -1) = 0;
virtual int task_infer_impl(const BatchTensor& in,
BatchTensor& out) = 0; // NOLINT
BatchTensor& out) = 0; // NOLINT
// end: framework inner call
};
......@@ -96,193 +96,77 @@ class ReloadableInferEngine : public InferEngine {
public:
virtual ~ReloadableInferEngine() {}
union last_check_status {
time_t last_timestamp;
uint64_t last_md5sum;
uint64_t last_revision;
// Reloadable record
union ReloadableRecord {
time_t timestamp;
uint64_t md5sum;
uint64_t revision;
};
virtual int load(const configure::EngineDesc& conf) = 0;
typedef im::bsf::Task<Tensor, Tensor> TaskT;
int proc_initialize_impl(const configure::EngineDesc& conf, bool version) {
_reload_tag_file = conf.reloadable_meta();
_reload_mode_tag = conf.reloadable_type();
_model_data_path = conf.model_dir();
_infer_thread_num = conf.runtime_thread_num();
_infer_batch_size = conf.batch_infer_size();
_infer_batch_align = conf.enable_batch_align();
_conf = conf;
if (!check_need_reload() || load(conf) != 0) {
LOG(ERROR) << "Failed load model_data_path" << _model_data_path;
return -1;
}
if (parse_version_info(conf, version) != 0) {
LOG(ERROR) << "Failed parse version info";
return -1;
}
LOG(WARNING) << "Succ load model_data_path" << _model_data_path;
return 0;
}
int proc_initialize(const configure::EngineDesc& conf, bool version) {
if (proc_initialize_impl(conf, version) != 0) {
LOG(ERROR) << "Failed proc initialize impl";
return -1;
}
// init bsf framework
if (_infer_thread_num <= 0) {
return 0;
}
// init bsf framework
im::bsf::TaskExecutor<TaskT>::instance()->set_thread_init_fn(
boost::bind(&InferEngine::thrd_initialize_impl, this));
im::bsf::TaskExecutor<TaskT>::instance()->set_thread_reset_fn(
boost::bind(&InferEngine::thrd_clear_impl, this));
im::bsf::TaskExecutor<TaskT>::instance()->set_thread_callback_fn(
boost::bind(&InferEngine::task_infer_impl, this, _1, _2));
im::bsf::TaskExecutor<TaskT>::instance()->set_batch_size(_infer_batch_size);
im::bsf::TaskExecutor<TaskT>::instance()->set_batch_align(
_infer_batch_align);
if (im::bsf::TaskExecutor<TaskT>::instance()->start(_infer_thread_num) !=
0) {
LOG(ERROR) << "Failed start bsf executor, threads:" << _infer_thread_num;
return -1;
}
LOG(WARNING) << "Enable batch schedule framework, thread_num:"
<< _infer_thread_num << ", batch_size:" << _infer_batch_size
<< ", enable_batch_align:" << _infer_batch_align;
return 0;
}
int proc_initialize_impl(const configure::EngineDesc& conf, bool version);
int infer(const void* in, void* out, uint32_t batch_size = -1) {
if (_infer_thread_num <= 0) {
return infer_impl(in, out, batch_size);
}
int proc_initialize(const configure::EngineDesc& conf, bool version);
im::bsf::TaskManager<Tensor, Tensor> task_manager;
task_manager.schedule(*(reinterpret_cast<const BatchTensor*>(in)),
*(reinterpret_cast<BatchTensor*>(out)));
task_manager.wait();
return 0;
}
int infer(const void* in, void* out, uint32_t batch_size = -1);
int thrd_initialize() {
if (_infer_thread_num > 0) {
return 0;
}
return thrd_initialize_impl();
}
int thrd_initialize();
int thrd_clear() {
if (_infer_thread_num > 0) {
return 0;
}
int thrd_clear();
return thrd_clear_impl();
}
int proc_finalize();
int proc_finalize() {
if (proc_finalize_impl() != 0) {
LOG(ERROR) << "Failed proc finalize impl";
return -1;
}
if (_infer_thread_num > 0) {
im::bsf::TaskExecutor<TaskT>::instance()->stop();
}
return 0;
}
int reload() {
if (check_need_reload()) {
LOG(WARNING) << "begin reload model[" << _model_data_path << "].";
return load(_conf);
}
return 0;
}
int reload();
uint64_t version() const { return _version; }
uint32_t thread_num() const { return _infer_thread_num; }
private:
int parse_version_info(const configure::EngineDesc& config, bool version) {
_version = uint64_t(-1);
return 0;
}
bool check_need_reload() {
if (_reload_mode_tag == "timestamp_ne") {
return check_timestamp_ne();
} else if (_reload_mode_tag == "timestamp_gt") {
return check_timestamp_gt();
} else if (_reload_mode_tag == "md5sum") {
return check_md5sum();
} else if (_reload_mode_tag == "revision") {
return check_revision();
} else if (_reload_mode_tag == "none") {
return false;
} else {
LOG(ERROR) << "Not support check type: " << _reload_mode_tag;
return false;
}
}
int parse_version_info(const configure::EngineDesc& config, bool version);
bool check_timestamp_ne() {
struct stat st;
if (stat(_reload_tag_file.c_str(), &st) != 0) {
LOG(ERROR) << "Failed stat config file:" << _reload_tag_file;
return false;
}
bool check_need_reload();
if ((st.st_mode & S_IFREG) && st.st_mtime != _last_status.last_timestamp) {
_last_status.last_timestamp = st.st_mtime;
return true;
}
bool check_timestamp_ne();
return false;
}
bool check_timestamp_gt() {
struct stat st;
if (stat(_reload_tag_file.c_str(), &st) != 0) {
LOG(ERROR) << "Failed stat config file:" << _reload_tag_file;
return false;
}
if ((st.st_mode & S_IFREG) && st.st_mtime > _last_status.last_timestamp) {
_last_status.last_timestamp = st.st_mtime;
return true;
}
return false;
}
bool check_timestamp_gt();
bool check_md5sum() { return false; }
bool check_revision() { return false; }
protected:
std::string _model_data_path;
// Model directory
std::string _model_dir;
// The description of inference engine
configure::EngineDesc _conf;
private:
// Tag file of reloadable model
std::string _reload_tag_file;
std::string _reload_mode_tag;
last_check_status _last_status;
// Type of reload, e.g. timestamp_ne, timestamp_gt, md5sum, revision
std::string _reload_type;
// Record the last loading information
ReloadableRecord _last_record;
// Number of inference threads
uint32_t _infer_thread_num;
// Size of inference batch
uint32_t _infer_batch_size;
// Need to align batch_size in inferring
bool _infer_batch_align;
// model version
uint64_t _version;
};
// Lock free switching two models
template <typename EngineCore>
struct ModelData {
ModelData() : current_idx(1) {
......@@ -323,12 +207,10 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
}
LOG(WARNING) << "Succ load engine, path: " << conf.model_dir();
return 0;
}
int load_data(ModelData<EngineCore>* md,
const configure::EngineDesc& conf) {
int load_data(ModelData<EngineCore>* md, const configure::EngineDesc& conf) {
uint32_t next_idx = (md->current_idx + 1) % 2;
if (md->cores[next_idx]) {
delete md->cores[next_idx];
......@@ -354,11 +236,11 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
ModelData<EngineCore>* md = new (std::nothrow) ModelData<EngineCore>;
if (!md || load_data(md, _conf) != 0) {
LOG(ERROR) << "Failed create thread data from "
<< _conf.model_dir();
LOG(ERROR) << "Failed create thread data from " << _conf.model_dir();
return -1;
}
LOG(ERROR) << "THREAD_SETSPECIFIC _skey = md";
THREAD_SETSPECIFIC(_skey, md);
im::bsf::AutoMutex lock(_mutex);
_reload_vec.push_back(md);
......@@ -396,8 +278,6 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
THREAD_KEY_T _skey;
THREAD_MUTEX_T _mutex;
std::vector<ModelData<EngineCore>*> _reload_vec;
private:
};
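
DBReloadableInferEngine reloads without blocking readers by keeping two EngineCore slots per thread (each thread's ModelData is stored via THREAD_SETSPECIFIC above) and flipping current_idx only after the inactive slot has finished loading, as load_data() does with next_idx = (current_idx + 1) % 2. A generic sketch of that double-buffer flip; the Core type and its create() method are placeholders:

#include <cstdint>

// Placeholder core standing in for EngineCore, illustration only.
struct Core {
  int create(const char* model_dir) { (void)model_dir; return 0; }
};

// Readers use cores[current_idx]; reload writes the other slot, then flips
// current_idx, so in-flight readers never see a half-loaded core.
template <typename EngineCore>
struct DoubleBufferedModel {
  EngineCore* cores[2] = {nullptr, nullptr};
  uint32_t current_idx = 1;  // same initial value as ModelData above

  int reload(const char* model_dir) {
    uint32_t next_idx = (current_idx + 1) % 2;  // same flip rule as load_data()
    delete cores[next_idx];
    cores[next_idx] = new EngineCore();
    if (cores[next_idx]->create(model_dir) != 0) {
      return -1;  // keep serving from the current slot
    }
    current_idx = next_idx;  // publish the freshly loaded core
    return 0;
  }
  EngineCore* get() { return cores[current_idx]; }
};

// Usage: DoubleBufferedModel<Core> m; m.reload("/path/to/model"); m.get();
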
// Multiple EngineCore instances share the same model data
......@@ -442,7 +322,6 @@ class CloneDBReloadableInferEngine
}
LOG(WARNING) << "Succ load clone model, path[" << conf.model_dir() << "]";
return 0;
}
......@@ -516,10 +395,10 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
// Set the lod and shape information of InputData first.
// Then copy data from cpu to the core.
const TensorVector* tensorVector_in_pointer =
reinterpret_cast<const TensorVector*>(in);
for (int i=0; i < tensorVector_in_pointer->size(); ++i) {
reinterpret_cast<const TensorVector*>(in);
for (int i = 0; i < tensorVector_in_pointer->size(); ++i) {
auto lod_tensor_in =
core->GetInputHandle((*tensorVector_in_pointer)[i].name);
core->GetInputHandle((*tensorVector_in_pointer)[i].name);
lod_tensor_in->SetLoD((*tensorVector_in_pointer)[i].lod);
lod_tensor_in->Reshape((*tensorVector_in_pointer)[i].shape);
void* origin_data = (*tensorVector_in_pointer)[i].data.data();
......@@ -531,11 +410,11 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
float* data = static_cast<float*>(origin_data);
lod_tensor_in->CopyFromCpu(data);
} else if ((*tensorVector_in_pointer)[i].dtype ==
paddle::PaddleDType::INT64) {
paddle::PaddleDType::INT64) {
int64_t* data = static_cast<int64_t*>(origin_data);
lod_tensor_in->CopyFromCpu(data);
} else if ((*tensorVector_in_pointer)[i].dtype ==
paddle::PaddleDType::INT32) {
paddle::PaddleDType::INT32) {
int32_t* data = static_cast<int32_t*>(origin_data);
lod_tensor_in->CopyFromCpu(data);
}
......@@ -543,8 +422,8 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
// After the input data is passed in,
// call 'core->Run()' perform the prediction process.
if (!core->Run()) {
LOG(ERROR) << "Failed run fluid family core";
return -1;
LOG(ERROR) << "Failed run fluid family core";
return -1;
}
// In order to get the results,
// first, call the 'core->GetOutputNames()' to get the name of output
......@@ -558,7 +437,7 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
char* databuf_char = NULL;
size_t databuf_size = 0;
TensorVector* tensorVector_out_pointer =
reinterpret_cast<TensorVector*>(out);
reinterpret_cast<TensorVector*>(out);
if (!tensorVector_out_pointer) {
LOG(ERROR) << "tensorVector_out_pointer is nullptr,error";
return -1;
......@@ -567,38 +446,38 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
// then copy data to cpu from the core.
// The pointer type of data_out must be one of
// float *,int64_t*,int32_t* instead void*.
for (int i=0; i < outnames.size(); ++i) {
for (int i = 0; i < outnames.size(); ++i) {
auto lod_tensor_out = core->GetOutputHandle(outnames[i]);
output_shape = lod_tensor_out->shape();
out_num = std::accumulate(
output_shape.begin(), output_shape.end(), 1, std::multiplies<int>());
dataType = lod_tensor_out->type();
if (dataType == paddle::PaddleDType::FLOAT32) {
databuf_size = out_num*sizeof(float);
databuf_size = out_num * sizeof(float);
databuf_data = MempoolWrapper::instance().malloc(databuf_size);
if (!databuf_data) {
LOG(ERROR) << "Malloc failed, size: " << databuf_size;
return -1;
LOG(ERROR) << "Malloc failed, size: " << databuf_size;
return -1;
}
float* data_out = reinterpret_cast<float*>(databuf_data);
lod_tensor_out->CopyToCpu(data_out);
databuf_char = reinterpret_cast<char*>(data_out);
} else if (dataType == paddle::PaddleDType::INT64) {
databuf_size = out_num*sizeof(int64_t);
databuf_size = out_num * sizeof(int64_t);
databuf_data = MempoolWrapper::instance().malloc(databuf_size);
if (!databuf_data) {
LOG(ERROR) << "Malloc failed, size: " << databuf_size;
return -1;
LOG(ERROR) << "Malloc failed, size: " << databuf_size;
return -1;
}
int64_t* data_out = reinterpret_cast<int64_t*>(databuf_data);
lod_tensor_out->CopyToCpu(data_out);
databuf_char = reinterpret_cast<char*>(data_out);
} else if (dataType == paddle::PaddleDType::INT32) {
databuf_size = out_num*sizeof(int32_t);
databuf_size = out_num * sizeof(int32_t);
databuf_data = MempoolWrapper::instance().malloc(databuf_size);
if (!databuf_data) {
LOG(ERROR) << "Malloc failed, size: " << databuf_size;
return -1;
LOG(ERROR) << "Malloc failed, size: " << databuf_size;
return -1;
}
int32_t* data_out = reinterpret_cast<int32_t*>(databuf_data);
lod_tensor_out->CopyToCpu(data_out);
......@@ -614,7 +493,7 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
tensor_out.dtype = paddle::PaddleDType(dataType);
tensor_out.shape.assign(output_shape.begin(), output_shape.end());
std::vector<std::vector<size_t>> out_lod = lod_tensor_out->lod();
for (int li=0; li < out_lod.size(); ++li) {
for (int li = 0; li < out_lod.size(); ++li) {
std::vector<size_t> lod_element;
lod_element.assign(out_lod[li].begin(), out_lod[li].end());
tensor_out.lod.push_back(lod_element);
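
FluidInferEngine::infer_impl above follows the handle-based Paddle Inference flow: GetInputHandle -> SetLoD/Reshape/CopyFromCpu, Run(), then GetOutputNames/GetOutputHandle -> shape()/CopyToCpu into a buffer sized from the output shape. A condensed FLOAT32-only sketch of that loop, written against the public paddle_infer C++ API for illustration (the header name depends on the Paddle install); the serving EngineCore wraps an equivalent handle, and the other dtype branches follow the same pattern shown above:

#include <functional>
#include <numeric>
#include <string>
#include <vector>
#include "paddle_inference_api.h"  // paddle_infer::Predictor / Tensor

int run_float32(paddle_infer::Predictor* core,
                const std::string& input_name,
                const std::vector<int>& shape,
                const std::vector<float>& input,
                std::vector<float>* output) {
  auto in_handle = core->GetInputHandle(input_name);
  in_handle->Reshape(shape);
  in_handle->CopyFromCpu(input.data());  // host -> predictor

  if (!core->Run()) {                    // same failure check as above
    return -1;
  }

  auto out_names = core->GetOutputNames();
  auto out_handle = core->GetOutputHandle(out_names[0]);
  std::vector<int> out_shape = out_handle->shape();
  int out_num = std::accumulate(out_shape.begin(), out_shape.end(), 1,
                                std::multiplies<int>());
  output->resize(out_num);
  out_handle->CopyToCpu(output->data()); // predictor -> host
  return 0;
}
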
......@@ -638,187 +517,49 @@ class VersionedInferEngine : public InferEngine {
VersionedInferEngine() { _versions.clear(); }
~VersionedInferEngine() {}
int proc_initialize(const configure::EngineDesc& conf) {
if (proc_initialize(conf, false) != 0) {
LOG(ERROR) << "Failed proc intialize engine: " << conf.name().c_str();
return -1;
}
LOG(WARNING) << "Succ proc initialize engine: " << conf.name().c_str();
return 0;
}
int proc_initialize(const configure::EngineDesc& conf);
int proc_initialize(const configure::EngineDesc& conf, bool version) {
std::string engine_type = conf.type();
InferEngine* engine =
StaticInferFactory::instance().generate_object(engine_type);
if (!engine) {
LOG(ERROR) << "Failed generate engine with type:" << engine_type;
return -1;
}
#ifndef BCLOUD
VLOG(2) << "FLAGS_logtostderr " << FLAGS_logtostderr;
int tmp = FLAGS_logtostderr;
if (engine->proc_initialize(conf, version) != 0) {
LOG(ERROR) << "Failed initialize engine, type:" << engine_type;
return -1;
}
VLOG(2) << "FLAGS_logtostderr " << FLAGS_logtostderr;
FLAGS_logtostderr = tmp;
#else
if (engine->proc_initialize(conf, version) != 0) {
LOG(ERROR) << "Failed initialize engine, type:" << engine_type;
return -1;
}
#endif
auto r = _versions.insert(std::make_pair(engine->version(), engine));
if (!r.second) {
LOG(ERROR) << "Failed insert item: " << engine->version()
<< ", type: " << engine_type;
return -1;
}
LOG(WARNING) << "Succ proc initialize version engine: "
<< engine->version();
return 0;
}
int proc_initialize(const configure::EngineDesc& conf, bool version);
int proc_finalize() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->proc_finalize() != 0) {
LOG(ERROR) << "Failed proc finalize version engine: " << iter->first;
}
LOG(WARNING) << "Succ proc finalize version engine: " << iter->first;
}
return 0;
}
int proc_finalize();
int thrd_initialize() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->thrd_initialize() != 0) {
LOG(ERROR) << "Failed thrd initialize version engine: " << iter->first;
return -1;
}
LOG(WARNING) << "Succ thrd initialize version engine: " << iter->first;
}
return 0;
}
int thrd_initialize();
int thrd_clear() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->thrd_clear() != 0) {
LOG(ERROR) << "Failed thrd clear version engine: " << iter->first;
return -1;
}
}
return 0;
}
int thrd_clear();
int thrd_finalize() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->thrd_finalize() != 0) {
LOG(ERROR) << "Failed thrd finalize version engine: " << iter->first;
return -1;
}
LOG(WARNING) << "Succ thrd finalize version engine: " << iter->first;
}
return 0;
}
int thrd_finalize();
int reload() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->reload() != 0) {
LOG(ERROR) << "Failed reload version engine: " << iter->first;
return -1;
}
LOG(WARNING) << "Succ reload version engine: " << iter->first;
}
return 0;
}
int reload();
uint64_t version() const {
InferEngine* engine = default_engine();
if (engine) {
return engine->version();
} else {
return uint64_t(-1);
}
}
uint64_t version() const;
// inference interface
InferEngine* default_engine() const {
if (_versions.size() != 1) {
LOG(ERROR) << "Ambiguous default engine version:" << _versions.size();
return NULL;
}
InferEngine* default_engine() const;
return _versions.begin()->second;
}
int infer(const void* in, void* out, uint32_t batch_size) {
InferEngine* engine = default_engine();
if (!engine) {
LOG(WARNING) << "fail to get default engine";
return -1;
}
return engine->infer(in, out, batch_size);
}
int infer(const void* in, void* out, uint32_t batch_size);
template <typename T>
T* get_core() {
InferEngine* engine = default_engine();
if (!engine) {
LOG(WARNING) << "fail to get core";
return NULL;
}
auto db_engine = dynamic_cast<DBReloadableInferEngine<T>*>(engine);
if (db_engine) {
return db_engine->get_core();
}
LOG(WARNING) << "fail to get core";
return NULL;
}
T* get_core();
// versioned inference interface
int infer(const void* in, void* out, uint32_t batch_size, uint64_t version) {
auto iter = _versions.find(version);
if (iter == _versions.end()) {
LOG(ERROR) << "Not found version engine: " << version;
return -1;
}
return iter->second->infer(in, out, batch_size);
}
int infer(const void* in, void* out, uint32_t batch_size, uint64_t version);
template <typename T>
T* get_core(uint64_t version) {
auto iter = _versions.find(version);
if (iter == _versions.end()) {
LOG(ERROR) << "Not found version engine: " << version;
return NULL;
}
T* get_core(uint64_t version);
auto db_engine = dynamic_cast<DBReloadableInferEngine<T>*>(iter->second);
if (db_engine) {
return db_engine->get_core();
}
LOG(WARNING) << "fail to get core for " << version;
return NULL;
}
int proc_initialize_impl(const configure::EngineDesc& conf, bool);
// --
int proc_initialize_impl(const configure::EngineDesc& conf, bool) {
return -1;
}
int thrd_initialize_impl() { return -1; }
int thrd_finalize_impl() { return -1; }
int thrd_clear_impl() { return -1; }
int proc_finalize_impl() { return -1; }
int infer_impl(const void* in, void* out, uint32_t batch_size = -1) {
return -1;
}
int task_infer_impl(const BatchTensor& in, BatchTensor& out) { // NOLINT
return -1;
} // NOLINT
int thrd_initialize_impl();
int thrd_finalize_impl();
int thrd_clear_impl();
int proc_finalize_impl();
int infer_impl(const void* in, void* out, uint32_t batch_size = -1);
int task_infer_impl(const BatchTensor& in, BatchTensor& out);
private:
boost::unordered_map<uint64_t, InferEngine*> _versions;
......@@ -831,158 +572,38 @@ class InferManager {
return ins;
}
int proc_initialize(const char* path, const char* file) {
ModelToolkitConf model_toolkit_conf;
if (configure::read_proto_conf(path, file, &model_toolkit_conf) != 0) {
LOG(ERROR) << "failed load infer config, path: " << path << "/" << file;
return -1;
}
size_t engine_num = model_toolkit_conf.engines_size();
for (size_t ei = 0; ei < engine_num; ++ei) {
LOG(INFO) << "model_toolkit_conf.engines(" << ei
<< ").name: " << model_toolkit_conf.engines(ei).name();
std::string engine_name = model_toolkit_conf.engines(ei).name();
VersionedInferEngine* engine = new (std::nothrow) VersionedInferEngine();
if (!engine) {
LOG(ERROR) << "Failed generate versioned engine: " << engine_name;
return -1;
}
if (engine->proc_initialize(model_toolkit_conf.engines(ei)) != 0) {
LOG(ERROR) << "Failed initialize version engine, name:" << engine_name;
return -1;
}
auto r = _map.insert(std::make_pair(engine_name, engine));
if (!r.second) {
LOG(ERROR) << "Failed insert item: " << engine_name;
return -1;
}
LOG(WARNING) << "Succ proc initialize engine: " << engine_name;
}
return 0;
}
int proc_initialize(const char* path, const char* file);
int thrd_initialize() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->thrd_initialize() != 0) {
LOG(ERROR) << "Failed thrd initialize engine, name: " << it->first;
return -1;
}
LOG(WARNING) << "Succ thrd initialize engine, name: " << it->first;
}
return 0;
}
int thrd_initialize();
int thrd_clear() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->thrd_clear() != 0) {
LOG(ERROR) << "Failed thrd clear engine, name: " << it->first;
return -1;
}
}
return 0;
}
int thrd_clear();
int reload() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->reload() != 0) {
LOG(ERROR) << "Failed reload engine, name: " << it->first;
return -1;
}
}
return 0;
}
int reload();
int thrd_finalize() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->thrd_finalize() != 0) {
LOG(ERROR) << "Failed thrd finalize engine, name: " << it->first;
return -1;
}
LOG(WARNING) << "Succ thrd finalize engine, name: " << it->first;
}
return 0;
}
int thrd_finalize();
int proc_finalize() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->proc_finalize() != 0) {
LOG(ERROR) << "Failed proc finalize engine, name: " << it->first;
return -1;
}
LOG(WARNING) << "Succ proc finalize engine, name: " << it->first;
}
_map.clear();
return 0;
}
int proc_finalize();
// Inference interface
int infer(const char* model_name,
const void* in,
void* out,
uint32_t batch_size = -1) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return -1;
}
return it->second->infer(in, out, batch_size);
}
uint32_t batch_size = -1);
template <typename T>
T* get_core(const char* model_name) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return NULL;
}
auto infer_engine =
dynamic_cast<DBReloadableInferEngine<T>*>(it->second->default_engine());
if (infer_engine) {
return infer_engine->get_core();
}
LOG(WARNING) << "fail to get core for " << model_name;
return NULL;
}
T* get_core(const char* model_name);
// Versioned inference interface
int infer(const char* model_name,
const void* in,
void* out,
uint32_t batch_size,
uint64_t version) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return -1;
}
return it->second->infer(in, out, batch_size, version);
}
uint64_t version);
template <typename T>
T* get_core(const char* model_name, uint64_t version) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return NULL;
}
return it->second->get_core<T>(version);
}
T* get_core(const char* model_name, uint64_t version);
int query_version(const std::string& model, uint64_t& version) { // NOLINT
auto it = _map.find(model);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model;
return -1;
}
auto infer_engine = it->second->default_engine();
if (!infer_engine) {
LOG(WARNING) << "Cannot get default engine for model:" << model;
return -1;
}
version = infer_engine->version();
LOG(INFO) << "Succ get version: " << version << " for model: " << model;
return 0;
}
int query_version(const std::string& model, uint64_t& version);
private:
boost::unordered_map<std::string, VersionedInferEngine*> _map;
......
......@@ -27,22 +27,9 @@ namespace predictor {
using configure::ResourceConf;
using configure::GeneralModelConfig;
using rec::mcube::CubeAPI;
// __thread bool p_thread_initialized = false;
static void dynamic_resource_deleter(void* d) {
#if 1
LOG(INFO) << "dynamic_resource_delete on " << bthread_self();
#endif
delete static_cast<DynamicResource*>(d);
}
DynamicResource::DynamicResource() {}
DynamicResource::~DynamicResource() {}
int DynamicResource::initialize() { return 0; }
std::vector<std::shared_ptr<PaddleGeneralModelConfig> > Resource::get_general_model_config() {
std::vector<std::shared_ptr<PaddleGeneralModelConfig>>
Resource::get_general_model_config() {
return _configs;
}
......@@ -96,8 +83,6 @@ void Resource::print_general_model_config(
}
}
int DynamicResource::clear() { return 0; }
int Resource::initialize(const std::string& path, const std::string& file) {
ResourceConf resource_conf;
if (configure::read_proto_conf(path, file, &resource_conf) != 0) {
......@@ -150,29 +135,25 @@ int Resource::initialize(const std::string& path, const std::string& file) {
if (FLAGS_enable_model_toolkit) {
size_t model_toolkit_num = resource_conf.model_toolkit_path_size();
for (size_t mi=0; mi < model_toolkit_num; ++mi) {
for (size_t mi = 0; mi < model_toolkit_num; ++mi) {
std::string model_toolkit_path = resource_conf.model_toolkit_path(mi);
std::string model_toolkit_file = resource_conf.model_toolkit_file(mi);
if (InferManager::instance().proc_initialize(
model_toolkit_path.c_str(), model_toolkit_file.c_str()) != 0) {
LOG(ERROR) << "failed proc initialize modeltoolkit, config: "
<< model_toolkit_path << "/" << model_toolkit_file;
<< model_toolkit_path << "/" << model_toolkit_file;
return -1;
}
if (KVManager::instance().proc_initialize(
model_toolkit_path.c_str(), model_toolkit_file.c_str()) != 0) {
LOG(ERROR) << "Failed proc initialize kvmanager, config: "
<< model_toolkit_path << "/" << model_toolkit_file;
<< model_toolkit_path << "/" << model_toolkit_file;
}
}
}
if (THREAD_KEY_CREATE(&_tls_bspec_key, dynamic_resource_deleter) != 0) {
LOG(ERROR) << "unable to create tls_bthread_key of thrd_data";
return -1;
}
// init rocksDB or cube instance
if (resource_conf.has_cube_config_file() &&
resource_conf.has_cube_config_path()) {
......@@ -225,18 +206,16 @@ int Resource::general_model_initialize(const std::string& path,
return -1;
}
size_t general_model_num = resource_conf.general_model_path_size();
for (size_t gi=0; gi < general_model_num; ++gi) {
for (size_t gi = 0; gi < general_model_num; ++gi) {
std::string general_model_path = resource_conf.general_model_path(gi);
std::string general_model_file = resource_conf.general_model_file(gi);
GeneralModelConfig model_config;
if (configure::read_proto_conf(general_model_path.c_str(),
general_model_file.c_str(),
&model_config) != 0) {
LOG(ERROR) << "Failed initialize model config from: " << general_model_path
<< "/" << general_model_file;
general_model_file.c_str(),
&model_config) != 0) {
LOG(ERROR) << "Failed initialize model config from: "
<< general_model_path << "/" << general_model_file;
return -1;
}
auto _config = std::make_shared<PaddleGeneralModelConfig>();
......@@ -249,7 +228,7 @@ int Resource::general_model_initialize(const std::string& path,
_config->_is_lod_feed.resize(feed_var_num);
_config->_capacity.resize(feed_var_num);
_config->_feed_shape.resize(feed_var_num);
for (int i=0; i < feed_var_num; ++i) {
for (int i = 0; i < feed_var_num; ++i) {
_config->_feed_name[i] = model_config.feed_var(i).name();
_config->_feed_alias_name[i] = model_config.feed_var(i).alias_name();
VLOG(2) << "feed var[" << i << "]: " << _config->_feed_name[i];
......@@ -265,7 +244,7 @@ int Resource::general_model_initialize(const std::string& path,
VLOG(2) << "var[" << i << "] is tensor";
_config->_capacity[i] = 1;
_config->_is_lod_feed[i] = false;
for (int j=0; j < model_config.feed_var(i).shape_size(); ++j) {
for (int j = 0; j < model_config.feed_var(i).shape_size(); ++j) {
int32_t dim = model_config.feed_var(i).shape(j);
VLOG(2) << "var[" << i << "].shape[" << i << "]: " << dim;
_config->_feed_shape[i].push_back(dim);
......@@ -279,7 +258,7 @@ int Resource::general_model_initialize(const std::string& path,
_config->_fetch_name.resize(fetch_var_num);
_config->_fetch_alias_name.resize(fetch_var_num);
_config->_fetch_shape.resize(fetch_var_num);
for (int i=0; i < fetch_var_num; ++i) {
for (int i = 0; i < fetch_var_num; ++i) {
_config->_fetch_name[i] = model_config.fetch_var(i).name();
_config->_fetch_alias_name[i] = model_config.fetch_var(i).alias_name();
_config->_fetch_name_to_index[_config->_fetch_name[i]] = i;
......@@ -290,7 +269,7 @@ int Resource::general_model_initialize(const std::string& path,
_config->_is_lod_fetch[i] = true;
} else {
_config->_is_lod_fetch[i] = false;
for (int j=0; j < model_config.fetch_var(i).shape_size(); ++j) {
for (int j = 0; j < model_config.fetch_var(i).shape_size(); ++j) {
int dim = model_config.fetch_var(i).shape(j);
_config->_fetch_shape[i].push_back(dim);
}
......@@ -316,36 +295,6 @@ int Resource::thread_initialize() {
return -1;
}
DynamicResource* p_dynamic_resource =
reinterpret_cast<DynamicResource*>(THREAD_GETSPECIFIC(_tls_bspec_key));
if (p_dynamic_resource == NULL) {
p_dynamic_resource = new (std::nothrow) DynamicResource;
if (p_dynamic_resource == NULL) {
LOG(ERROR) << "failed to create tls DynamicResource";
return -1;
}
if (p_dynamic_resource->initialize() != 0) {
LOG(ERROR) << "DynamicResource initialize failed.";
delete p_dynamic_resource;
p_dynamic_resource = NULL;
return -1;
}
if (THREAD_SETSPECIFIC(_tls_bspec_key, p_dynamic_resource) != 0) {
LOG(ERROR) << "unable to set tls DynamicResource";
delete p_dynamic_resource;
p_dynamic_resource = NULL;
return -1;
}
}
#if 0
LOG(INFO) << "Successfully thread initialized dynamic resource";
#else
LOG(INFO) << bthread_self()
<< ": Successfully thread initialized dynamic resource "
<< p_dynamic_resource;
#endif
return 0;
}
......@@ -362,26 +311,6 @@ int Resource::thread_clear() {
LOG(ERROR) << "Failed thrd clear infer manager";
return -1;
}
DynamicResource* p_dynamic_resource =
reinterpret_cast<DynamicResource*>(THREAD_GETSPECIFIC(_tls_bspec_key));
if (p_dynamic_resource == NULL) {
#if 0
LOG(ERROR) << "tls dynamic resource shouldn't be null after "
<< "thread_initialize";
#else
LOG(ERROR)
<< bthread_self()
<< ": tls dynamic resource shouldn't be null after thread_initialize";
#endif
return -1;
}
if (p_dynamic_resource->clear() != 0) {
LOG(ERROR) << "Failed to invoke dynamic resource clear";
return -1;
}
// ...
return 0;
}
size_t Resource::get_cube_quant_bits() { return this->cube_quant_bits; }
......
......@@ -54,15 +54,6 @@ class PaddleGeneralModelConfig {
};
class BaseRdDict;
struct DynamicResource {
DynamicResource();
~DynamicResource();
int initialize();
int clear();
};
class Resource {
public:
......@@ -94,20 +85,17 @@ class Resource {
int finalize();
std::vector<std::shared_ptr<PaddleGeneralModelConfig> > get_general_model_config();
std::vector<std::shared_ptr<PaddleGeneralModelConfig>>
get_general_model_config();
void print_general_model_config(
const std::shared_ptr<PaddleGeneralModelConfig>& config);
DynamicResource* get_dynamic_resource() {
return reinterpret_cast<DynamicResource*>(
THREAD_GETSPECIFIC(_tls_bspec_key));
}
size_t get_cube_quant_bits();
private:
int thread_finalize() { return 0; }
std::vector<std::shared_ptr<PaddleGeneralModelConfig> > _configs;
std::vector<std::shared_ptr<PaddleGeneralModelConfig>> _configs;
std::string cube_config_fullpath;
int cube_quant_bits; // 0 if no empty
......