Unverified commit 567e449d authored by Jiawei Wang, committed by GitHub

Merge branch 'develop' into low-precision-doc

......@@ -60,6 +60,13 @@ option(WITH_TRT "Compile Paddle Serving with TRT"
option(PADDLE_ON_INFERENCE "Compile for encryption" ON)
option(WITH_OPENCV "Compile Paddle Serving with OPENCV" OFF)
if(NOT DEFINED VERSION_TAG)
set(VERSION_TAG "0.0.0")
endif()
if (WITH_PYTHON)
message(STATUS "Compile Version Tag for wheel: ${VERSION_TAG}")
endif()
if (WITH_OPENCV)
SET(OPENCV_DIR "" CACHE PATH "Location of libraries")
if(NOT DEFINED OPENCV_DIR)
......
......@@ -47,17 +47,14 @@ int conf_check(const Request *req,
for (int i = 0; i < var_num; ++i) {
const Tensor &tensor = req->insts(0).tensor_array(i);
if (model_config->_feed_type[i] !=
tensor.elem_type()) {
if (model_config->_feed_type[i] != tensor.elem_type()) {
LOG(ERROR) << "feed type not match.";
return -1;
}
if (model_config->_feed_shape[i].size() ==
tensor.shape_size()) {
if (model_config->_feed_shape[i].size() == tensor.shape_size()) {
for (int j = 0; j < model_config->_feed_shape[i].size(); ++j) {
tensor.shape(j);
if (model_config->_feed_shape[i][j] !=
tensor.shape(j)) {
if (model_config->_feed_shape[i][j] != tensor.shape(j)) {
LOG(ERROR) << "feed shape not match.";
return -1;
}
......@@ -73,6 +70,11 @@ int conf_check(const Request *req,
int GeneralReaderOp::inference() {
// read request from client
const Request *req = dynamic_cast<const Request *>(get_request_message());
if (!req) {
LOG(ERROR) << "Failed get request message";
return -1;
}
uint64_t log_id = req->log_id();
int input_var_num = 0;
std::vector<int64_t> elem_type;
......@@ -80,14 +82,18 @@ int GeneralReaderOp::inference() {
std::vector<int64_t> databuf_size;
GeneralBlob *res = mutable_data<GeneralBlob>();
TensorVector *out = &(res->tensor_vector);
res->SetLogId(log_id);
if (!res) {
LOG(ERROR) << "(logid=" << log_id
<< ") Failed get op tls reader object output";
LOG(ERROR) << "(logid=" << log_id << ") Failed get GeneralBlob";
return -1;
}
TensorVector *out = &(res->tensor_vector);
if (!out) {
LOG(ERROR) << "(logid=" << log_id << ") Failed get tensor_vector of res";
return -1;
}
res->SetLogId(log_id);
Timer timeline;
int64_t start = timeline.TimeStampUS();
int var_num = req->insts(0).tensor_array_size();
......@@ -99,7 +105,7 @@ int GeneralReaderOp::inference() {
baidu::paddle_serving::predictor::Resource::instance();
VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
//get the first InferOP's model_config as ReaderOp's model_config by default.
// get the first InferOP's model_config as ReaderOp's model_config by default.
std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config().front();
......@@ -140,10 +146,11 @@ int GeneralReaderOp::inference() {
lod_tensor.dtype = paddle::PaddleDType::INT32;
data_len = tensor.int_data_size();
} else if (elem_type[i] == P_STRING) {
//use paddle::PaddleDType::UINT8 as for String.
// use paddle::PaddleDType::UINT8 for String.
elem_size[i] = sizeof(uint8_t);
lod_tensor.dtype = paddle::PaddleDType::UINT8;
//this is for vector<String>, cause the databuf_size != vector<String>.size()*sizeof(char);
// this is for vector<String>, because databuf_size !=
// vector<String>.size()*sizeof(char);
for (int idx = 0; idx < tensor.data_size(); idx++) {
data_len += tensor.data()[idx].length();
}
......@@ -157,34 +164,32 @@ int GeneralReaderOp::inference() {
for (int k = 0; k < tensor.lod_size(); ++k) {
lod_tensor.lod[0].push_back(tensor.lod(k));
}
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] has lod_tensor and len=" << out->at(i).lod[0].back();
}
for (int k = 0; k < tensor.shape_size(); ++k) {
int dim = tensor.shape(k);
VLOG(2) << "(logid=" << log_id << ") shape for var[" << i
<< "]: " << dim;
VLOG(2) << "(logid=" << log_id << ") shape for var[" << i << "]: " << dim;
lod_tensor.shape.push_back(dim);
}
lod_tensor.name = model_config->_feed_name[i];
out->push_back(lod_tensor);
VLOG(2) << "(logid=" << log_id << ") tensor size for var[" << i
<< "]: " << data_len;
databuf_size[i] = data_len * elem_size[i];
out->at(i).data.Resize(data_len * elem_size[i]);
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is lod_tensor and len=" << out->at(i).lod[0].back();
if (elem_type[i] == P_INT64) {
int64_t *dst_ptr = static_cast<int64_t *>(out->at(i).data.data());
VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
<< "] is " << tensor.int64_data(0);
if (!dst_ptr) {
LOG(ERROR) << "dst_ptr is nullptr";
return -1;
return -1;
}
memcpy(dst_ptr, tensor.int64_data().data(),databuf_size[i]);
memcpy(dst_ptr, tensor.int64_data().data(), databuf_size[i]);
/*
int elem_num = tensor.int64_data_size();
for (int k = 0; k < elem_num; ++k) {
......@@ -197,9 +202,9 @@ int GeneralReaderOp::inference() {
<< "] is " << tensor.float_data(0);
if (!dst_ptr) {
LOG(ERROR) << "dst_ptr is nullptr";
return -1;
return -1;
}
memcpy(dst_ptr, tensor.float_data().data(),databuf_size[i]);
memcpy(dst_ptr, tensor.float_data().data(), databuf_size[i]);
/*int elem_num = tensor.float_data_size();
for (int k = 0; k < elem_num; ++k) {
dst_ptr[k] = tensor.float_data(k);
......@@ -210,9 +215,9 @@ int GeneralReaderOp::inference() {
<< "] is " << tensor.int_data(0);
if (!dst_ptr) {
LOG(ERROR) << "dst_ptr is nullptr";
return -1;
return -1;
}
memcpy(dst_ptr, tensor.int_data().data(),databuf_size[i]);
memcpy(dst_ptr, tensor.int_data().data(), databuf_size[i]);
/*
int elem_num = tensor.int_data_size();
for (int k = 0; k < elem_num; ++k) {
......@@ -225,7 +230,7 @@ int GeneralReaderOp::inference() {
<< "] is " << tensor.data(0);
if (!dst_ptr) {
LOG(ERROR) << "dst_ptr is nullptr";
return -1;
return -1;
}
int elem_num = tensor.data_size();
for (int k = 0; k < elem_num; ++k) {
......
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "core/predictor/framework/infer.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
int ReloadableInferEngine::proc_initialize_impl(
const configure::EngineDesc& conf, bool version) {
_reload_tag_file = conf.reloadable_meta();
_reload_type = conf.reloadable_type();
_model_dir = conf.model_dir();
_infer_thread_num = conf.runtime_thread_num();
_infer_batch_size = conf.batch_infer_size();
_infer_batch_align = conf.enable_batch_align();
_conf = conf;
if (!check_need_reload() || load(conf) != 0) {
LOG(ERROR) << "Failed load model_data_path" << _model_dir;
return -1;
}
if (parse_version_info(conf, version) != 0) {
LOG(ERROR) << "Failed parse version info";
return -1;
}
LOG(WARNING) << "Succ load model:" << _model_dir;
return 0;
}
int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf,
bool version) {
if (proc_initialize_impl(conf, version) != 0) {
LOG(ERROR) << "Failed proc initialize impl";
return -1;
}
// init bsf framework
if (_infer_thread_num <= 0) {
return 0;
}
// init bsf framework
im::bsf::TaskExecutor<TaskT>::instance()->set_thread_init_fn(
boost::bind(&InferEngine::thrd_initialize_impl, this));
im::bsf::TaskExecutor<TaskT>::instance()->set_thread_reset_fn(
boost::bind(&InferEngine::thrd_clear_impl, this));
im::bsf::TaskExecutor<TaskT>::instance()->set_thread_callback_fn(
boost::bind(&InferEngine::task_infer_impl, this, _1, _2));
im::bsf::TaskExecutor<TaskT>::instance()->set_batch_size(_infer_batch_size);
im::bsf::TaskExecutor<TaskT>::instance()->set_batch_align(_infer_batch_align);
if (im::bsf::TaskExecutor<TaskT>::instance()->start(_infer_thread_num) != 0) {
LOG(ERROR) << "Failed start bsf executor, threads:" << _infer_thread_num;
return -1;
}
LOG(WARNING) << "Enable batch schedule framework, thread_num:"
<< _infer_thread_num << ", batch_size:" << _infer_batch_size
<< ", enable_batch_align:" << _infer_batch_align;
return 0;
}
int ReloadableInferEngine::infer(const void* in,
void* out,
uint32_t batch_size) {
if (_infer_thread_num <= 0) {
return infer_impl(in, out, batch_size);
}
im::bsf::TaskManager<Tensor, Tensor> task_manager;
task_manager.schedule(*(reinterpret_cast<const BatchTensor*>(in)),
*(reinterpret_cast<BatchTensor*>(out)));
task_manager.wait();
return 0;
}
int ReloadableInferEngine::thrd_initialize() {
if (_infer_thread_num > 0) {
return 0;
}
return thrd_initialize_impl();
}
int ReloadableInferEngine::thrd_clear() {
if (_infer_thread_num > 0) {
return 0;
}
return thrd_clear_impl();
}
int ReloadableInferEngine::proc_finalize() {
if (proc_finalize_impl() != 0) {
LOG(ERROR) << "Failed proc finalize impl";
return -1;
}
if (_infer_thread_num > 0) {
im::bsf::TaskExecutor<TaskT>::instance()->stop();
}
return 0;
}
int ReloadableInferEngine::reload() {
if (check_need_reload()) {
LOG(WARNING) << "begin reload model[" << _model_dir << "].";
return load(_conf);
}
return 0;
}
int ReloadableInferEngine::parse_version_info(
const configure::EngineDesc& config, bool version) {
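  // Version parsing is not implemented for this engine yet: fall back to the
  // sentinel value uint64_t(-1) and report success.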
_version = uint64_t(-1);
return 0;
}
bool ReloadableInferEngine::check_need_reload() {
if (_reload_type == "timestamp_ne") {
return check_timestamp_ne();
} else if (_reload_type == "timestamp_gt") {
return check_timestamp_gt();
} else if (_reload_type == "md5sum") {
return check_md5sum();
} else if (_reload_type == "revision") {
return check_revision();
} else if (_reload_type == "none") {
return false;
}
LOG(ERROR) << "Not support reload type: " << _reload_type;
return false;
}
bool ReloadableInferEngine::check_timestamp_ne() {
struct stat st;
if (stat(_reload_tag_file.c_str(), &st) != 0) {
LOG(ERROR) << "Failed stat config file:" << _reload_tag_file;
return false;
}
if ((st.st_mode & S_IFREG) && st.st_mtime != _last_record.timestamp) {
_last_record.timestamp = st.st_mtime;
return true;
}
return false;
}
bool ReloadableInferEngine::check_timestamp_gt() {
struct stat st;
if (stat(_reload_tag_file.c_str(), &st) != 0) {
LOG(ERROR) << "Failed stat config file:" << _reload_tag_file;
return false;
}
if ((st.st_mode & S_IFREG) && st.st_mtime > _last_record.timestamp) {
_last_record.timestamp = st.st_mtime;
return true;
}
return false;
}
int VersionedInferEngine::proc_initialize(const configure::EngineDesc& conf) {
if (proc_initialize(conf, false) != 0) {
LOG(ERROR) << "Failed proc intialize engine: " << conf.name().c_str();
return -1;
}
LOG(WARNING) << "Succ proc initialize engine: " << conf.name().c_str();
return 0;
}
int VersionedInferEngine::proc_initialize(const configure::EngineDesc& conf,
bool version) {
std::string engine_type = conf.type();
InferEngine* engine =
StaticInferFactory::instance().generate_object(engine_type);
if (!engine) {
LOG(ERROR) << "Failed generate engine with type:" << engine_type;
return -1;
}
#ifndef BCLOUD
VLOG(2) << "FLAGS_logtostderr " << FLAGS_logtostderr;
int tmp = FLAGS_logtostderr;
if (engine->proc_initialize(conf, version) != 0) {
LOG(ERROR) << "Failed initialize engine, type:" << engine_type;
return -1;
}
VLOG(2) << "FLAGS_logtostderr " << FLAGS_logtostderr;
FLAGS_logtostderr = tmp;
#else
if (engine->proc_initialize(conf, version) != 0) {
LOG(ERROR) << "Failed initialize engine, type:" << engine_type;
return -1;
}
#endif
auto r = _versions.insert(std::make_pair(engine->version(), engine));
if (!r.second) {
LOG(ERROR) << "Failed insert item: " << engine->version()
<< ", type: " << engine_type;
return -1;
}
LOG(WARNING) << "Succ proc initialize version engine: " << engine->version();
return 0;
}
int VersionedInferEngine::proc_finalize() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->proc_finalize() != 0) {
LOG(ERROR) << "Failed proc finalize version engine: " << iter->first;
}
LOG(WARNING) << "Succ proc finalize version engine: " << iter->first;
}
return 0;
}
int VersionedInferEngine::thrd_initialize() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->thrd_initialize() != 0) {
LOG(ERROR) << "Failed thrd initialize version engine: " << iter->first;
return -1;
}
LOG(WARNING) << "Succ thrd initialize version engine: " << iter->first;
}
return 0;
}
int VersionedInferEngine::thrd_clear() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->thrd_clear() != 0) {
LOG(ERROR) << "Failed thrd clear version engine: " << iter->first;
return -1;
}
}
return 0;
}
int VersionedInferEngine::thrd_finalize() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->thrd_finalize() != 0) {
LOG(ERROR) << "Failed thrd finalize version engine: " << iter->first;
return -1;
}
LOG(WARNING) << "Succ thrd finalize version engine: " << iter->first;
}
return 0;
}
int VersionedInferEngine::reload() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->reload() != 0) {
LOG(ERROR) << "Failed reload version engine: " << iter->first;
return -1;
}
LOG(WARNING) << "Succ reload version engine: " << iter->first;
}
return 0;
}
uint64_t VersionedInferEngine::version() const {
InferEngine* engine = default_engine();
if (engine) {
return engine->version();
} else {
return uint64_t(-1);
}
}
// inference interface
InferEngine* VersionedInferEngine::default_engine() const {
if (_versions.size() != 1) {
LOG(ERROR) << "Ambiguous default engine version:" << _versions.size();
return NULL;
}
return _versions.begin()->second;
}
int VersionedInferEngine::infer(const void* in,
void* out,
uint32_t batch_size) {
InferEngine* engine = default_engine();
if (!engine) {
LOG(WARNING) << "fail to get default engine";
return -1;
}
return engine->infer(in, out, batch_size);
}
// versioned inference interface
int VersionedInferEngine::infer(const void* in,
void* out,
uint32_t batch_size,
uint64_t version) {
auto iter = _versions.find(version);
if (iter == _versions.end()) {
LOG(ERROR) << "Not found version engine: " << version;
return -1;
}
return iter->second->infer(in, out, batch_size);
}
template <typename T>
T* VersionedInferEngine::get_core() {
InferEngine* engine = default_engine();
if (!engine) {
LOG(WARNING) << "fail to get core";
return NULL;
}
auto db_engine = dynamic_cast<DBReloadableInferEngine<T>*>(engine);
if (db_engine) {
return db_engine->get_core();
}
LOG(WARNING) << "fail to get core";
return NULL;
}
template <typename T>
T* VersionedInferEngine::get_core(uint64_t version) {
auto iter = _versions.find(version);
if (iter == _versions.end()) {
LOG(ERROR) << "Not found version engine: " << version;
return NULL;
}
auto db_engine = dynamic_cast<DBReloadableInferEngine<T>*>(iter->second);
if (db_engine) {
return db_engine->get_core();
}
LOG(WARNING) << "fail to get core for " << version;
return NULL;
}
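// The *_impl hooks below are never used directly by VersionedInferEngine: all
// real work is delegated to the per-version engines held in _versions, so the
// hooks simply return -1.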
int VersionedInferEngine::proc_initialize_impl(
const configure::EngineDesc& conf, bool) {
return -1;
}
int VersionedInferEngine::thrd_initialize_impl() { return -1; }
int VersionedInferEngine::thrd_finalize_impl() { return -1; }
int VersionedInferEngine::thrd_clear_impl() { return -1; }
int VersionedInferEngine::proc_finalize_impl() { return -1; }
int VersionedInferEngine::infer_impl(const void* in,
void* out,
uint32_t batch_size) {
return -1;
}
int VersionedInferEngine::task_infer_impl(const BatchTensor& in,
BatchTensor& out) { // NOLINT
return -1;
}
int InferManager::proc_initialize(const char* path, const char* file) {
ModelToolkitConf model_toolkit_conf;
if (configure::read_proto_conf(path, file, &model_toolkit_conf) != 0) {
LOG(ERROR) << "failed load infer config, path: " << path << "/" << file;
return -1;
}
size_t engine_num = model_toolkit_conf.engines_size();
for (size_t ei = 0; ei < engine_num; ++ei) {
LOG(INFO) << "model_toolkit_conf.engines(" << ei
<< ").name: " << model_toolkit_conf.engines(ei).name();
std::string engine_name = model_toolkit_conf.engines(ei).name();
VersionedInferEngine* engine = new (std::nothrow) VersionedInferEngine();
if (!engine) {
LOG(ERROR) << "Failed generate versioned engine: " << engine_name;
return -1;
}
if (engine->proc_initialize(model_toolkit_conf.engines(ei)) != 0) {
LOG(ERROR) << "Failed initialize version engine, name:" << engine_name;
return -1;
}
auto r = _map.insert(std::make_pair(engine_name, engine));
if (!r.second) {
LOG(ERROR) << "Failed insert item: " << engine_name;
return -1;
}
LOG(WARNING) << "Succ proc initialize engine: " << engine_name;
}
return 0;
}
int InferManager::thrd_initialize() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->thrd_initialize() != 0) {
LOG(ERROR) << "Failed thrd initialize engine, name: " << it->first;
return -1;
}
LOG(WARNING) << "Succ thrd initialize engine, name: " << it->first;
}
return 0;
}
int InferManager::thrd_clear() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->thrd_clear() != 0) {
LOG(ERROR) << "Failed thrd clear engine, name: " << it->first;
return -1;
}
}
return 0;
}
int InferManager::reload() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->reload() != 0) {
LOG(ERROR) << "Failed reload engine, name: " << it->first;
return -1;
}
}
return 0;
}
int InferManager::thrd_finalize() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->thrd_finalize() != 0) {
LOG(ERROR) << "Failed thrd finalize engine, name: " << it->first;
return -1;
}
LOG(WARNING) << "Succ thrd finalize engine, name: " << it->first;
}
return 0;
}
int InferManager::proc_finalize() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->proc_finalize() != 0) {
LOG(ERROR) << "Failed proc finalize engine, name: " << it->first;
return -1;
}
LOG(WARNING) << "Succ proc finalize engine, name: " << it->first;
}
_map.clear();
return 0;
}
// Inference interface
int InferManager::infer(const char* model_name,
const void* in,
void* out,
uint32_t batch_size) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return -1;
}
return it->second->infer(in, out, batch_size);
}
template <typename T>
T* InferManager::get_core(const char* model_name) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return NULL;
}
auto infer_engine =
dynamic_cast<DBReloadableInferEngine<T>*>(it->second->default_engine());
if (infer_engine) {
return infer_engine->get_core();
}
LOG(WARNING) << "fail to get core for " << model_name;
return NULL;
}
// Versioned inference interface
int InferManager::infer(const char* model_name,
const void* in,
void* out,
uint32_t batch_size,
uint64_t version) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return -1;
}
return it->second->infer(in, out, batch_size, version);
}
template <typename T>
T* InferManager::get_core(const char* model_name, uint64_t version) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return NULL;
}
return it->second->get_core<T>(version);
}
int InferManager::query_version(const std::string& model, uint64_t& version) {
auto it = _map.find(model);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model;
return -1;
}
auto infer_engine = it->second->default_engine();
if (!infer_engine) {
LOG(WARNING) << "Cannot get default engine for model:" << model;
return -1;
}
version = infer_engine->version();
LOG(INFO) << "Succ get version: " << version << " for model: " << model;
return 0;
}
} // namespace predictor
} // namespace paddle_serving
} // namespace baidu
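For orientation, here is a minimal usage sketch (not part of this commit) of the InferManager singleton implemented above. The config directory, file name, and engine name are hypothetical, and in the serving framework the in/out buffers are the tensor structures managed by the OP layer (TensorVector/BatchTensor above):
#include "core/predictor/framework/infer.h"
using baidu::paddle_serving::predictor::InferManager;
int serve_one_request(const void* in, void* out) {
  // Process-level initialization: reads the model toolkit config and creates
  // one VersionedInferEngine per configured engine (paths are hypothetical).
  if (InferManager::instance().proc_initialize("./conf", "model_toolkit.prototxt") != 0) {
    return -1;
  }
  // Per worker thread, before serving.
  if (InferManager::instance().thrd_initialize() != 0) {
    return -1;
  }
  // Run inference on the engine registered under a (hypothetical) name.
  int ret = InferManager::instance().infer("general_infer_0", in, out, 1);
  // Per-request/thread cleanup, then process shutdown.
  InferManager::instance().thrd_clear();
  InferManager::instance().proc_finalize();
  return ret;
}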
......@@ -13,14 +13,14 @@
// limitations under the License.
#pragma once
#include <pthread.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include <numeric>
#include <string>
#include <utility>
#include <vector>
#include <numeric>
#include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/bsf.h"
#include "core/predictor/framework/factory.h"
......@@ -68,7 +68,9 @@ class InferEngine {
virtual int thrd_initialize() { return thrd_initialize_impl(); }
virtual int thrd_clear() { return thrd_clear_impl(); }
virtual int thrd_finalize() { return thrd_finalize_impl(); }
virtual int infer(const void* in, void* out, uint32_t batch_size = -1) { return infer_impl(in, out, batch_size); }
virtual int infer(const void* in, void* out, uint32_t batch_size = -1) {
return infer_impl(in, out, batch_size);
}
virtual int reload() = 0;
......@@ -82,10 +84,10 @@ class InferEngine {
virtual int thrd_clear_impl() = 0;
virtual int proc_finalize_impl() = 0;
virtual int infer_impl(const void* in,
void* out,
uint32_t batch_size = -1) = 0;
void* out,
uint32_t batch_size = -1) = 0;
virtual int task_infer_impl(const BatchTensor& in,
BatchTensor& out) = 0; // NOLINT
BatchTensor& out) = 0; // NOLINT
// end: framework inner call
};
......@@ -94,194 +96,77 @@ class ReloadableInferEngine : public InferEngine {
public:
virtual ~ReloadableInferEngine() {}
union last_check_status {
time_t last_timestamp;
uint64_t last_md5sum;
uint64_t last_revision;
// Reloadable record
union ReloadableRecord {
time_t timestamp;
uint64_t md5sum;
uint64_t revision;
};
virtual int load(const configure::EngineDesc& conf) = 0;
typedef im::bsf::Task<Tensor, Tensor> TaskT;
int proc_initialize_impl(const configure::EngineDesc& conf, bool version) {
_reload_tag_file = conf.reloadable_meta();
_reload_mode_tag = conf.reloadable_type();
_model_data_path = conf.model_dir();
_infer_thread_num = conf.runtime_thread_num();
_infer_batch_size = conf.batch_infer_size();
_infer_batch_align = conf.enable_batch_align();
_conf = conf;
int proc_initialize_impl(const configure::EngineDesc& conf, bool version);
if (!check_need_reload() || load(conf) != 0) {
LOG(ERROR) << "Failed load model_data_path" << _model_data_path;
return -1;
}
if (parse_version_info(conf, version) != 0) {
LOG(ERROR) << "Failed parse version info";
return -1;
}
LOG(WARNING) << "Succ load model_data_path" << _model_data_path;
return 0;
}
int proc_initialize(const configure::EngineDesc& conf, bool version);
int proc_initialize(const configure::EngineDesc& conf, bool version) {
if (proc_initialize_impl(conf, version) != 0) {
LOG(ERROR) << "Failed proc initialize impl";
return -1;
}
int infer(const void* in, void* out, uint32_t batch_size = -1);
// init bsf framework
if (_infer_thread_num <= 0) {
return 0;
}
int thrd_initialize();
// init bsf framework
im::bsf::TaskExecutor<TaskT>::instance()->set_thread_init_fn(
boost::bind(&InferEngine::thrd_initialize_impl, this));
im::bsf::TaskExecutor<TaskT>::instance()->set_thread_reset_fn(
boost::bind(&InferEngine::thrd_clear_impl, this));
im::bsf::TaskExecutor<TaskT>::instance()->set_thread_callback_fn(
boost::bind(&InferEngine::task_infer_impl, this, _1, _2));
im::bsf::TaskExecutor<TaskT>::instance()->set_batch_size(_infer_batch_size);
im::bsf::TaskExecutor<TaskT>::instance()->set_batch_align(
_infer_batch_align);
if (im::bsf::TaskExecutor<TaskT>::instance()->start(_infer_thread_num) !=
0) {
LOG(ERROR) << "Failed start bsf executor, threads:" << _infer_thread_num;
return -1;
}
int thrd_clear();
LOG(WARNING) << "Enable batch schedule framework, thread_num:"
<< _infer_thread_num << ", batch_size:" << _infer_batch_size
<< ", enable_batch_align:" << _infer_batch_align;
return 0;
}
int proc_finalize();
int infer(const void* in, void* out, uint32_t batch_size = -1) {
if (_infer_thread_num <= 0) {
return infer_impl(in, out, batch_size);
}
im::bsf::TaskManager<Tensor, Tensor> task_manager;
task_manager.schedule(*(reinterpret_cast<const BatchTensor*>(in)),
*(reinterpret_cast<BatchTensor*>(out)));
task_manager.wait();
return 0;
}
int thrd_initialize() {
if (_infer_thread_num > 0) {
return 0;
}
return thrd_initialize_impl();
}
int thrd_clear() {
if (_infer_thread_num > 0) {
return 0;
}
return thrd_clear_impl();
}
int proc_finalize() {
if (proc_finalize_impl() != 0) {
LOG(ERROR) << "Failed proc finalize impl";
return -1;
}
if (_infer_thread_num > 0) {
im::bsf::TaskExecutor<TaskT>::instance()->stop();
}
return 0;
}
int reload() {
if (check_need_reload()) {
LOG(WARNING) << "begin reload model[" << _model_data_path << "].";
return load(_conf);
}
return 0;
}
int reload();
uint64_t version() const { return _version; }
uint32_t thread_num() const { return _infer_thread_num; }
private:
int parse_version_info(const configure::EngineDesc& config, bool version) {
_version = uint64_t(-1);
return 0;
}
bool check_need_reload() {
if (_reload_mode_tag == "timestamp_ne") {
return check_timestamp_ne();
} else if (_reload_mode_tag == "timestamp_gt") {
return check_timestamp_gt();
} else if (_reload_mode_tag == "md5sum") {
return check_md5sum();
} else if (_reload_mode_tag == "revision") {
return check_revision();
} else if (_reload_mode_tag == "none") {
return false;
} else {
LOG(ERROR) << "Not support check type: " << _reload_mode_tag;
return false;
}
}
int parse_version_info(const configure::EngineDesc& config, bool version);
bool check_timestamp_ne() {
struct stat st;
if (stat(_reload_tag_file.c_str(), &st) != 0) {
LOG(ERROR) << "Failed stat config file:" << _reload_tag_file;
return false;
}
bool check_need_reload();
if ((st.st_mode & S_IFREG) && st.st_mtime != _last_status.last_timestamp) {
_last_status.last_timestamp = st.st_mtime;
return true;
}
bool check_timestamp_ne();
return false;
}
bool check_timestamp_gt() {
struct stat st;
if (stat(_reload_tag_file.c_str(), &st) != 0) {
LOG(ERROR) << "Failed stat config file:" << _reload_tag_file;
return false;
}
if ((st.st_mode & S_IFREG) && st.st_mtime > _last_status.last_timestamp) {
_last_status.last_timestamp = st.st_mtime;
return true;
}
return false;
}
bool check_timestamp_gt();
bool check_md5sum() { return false; }
bool check_revision() { return false; }
protected:
std::string _model_data_path;
// Model directory
std::string _model_dir;
// The description of inference engine
configure::EngineDesc _conf;
private:
// Tag file of reloadable model
std::string _reload_tag_file;
std::string _reload_mode_tag;
last_check_status _last_status;
// Type of reload, e.g. timestamp_ne, timestamp_gt, md5sum, revision
std::string _reload_type;
// Record the last loading information
ReloadableRecord _last_record;
// Number of inference threads
uint32_t _infer_thread_num;
// Size of inference batch
uint32_t _infer_batch_size;
// Whether batch_size needs to be aligned during inference
bool _infer_batch_align;
// model version
uint64_t _version;
};
// Lock free switching two models
template <typename EngineCore>
struct ModelData {
ModelData() : current_idx(1) {
......@@ -322,12 +207,10 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
}
LOG(WARNING) << "Succ load engine, path: " << conf.model_dir();
return 0;
}
int load_data(ModelData<EngineCore>* md,
const configure::EngineDesc& conf) {
int load_data(ModelData<EngineCore>* md, const configure::EngineDesc& conf) {
uint32_t next_idx = (md->current_idx + 1) % 2;
if (md->cores[next_idx]) {
delete md->cores[next_idx];
......@@ -335,7 +218,7 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
md->cores[next_idx] = new (std::nothrow) EngineCore;
//params.dump();
// params.dump();
if (!md->cores[next_idx] || md->cores[next_idx]->create(conf) != 0) {
LOG(ERROR) << "Failed create model, path: " << conf.model_dir();
return -1;
......@@ -353,11 +236,11 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
ModelData<EngineCore>* md = new (std::nothrow) ModelData<EngineCore>;
if (!md || load_data(md, _conf) != 0) {
LOG(ERROR) << "Failed create thread data from "
<< _conf.model_dir();
LOG(ERROR) << "Failed create thread data from " << _conf.model_dir();
return -1;
}
LOG(ERROR) << "THREAD_SETSPECIFIC _skey = md";
THREAD_SETSPECIFIC(_skey, md);
im::bsf::AutoMutex lock(_mutex);
_reload_vec.push_back(md);
......@@ -395,8 +278,6 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
THREAD_KEY_T _skey;
THREAD_MUTEX_T _mutex;
std::vector<ModelData<EngineCore>*> _reload_vec;
private:
};
// Multiple EngineCore instances share the same model data
......@@ -441,7 +322,6 @@ class CloneDBReloadableInferEngine
}
LOG(WARNING) << "Succ load clone model, path[" << conf.model_dir() << "]";
return 0;
}
......@@ -491,112 +371,129 @@ class CloneDBReloadableInferEngine
_pd;  // Process-level EngineCore; multiple thread-level EngineCores share this object's model data
};
template <typename PaddleInferenceCore>
template <typename EngineCore>
#ifdef WITH_TRT
class FluidInferEngine : public DBReloadableInferEngine<PaddleInferenceCore> {
class FluidInferEngine : public DBReloadableInferEngine<EngineCore> {
#else
class FluidInferEngine : public CloneDBReloadableInferEngine<PaddleInferenceCore> {
class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
#endif
public: // NOLINT
FluidInferEngine() {}
~FluidInferEngine() {}
typedef std::vector<paddle::PaddleTensor> TensorVector;
int infer_impl(const void* in, void* out, uint32_t batch_size = -1) {
//First of all, get the real core acording to the template parameter 'PaddleInferenceCore'.
PaddleInferenceCore* core =DBReloadableInferEngine<PaddleInferenceCore>::get_core();
// First of all, get the real core according to the
// template parameter <EngineCore>.
EngineCore* core = DBReloadableInferEngine<EngineCore>::get_core();
if (!core || !core->get()) {
LOG(ERROR) << "Failed get fluid core in infer_impl()";
return -1;
}
//We use the for loop to process the input data.
//Inside each for loop, use the in[i]->name as inputName and call 'core->GetInputHandle(inputName)' to get the pointer of InputData.
//Set the lod and shape information of InputData first. then copy data from cpu to the core.
const TensorVector* tensorVector_in_pointer = reinterpret_cast<const TensorVector*>(in);
for (int i=0; i < tensorVector_in_pointer->size(); ++i) {
auto lod_tensor_in = core->GetInputHandle((*tensorVector_in_pointer)[i].name);
// We use a for loop to process the input data.
// In each iteration, use in[i]->name as inputName and call
// 'core->GetInputHandle(inputName)' to get the pointer to the InputData.
// Set the lod and shape information of the InputData first,
// then copy the data from CPU to the core.
const TensorVector* tensorVector_in_pointer =
reinterpret_cast<const TensorVector*>(in);
for (int i = 0; i < tensorVector_in_pointer->size(); ++i) {
auto lod_tensor_in =
core->GetInputHandle((*tensorVector_in_pointer)[i].name);
lod_tensor_in->SetLoD((*tensorVector_in_pointer)[i].lod);
lod_tensor_in->Reshape((*tensorVector_in_pointer)[i].shape);
void* origin_data = (*tensorVector_in_pointer)[i].data.data();
//Because the core needs to determine the size of memory space according to the data type passed in.
//The pointer type of data must be one of float *,int64_t*,int32_t* instead void*.
// Because the core needs to determine the size of the memory space
// from the data type passed in, the pointer type of the data must be
// one of float*, int64_t*, int32_t* instead of void*.
if ((*tensorVector_in_pointer)[i].dtype == paddle::PaddleDType::FLOAT32) {
float* data = static_cast<float*>(origin_data);
lod_tensor_in->CopyFromCpu(data);
}else if ((*tensorVector_in_pointer)[i].dtype == paddle::PaddleDType::INT64) {
} else if ((*tensorVector_in_pointer)[i].dtype ==
paddle::PaddleDType::INT64) {
int64_t* data = static_cast<int64_t*>(origin_data);
lod_tensor_in->CopyFromCpu(data);
}else if ((*tensorVector_in_pointer)[i].dtype == paddle::PaddleDType::INT32) {
} else if ((*tensorVector_in_pointer)[i].dtype ==
paddle::PaddleDType::INT32) {
int32_t* data = static_cast<int32_t*>(origin_data);
lod_tensor_in->CopyFromCpu(data);
}
}
//After the input data is passed in, call 'core->Run()' perform the prediction process.
// After the input data is passed in,
// call 'core->Run()' to perform the prediction.
if (!core->Run()) {
LOG(ERROR) << "Failed run fluid family core";
return -1;
LOG(ERROR) << "Failed run fluid family core";
return -1;
}
//In order to get the results, first, call the 'core->GetOutputNames()' to get the name of output(which is a dict like {OutputName:pointer of OutputValue}).
//Then, use for-loop to get OutputValue by calling 'core->GetOutputHandle'.
// To get the results, first call 'core->GetOutputNames()' to get the
// names of the outputs (a dict like {OutputName: pointer of OutputValue}).
// Then use a for loop to get each OutputValue by calling 'core->GetOutputHandle'.
std::vector<std::string> outnames = core->GetOutputNames();
std::vector<int> output_shape;
int out_num =0;
int dataType =0;
int out_num = 0;
int dataType = 0;
void* databuf_data = NULL;
char* databuf_char = NULL;
size_t databuf_size = 0;
TensorVector* tensorVector_out_pointer = reinterpret_cast<TensorVector*>(out);
TensorVector* tensorVector_out_pointer =
reinterpret_cast<TensorVector*>(out);
if (!tensorVector_out_pointer) {
LOG(ERROR) << "tensorVector_out_pointer is nullptr,error";
return -1;
}
//Get the type and shape information of OutputData first. then copy data to cpu from the core.
//The pointer type of data_out must be one of float *,int64_t*,int32_t* instead void*.
for (int i=0; i < outnames.size(); ++i) {
// Get the type and shape information of the OutputData first,
// then copy the data from the core to the CPU.
// The pointer type of data_out must be one of
// float*, int64_t*, int32_t* instead of void*.
for (int i = 0; i < outnames.size(); ++i) {
auto lod_tensor_out = core->GetOutputHandle(outnames[i]);
output_shape = lod_tensor_out->shape();
out_num = std::accumulate(output_shape.begin(), output_shape.end(), 1, std::multiplies<int>());
out_num = std::accumulate(
output_shape.begin(), output_shape.end(), 1, std::multiplies<int>());
dataType = lod_tensor_out->type();
if (dataType == paddle::PaddleDType::FLOAT32) {
databuf_size = out_num*sizeof(float);
databuf_size = out_num * sizeof(float);
databuf_data = MempoolWrapper::instance().malloc(databuf_size);
if (!databuf_data) {
LOG(ERROR) << "Malloc failed, size: " << databuf_size;
return -1;
LOG(ERROR) << "Malloc failed, size: " << databuf_size;
return -1;
}
float* data_out = reinterpret_cast<float*>(databuf_data);
lod_tensor_out->CopyToCpu(data_out);
databuf_char = reinterpret_cast<char*>(data_out);
}else if (dataType == paddle::PaddleDType::INT64) {
databuf_size = out_num*sizeof(int64_t);
} else if (dataType == paddle::PaddleDType::INT64) {
databuf_size = out_num * sizeof(int64_t);
databuf_data = MempoolWrapper::instance().malloc(databuf_size);
if (!databuf_data) {
LOG(ERROR) << "Malloc failed, size: " << databuf_size;
return -1;
LOG(ERROR) << "Malloc failed, size: " << databuf_size;
return -1;
}
int64_t* data_out = reinterpret_cast<int64_t*>(databuf_data);
lod_tensor_out->CopyToCpu(data_out);
databuf_char = reinterpret_cast<char*>(data_out);
}else if (dataType == paddle::PaddleDType::INT32) {
databuf_size = out_num*sizeof(int32_t);
} else if (dataType == paddle::PaddleDType::INT32) {
databuf_size = out_num * sizeof(int32_t);
databuf_data = MempoolWrapper::instance().malloc(databuf_size);
if (!databuf_data) {
LOG(ERROR) << "Malloc failed, size: " << databuf_size;
return -1;
LOG(ERROR) << "Malloc failed, size: " << databuf_size;
return -1;
}
int32_t* data_out = reinterpret_cast<int32_t*>(databuf_data);
lod_tensor_out->CopyToCpu(data_out);
databuf_char = reinterpret_cast<char*>(data_out);
}
//Because task scheduling requires OPs to use 'Channel'(which is a data structure) to transfer data between OPs.
//We need to copy the processed data to the 'Channel' for the next OP.
//In this function, it means we should copy the 'databuf_char' to the pointer 'void* out'.(which is also called ‘tensorVector_out_pointer’)
// Because task scheduling requires OPs to use a 'Channel'
// (a data structure) to transfer data between OPs,
// we need to copy the processed data into the 'Channel' for the next OP.
// In this function, that means copying 'databuf_char' into
// 'void* out' (which is also called 'tensorVector_out_pointer').
paddle::PaddleTensor tensor_out;
tensor_out.name = outnames[i];
tensor_out.dtype = paddle::PaddleDType(dataType);
tensor_out.shape.assign(output_shape.begin(), output_shape.end());
std::vector<std::vector<size_t>> out_lod = lod_tensor_out->lod();
for (int li=0; li < out_lod.size(); ++li) {
for (int li = 0; li < out_lod.size(); ++li) {
std::vector<size_t> lod_element;
lod_element.assign(out_lod[li].begin(), out_lod[li].end());
tensor_out.lod.push_back(lod_element);
......@@ -611,8 +508,6 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<PaddleInferenceCore
int task_infer_impl(const BatchTensor& in, BatchTensor& out) { // NOLINT
return infer_impl(&in, &out);
}
};
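As an illustration only, a minimal sketch of building the TensorVector input that infer_impl above consumes; the feed name "x", the shape, and the paddle_inference_api.h include are assumptions, since the real names and shapes come from the model config:
#include <cstring>
#include <vector>
#include "paddle_inference_api.h"  // assumed header providing paddle::PaddleTensor
// Build a single FLOAT32 input tensor in the TensorVector layout used above.
inline std::vector<paddle::PaddleTensor> build_float_input(
    const std::vector<float>& values) {
  paddle::PaddleTensor t;
  t.name = "x";                                    // hypothetical feed name
  t.dtype = paddle::PaddleDType::FLOAT32;
  t.shape = {1, static_cast<int>(values.size())};  // batch of one
  t.data.Resize(values.size() * sizeof(float));
  std::memcpy(t.data.data(), values.data(), values.size() * sizeof(float));
  return {t};
}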
typedef FactoryPool<InferEngine> StaticInferFactory;
......@@ -622,185 +517,49 @@ class VersionedInferEngine : public InferEngine {
VersionedInferEngine() { _versions.clear(); }
~VersionedInferEngine() {}
int proc_initialize(const configure::EngineDesc& conf) {
if (proc_initialize(conf, false) != 0) {
LOG(ERROR) << "Failed proc intialize engine: " << conf.name().c_str();
return -1;
}
int proc_initialize(const configure::EngineDesc& conf);
LOG(WARNING) << "Succ proc initialize engine: " << conf.name().c_str();
return 0;
}
int proc_initialize(const configure::EngineDesc& conf, bool version);
int proc_initialize(const configure::EngineDesc& conf, bool version) {
std::string engine_type = conf.type();
InferEngine* engine =
StaticInferFactory::instance().generate_object(engine_type);
if (!engine) {
LOG(ERROR) << "Failed generate engine with type:" << engine_type;
return -1;
}
#ifndef BCLOUD
VLOG(2) << "FLAGS_logtostderr " << FLAGS_logtostderr;
int tmp = FLAGS_logtostderr;
if (engine->proc_initialize(conf, version) != 0) {
LOG(ERROR) << "Failed initialize engine, type:" << engine_type;
return -1;
}
VLOG(2) << "FLAGS_logtostderr " << FLAGS_logtostderr;
FLAGS_logtostderr = tmp;
#else
if (engine->proc_initialize(conf, version) != 0) {
LOG(ERROR) << "Failed initialize engine, type:" << engine_type;
return -1;
}
#endif
auto r = _versions.insert(std::make_pair(engine->version(), engine));
if (!r.second) {
LOG(ERROR) << "Failed insert item: " << engine->version()
<< ", type: " << engine_type;
return -1;
}
LOG(WARNING) << "Succ proc initialize version engine: "
<< engine->version();
return 0;
}
int proc_finalize();
int proc_finalize() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->proc_finalize() != 0) {
LOG(ERROR) << "Failed proc finalize version engine: " << iter->first;
}
LOG(WARNING) << "Succ proc finalize version engine: " << iter->first;
}
return 0;
}
int thrd_initialize();
int thrd_initialize() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->thrd_initialize() != 0) {
LOG(ERROR) << "Failed thrd initialize version engine: " << iter->first;
return -1;
}
LOG(WARNING) << "Succ thrd initialize version engine: " << iter->first;
}
return 0;
}
int thrd_clear();
int thrd_clear() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->thrd_clear() != 0) {
LOG(ERROR) << "Failed thrd clear version engine: " << iter->first;
return -1;
}
}
return 0;
}
int thrd_finalize();
int thrd_finalize() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->thrd_finalize() != 0) {
LOG(ERROR) << "Failed thrd finalize version engine: " << iter->first;
return -1;
}
LOG(WARNING) << "Succ thrd finalize version engine: " << iter->first;
}
return 0;
}
int reload() {
for (auto iter = _versions.begin(); iter != _versions.end(); ++iter) {
if (iter->second->reload() != 0) {
LOG(ERROR) << "Failed reload version engine: " << iter->first;
return -1;
}
LOG(WARNING) << "Succ reload version engine: " << iter->first;
}
return 0;
}
int reload();
uint64_t version() const {
InferEngine* engine = default_engine();
if (engine) {
return engine->version();
} else {
return uint64_t(-1);
}
}
uint64_t version() const;
// inference interface
InferEngine* default_engine() const {
if (_versions.size() != 1) {
LOG(ERROR) << "Ambiguous default engine version:" << _versions.size();
return NULL;
}
return _versions.begin()->second;
}
InferEngine* default_engine() const;
int infer(const void* in, void* out, uint32_t batch_size) {
InferEngine* engine = default_engine();
if (!engine) {
LOG(WARNING) << "fail to get default engine";
return -1;
}
return engine->infer(in, out, batch_size);
}
int infer(const void* in, void* out, uint32_t batch_size);
template <typename T>
T* get_core() {
InferEngine* engine = default_engine();
if (!engine) {
LOG(WARNING) << "fail to get core";
return NULL;
}
auto db_engine = dynamic_cast<DBReloadableInferEngine<T>*>(engine);
if (db_engine) {
return db_engine->get_core();
}
LOG(WARNING) << "fail to get core";
return NULL;
}
T* get_core();
// versioned inference interface
int infer(const void* in, void* out, uint32_t batch_size, uint64_t version) {
auto iter = _versions.find(version);
if (iter == _versions.end()) {
LOG(ERROR) << "Not found version engine: " << version;
return -1;
}
return iter->second->infer(in, out, batch_size);
}
int infer(const void* in, void* out, uint32_t batch_size, uint64_t version);
template <typename T>
T* get_core(uint64_t version) {
auto iter = _versions.find(version);
if (iter == _versions.end()) {
LOG(ERROR) << "Not found version engine: " << version;
return NULL;
}
T* get_core(uint64_t version);
auto db_engine = dynamic_cast<DBReloadableInferEngine<T>*>(iter->second);
if (db_engine) {
return db_engine->get_core();
}
LOG(WARNING) << "fail to get core for " << version;
return NULL;
}
int proc_initialize_impl(const configure::EngineDesc& conf, bool);
// --
int proc_initialize_impl(const configure::EngineDesc& conf, bool) {
return -1;
}
int thrd_initialize_impl() { return -1; }
int thrd_finalize_impl() { return -1; }
int thrd_clear_impl() { return -1; }
int proc_finalize_impl() { return -1; }
int infer_impl(const void* in, void* out, uint32_t batch_size = -1) { return -1; }
int task_infer_impl(const BatchTensor& in, BatchTensor& out) { // NOLINT
return -1;
} // NOLINT
int thrd_initialize_impl();
int thrd_finalize_impl();
int thrd_clear_impl();
int proc_finalize_impl();
int infer_impl(const void* in, void* out, uint32_t batch_size = -1);
int task_infer_impl(const BatchTensor& in, BatchTensor& out);
private:
boost::unordered_map<uint64_t, InferEngine*> _versions;
......@@ -813,158 +572,38 @@ class InferManager {
return ins;
}
int proc_initialize(const char* path, const char* file) {
ModelToolkitConf model_toolkit_conf;
if (configure::read_proto_conf(path, file, &model_toolkit_conf) != 0) {
LOG(ERROR) << "failed load infer config, path: " << path << "/" << file;
return -1;
}
size_t engine_num = model_toolkit_conf.engines_size();
for (size_t ei = 0; ei < engine_num; ++ei) {
LOG(INFO) << "model_toolkit_conf.engines(" << ei
<< ").name: " << model_toolkit_conf.engines(ei).name();
std::string engine_name = model_toolkit_conf.engines(ei).name();
VersionedInferEngine* engine = new (std::nothrow) VersionedInferEngine();
if (!engine) {
LOG(ERROR) << "Failed generate versioned engine: " << engine_name;
return -1;
}
if (engine->proc_initialize(model_toolkit_conf.engines(ei)) != 0) {
LOG(ERROR) << "Failed initialize version engine, name:" << engine_name;
return -1;
}
auto r = _map.insert(std::make_pair(engine_name, engine));
if (!r.second) {
LOG(ERROR) << "Failed insert item: " << engine_name;
return -1;
}
LOG(WARNING) << "Succ proc initialize engine: " << engine_name;
}
return 0;
}
int proc_initialize(const char* path, const char* file);
int thrd_initialize() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->thrd_initialize() != 0) {
LOG(ERROR) << "Failed thrd initialize engine, name: " << it->first;
return -1;
}
LOG(WARNING) << "Succ thrd initialize engine, name: " << it->first;
}
return 0;
}
int thrd_initialize();
int thrd_clear() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->thrd_clear() != 0) {
LOG(ERROR) << "Failed thrd clear engine, name: " << it->first;
return -1;
}
}
return 0;
}
int thrd_clear();
int reload() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->reload() != 0) {
LOG(ERROR) << "Failed reload engine, name: " << it->first;
return -1;
}
}
return 0;
}
int reload();
int thrd_finalize() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->thrd_finalize() != 0) {
LOG(ERROR) << "Failed thrd finalize engine, name: " << it->first;
return -1;
}
LOG(WARNING) << "Succ thrd finalize engine, name: " << it->first;
}
return 0;
}
int thrd_finalize();
int proc_finalize() {
for (auto it = _map.begin(); it != _map.end(); ++it) {
if (it->second->proc_finalize() != 0) {
LOG(ERROR) << "Failed proc finalize engine, name: " << it->first;
return -1;
}
LOG(WARNING) << "Succ proc finalize engine, name: " << it->first;
}
_map.clear();
return 0;
}
int proc_finalize();
// Inference interface
int infer(const char* model_name,
const void* in,
void* out,
uint32_t batch_size = -1) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return -1;
}
return it->second->infer(in, out, batch_size);
}
uint32_t batch_size = -1);
template <typename T>
T* get_core(const char* model_name) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return NULL;
}
auto infer_engine =
dynamic_cast<DBReloadableInferEngine<T>*>(it->second->default_engine());
if (infer_engine) {
return infer_engine->get_core();
}
LOG(WARNING) << "fail to get core for " << model_name;
return NULL;
}
T* get_core(const char* model_name);
// Versioned inference interface
int infer(const char* model_name,
int infer(const char* model_name,
const void* in,
void* out,
uint32_t batch_size,
uint64_t version) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return -1;
}
return it->second->infer(in, out, batch_size, version);
}
uint64_t version);
template <typename T>
T* get_core(const char* model_name, uint64_t version) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return NULL;
}
return it->second->get_core<T>(version);
}
T* get_core(const char* model_name, uint64_t version);
int query_version(const std::string& model, uint64_t& version) { // NOLINT
auto it = _map.find(model);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model;
return -1;
}
auto infer_engine = it->second->default_engine();
if (!infer_engine) {
LOG(WARNING) << "Cannot get default engine for model:" << model;
return -1;
}
version = infer_engine->version();
LOG(INFO) << "Succ get version: " << version << " for model: " << model;
return 0;
}
int query_version(const std::string& model, uint64_t& version);
private:
boost::unordered_map<std::string, VersionedInferEngine*> _map;
......
......@@ -27,22 +27,9 @@ namespace predictor {
using configure::ResourceConf;
using configure::GeneralModelConfig;
using rec::mcube::CubeAPI;
// __thread bool p_thread_initialized = false;
static void dynamic_resource_deleter(void* d) {
#if 1
LOG(INFO) << "dynamic_resource_delete on " << bthread_self();
#endif
delete static_cast<DynamicResource*>(d);
}
DynamicResource::DynamicResource() {}
DynamicResource::~DynamicResource() {}
int DynamicResource::initialize() { return 0; }
std::vector<std::shared_ptr<PaddleGeneralModelConfig> > Resource::get_general_model_config() {
std::vector<std::shared_ptr<PaddleGeneralModelConfig>>
Resource::get_general_model_config() {
return _configs;
}
......@@ -96,8 +83,6 @@ void Resource::print_general_model_config(
}
}
int DynamicResource::clear() { return 0; }
int Resource::initialize(const std::string& path, const std::string& file) {
ResourceConf resource_conf;
if (configure::read_proto_conf(path, file, &resource_conf) != 0) {
......@@ -150,29 +135,25 @@ int Resource::initialize(const std::string& path, const std::string& file) {
if (FLAGS_enable_model_toolkit) {
size_t model_toolkit_num = resource_conf.model_toolkit_path_size();
for (size_t mi=0; mi < model_toolkit_num; ++mi) {
for (size_t mi = 0; mi < model_toolkit_num; ++mi) {
std::string model_toolkit_path = resource_conf.model_toolkit_path(mi);
std::string model_toolkit_file = resource_conf.model_toolkit_file(mi);
if (InferManager::instance().proc_initialize(
model_toolkit_path.c_str(), model_toolkit_file.c_str()) != 0) {
LOG(ERROR) << "failed proc initialize modeltoolkit, config: "
<< model_toolkit_path << "/" << model_toolkit_file;
<< model_toolkit_path << "/" << model_toolkit_file;
return -1;
}
if (KVManager::instance().proc_initialize(
model_toolkit_path.c_str(), model_toolkit_file.c_str()) != 0) {
LOG(ERROR) << "Failed proc initialize kvmanager, config: "
<< model_toolkit_path << "/" << model_toolkit_file;
<< model_toolkit_path << "/" << model_toolkit_file;
}
}
}
if (THREAD_KEY_CREATE(&_tls_bspec_key, dynamic_resource_deleter) != 0) {
LOG(ERROR) << "unable to create tls_bthread_key of thrd_data";
return -1;
}
// init rocksDB or cube instance
if (resource_conf.has_cube_config_file() &&
resource_conf.has_cube_config_path()) {
......@@ -225,18 +206,16 @@ int Resource::general_model_initialize(const std::string& path,
return -1;
}
size_t general_model_num = resource_conf.general_model_path_size();
for (size_t gi=0; gi < general_model_num; ++gi) {
for (size_t gi = 0; gi < general_model_num; ++gi) {
std::string general_model_path = resource_conf.general_model_path(gi);
std::string general_model_file = resource_conf.general_model_file(gi);
GeneralModelConfig model_config;
if (configure::read_proto_conf(general_model_path.c_str(),
general_model_file.c_str(),
&model_config) != 0) {
LOG(ERROR) << "Failed initialize model config from: " << general_model_path
<< "/" << general_model_file;
general_model_file.c_str(),
&model_config) != 0) {
LOG(ERROR) << "Failed initialize model config from: "
<< general_model_path << "/" << general_model_file;
return -1;
}
auto _config = std::make_shared<PaddleGeneralModelConfig>();
......@@ -249,7 +228,7 @@ int Resource::general_model_initialize(const std::string& path,
_config->_is_lod_feed.resize(feed_var_num);
_config->_capacity.resize(feed_var_num);
_config->_feed_shape.resize(feed_var_num);
for (int i=0; i < feed_var_num; ++i) {
for (int i = 0; i < feed_var_num; ++i) {
_config->_feed_name[i] = model_config.feed_var(i).name();
_config->_feed_alias_name[i] = model_config.feed_var(i).alias_name();
VLOG(2) << "feed var[" << i << "]: " << _config->_feed_name[i];
......@@ -265,7 +244,7 @@ int Resource::general_model_initialize(const std::string& path,
VLOG(2) << "var[" << i << "] is tensor";
_config->_capacity[i] = 1;
_config->_is_lod_feed[i] = false;
for (int j=0; j < model_config.feed_var(i).shape_size(); ++j) {
for (int j = 0; j < model_config.feed_var(i).shape_size(); ++j) {
int32_t dim = model_config.feed_var(i).shape(j);
VLOG(2) << "var[" << i << "].shape[" << i << "]: " << dim;
_config->_feed_shape[i].push_back(dim);
......@@ -279,7 +258,7 @@ int Resource::general_model_initialize(const std::string& path,
_config->_fetch_name.resize(fetch_var_num);
_config->_fetch_alias_name.resize(fetch_var_num);
_config->_fetch_shape.resize(fetch_var_num);
for (int i=0; i < fetch_var_num; ++i) {
for (int i = 0; i < fetch_var_num; ++i) {
_config->_fetch_name[i] = model_config.fetch_var(i).name();
_config->_fetch_alias_name[i] = model_config.fetch_var(i).alias_name();
_config->_fetch_name_to_index[_config->_fetch_name[i]] = i;
......@@ -290,7 +269,7 @@ int Resource::general_model_initialize(const std::string& path,
_config->_is_lod_fetch[i] = true;
} else {
_config->_is_lod_fetch[i] = false;
for (int j=0; j < model_config.fetch_var(i).shape_size(); ++j) {
for (int j = 0; j < model_config.fetch_var(i).shape_size(); ++j) {
int dim = model_config.fetch_var(i).shape(j);
_config->_fetch_shape[i].push_back(dim);
}
......@@ -316,36 +295,6 @@ int Resource::thread_initialize() {
return -1;
}
DynamicResource* p_dynamic_resource =
reinterpret_cast<DynamicResource*>(THREAD_GETSPECIFIC(_tls_bspec_key));
if (p_dynamic_resource == NULL) {
p_dynamic_resource = new (std::nothrow) DynamicResource;
if (p_dynamic_resource == NULL) {
LOG(ERROR) << "failed to create tls DynamicResource";
return -1;
}
if (p_dynamic_resource->initialize() != 0) {
LOG(ERROR) << "DynamicResource initialize failed.";
delete p_dynamic_resource;
p_dynamic_resource = NULL;
return -1;
}
if (THREAD_SETSPECIFIC(_tls_bspec_key, p_dynamic_resource) != 0) {
LOG(ERROR) << "unable to set tls DynamicResource";
delete p_dynamic_resource;
p_dynamic_resource = NULL;
return -1;
}
}
#if 0
LOG(INFO) << "Successfully thread initialized dynamic resource";
#else
LOG(INFO) << bthread_self()
<< ": Successfully thread initialized dynamic resource "
<< p_dynamic_resource;
#endif
return 0;
}
......@@ -362,26 +311,6 @@ int Resource::thread_clear() {
LOG(ERROR) << "Failed thrd clear infer manager";
return -1;
}
DynamicResource* p_dynamic_resource =
reinterpret_cast<DynamicResource*>(THREAD_GETSPECIFIC(_tls_bspec_key));
if (p_dynamic_resource == NULL) {
#if 0
LOG(ERROR) << "tls dynamic resource shouldn't be null after "
<< "thread_initialize";
#else
LOG(ERROR)
<< bthread_self()
<< ": tls dynamic resource shouldn't be null after thread_initialize";
#endif
return -1;
}
if (p_dynamic_resource->clear() != 0) {
LOG(ERROR) << "Failed to invoke dynamic resource clear";
return -1;
}
// ...
return 0;
}
size_t Resource::get_cube_quant_bits() { return this->cube_quant_bits; }
......
......@@ -54,15 +54,6 @@ class PaddleGeneralModelConfig {
};
class BaseRdDict;
struct DynamicResource {
DynamicResource();
~DynamicResource();
int initialize();
int clear();
};
class Resource {
public:
......@@ -94,20 +85,17 @@ class Resource {
int finalize();
std::vector<std::shared_ptr<PaddleGeneralModelConfig> > get_general_model_config();
std::vector<std::shared_ptr<PaddleGeneralModelConfig>>
get_general_model_config();
void print_general_model_config(
const std::shared_ptr<PaddleGeneralModelConfig>& config);
DynamicResource* get_dynamic_resource() {
return reinterpret_cast<DynamicResource*>(
THREAD_GETSPECIFIC(_tls_bspec_key));
}
size_t get_cube_quant_bits();
private:
int thread_finalize() { return 0; }
std::vector<std::shared_ptr<PaddleGeneralModelConfig> > _configs;
std::vector<std::shared_ptr<PaddleGeneralModelConfig>> _configs;
std::string cube_config_fullpath;
int cube_quant_bits; // 0 if no empty
......
......@@ -57,9 +57,9 @@ PrecisionType GetPrecision(const std::string& precision_data) {
}
// Engine Base
class PaddleEngineBase {
class EngineCore {
public:
virtual ~PaddleEngineBase() {}
virtual ~EngineCore() {}
virtual std::vector<std::string> GetInputNames() {
return _predictor->GetInputNames();
}
......@@ -107,7 +107,7 @@ class PaddleEngineBase {
};
// Paddle Inference Engine
class PaddleInferenceEngine : public PaddleEngineBase {
class PaddleInferenceEngine : public EngineCore {
public:
int create(const configure::EngineDesc& engine_conf) {
std::string model_path = engine_conf.model_dir();
......
......@@ -25,19 +25,24 @@ args = benchmark_args()
def single_func(idx, resource):
train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.uci_housing.train(), buf_size=500),
batch_size=1)
total_number = sum(1 for _ in train_reader())
if args.request == "rpc":
client = Client()
client.load_client_config(args.model)
client.connect([args.endpoint])
train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.uci_housing.train(), buf_size=500),
batch_size=1)
start = time.time()
for data in train_reader():
#new_data = np.zeros((1, 13)).astype("float32")
#new_data[0] = data[0][0]
#fetch_map = client.predict(feed={"x": new_data}, fetch=["price"], batch=True)
fetch_map = client.predict(feed={"x": data[0][0]}, fetch=["price"])
end = time.time()
return [[end - start]]
return [[end - start], [total_number]]
elif args.request == "http":
train_reader = paddle.batch(
paddle.reader.shuffle(
......@@ -49,7 +54,7 @@ def single_func(idx, resource):
'http://{}/uci/prediction'.format(args.endpoint),
data={"x": data[0]})
end = time.time()
return [[end - start]]
return [[end - start], [total_number]]
multi_thread_runner = MultiThreadRunner()
......
......@@ -47,3 +47,5 @@ elif package_name.endswith('xpu'):
path = "paddle_serving_" + sys.argv[1]
commit_id = subprocess.check_output(['git', 'rev-parse', 'HEAD'])
update_info(path + "/version.py", "commit_id", commit_id)
update_info(path + "/version.py", "version_tag", "${VERSION_TAG}")
......@@ -12,3 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .models import ServingModels
from . import version
__version__ = version.version_tag
......@@ -12,5 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
""" Paddle Serving App version string """
serving_app_version = "0.0.0"
version_tag = "0.0.0"
commit_id = ""
......@@ -17,4 +17,4 @@ from . import version
from . import client
from .client import *
__version__ = version.serving_client_version
__version__ = version.version_tag
......@@ -155,7 +155,7 @@ class Client(object):
file_path_list = []
for single_model_config in model_config_path_list:
if os.path.isdir(single_model_config):
file_path_list.append("{}/serving_server_conf.prototxt".format(
file_path_list.append("{}/serving_client_conf.prototxt".format(
single_model_config))
elif os.path.isfile(single_model_config):
file_path_list.append(single_model_config)
......@@ -574,7 +574,7 @@ class MultiLangClient(object):
file_path_list = []
for single_model_config in model_config_path_list:
if os.path.isdir(single_model_config):
file_path_list.append("{}/serving_server_conf.prototxt".format(
file_path_list.append("{}/serving_client_conf.prototxt".format(
single_model_config))
elif os.path.isfile(single_model_config):
file_path_list.append(single_model_config)
......
......@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
""" Paddle Serving Client version string """
serving_client_version = "0.0.0"
serving_server_version = "0.0.0"
version_tag = "0.0.0"
module_proto_version = "0.0.0"
commit_id = ""
......@@ -31,4 +31,4 @@ from paddle_serving_server import (
from .dag import *
from .server import *
__version__ = version.serving_server_version
__version__ = version.version_tag
......@@ -103,7 +103,7 @@ def serve_args():
def start_standard_model(serving_port): # pylint: disable=doc-string-missing
args = parse_args()
args = serve_args()
thread_num = args.thread
model = args.model
port = serving_port
......@@ -410,6 +410,7 @@ if __name__ == "__main__":
use_lite=args.use_lite,
use_xpu=args.use_xpu,
ir_optim=args.ir_optim,
thread_num=args.thread,
precision=args.precision,
use_calib=args.use_calib)
web_service.run_rpc_service()
......
......@@ -22,7 +22,7 @@ from .proto import general_model_config_pb2 as m_config
from .proto import multi_lang_general_model_service_pb2_grpc
import google.protobuf.text_format
import time
from .version import serving_server_version, version_suffix, device_type
from .version import version_tag, version_suffix, device_type
from contextlib import closing
import argparse
......@@ -369,7 +369,7 @@ class Server(object):
version_file = open("{}/version.py".format(self.module_path), "r")
folder_name = "serving-%s-%s" % (self.get_serving_bin_name(),
serving_server_version)
version_tag)
tar_name = "%s.tar.gz" % folder_name
bin_url = "https://paddle-serving.bj.bcebos.com/bin/%s" % tar_name
......
......@@ -12,10 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
""" Paddle Serving Server version string """
serving_client_version = "0.0.0"
serving_server_version = "0.0.0"
module_proto_version = "0.0.0"
version_tag = "0.0.0"
version_suffix = ""
module_proto_version = "0.0.0"
device_type = "0"
cuda_version = "9"
commit_id = ""
......@@ -176,10 +176,12 @@ class WebService(object):
use_xpu=False,
ir_optim=False,
gpuid=0,
thread_num=2,
mem_optim=True):
print("This API will be deprecated later. Please do not use it")
self.workdir = workdir
self.port = port
self.thread_num = thread_num
self.device = device
self.gpuid = gpuid
self.port_list = []
......@@ -197,7 +199,7 @@ class WebService(object):
self.workdir,
self.port_list[0],
-1,
thread_num=2,
thread_num=self.thread_num,
mem_optim=mem_optim,
use_lite=use_lite,
use_xpu=use_xpu,
......@@ -211,7 +213,7 @@ class WebService(object):
"{}_{}".format(self.workdir, i),
self.port_list[i],
gpuid,
thread_num=2,
thread_num=self.thread_num,
mem_optim=mem_optim,
use_lite=use_lite,
use_xpu=use_xpu,
......
......@@ -22,7 +22,7 @@ import os
from setuptools import setup, Distribution, Extension
from setuptools import find_packages
from setuptools import setup
from paddle_serving_app.version import serving_app_version
from paddle_serving_app.version import version_tag
from pkg_resources import DistributionNotFound, get_distribution
def python_version():
......@@ -78,7 +78,7 @@ package_dir={'paddle_serving_app':
setup(
name='paddle-serving-app',
version=serving_app_version.replace('-', ''),
version=version_tag.replace('-', ''),
description=
('Paddle Serving Package for saved model with PaddlePaddle'),
url='https://github.com/PaddlePaddle/Serving',
......
......@@ -22,7 +22,7 @@ import sys
from setuptools import setup, Distribution, Extension
from setuptools import find_packages
from setuptools import setup
from paddle_serving_client.version import serving_client_version
from paddle_serving_client.version import version_tag
import util
py_version = sys.version_info
......@@ -79,7 +79,7 @@ package_dir={'paddle_serving_client':
setup(
name='paddle-serving-client',
version=serving_client_version.replace('-', ''),
version=version_tag.replace('-', ''),
description=
('Paddle Serving Package for saved model with PaddlePaddle'),
url='https://github.com/PaddlePaddle/Serving',
......
......@@ -19,7 +19,7 @@ from __future__ import print_function
from setuptools import setup, Distribution, Extension
from setuptools import find_packages
from setuptools import setup
from paddle_serving.version import serving_client_version
from paddle_serving.version import version_tag
from grpc_tools import protoc
import util
......@@ -43,7 +43,7 @@ package_dir={'paddle_serving.serving_client':
setup(
name='paddle-serving-client',
version=serving_client_version.replace('-', ''),
version=version_tag.replace('-', ''),
description=
('Paddle Serving Package for saved model with PaddlePaddle'),
url='https://github.com/PaddlePaddle/Serving',
......
......@@ -19,10 +19,10 @@ from __future__ import print_function
from setuptools import setup, Distribution, Extension
from setuptools import find_packages
from setuptools import setup
from paddle_serving_server.version import serving_server_version, version_suffix
from paddle_serving_server.version import version_tag, version_suffix
import util
package_version = serving_server_version.replace('-', '')
package_version = version_tag.replace('-', '')
if version_suffix != "":
version_suffix = "post" + version_suffix
package_version = package_version + "." + version_suffix
......