Commit dff86f23 authored by H HexToString

Temporary fix for OCR C++ serving

Parent 8f254ed9
......@@ -25,8 +25,16 @@ message(STATUS "CXX compiler: ${CMAKE_CXX_COMPILER}, version: "
"${CMAKE_CXX_COMPILER_ID} ${CMAKE_CXX_COMPILER_VERSION}")
message(STATUS "C compiler: ${CMAKE_C_COMPILER}, version: "
"${CMAKE_C_COMPILER_ID} ${CMAKE_C_COMPILER_VERSION}")
SET(OPENCV_DIR "" CACHE PATH "Location of libraries")
if(NOT OPENCV_DIR)
message(FATAL_ERROR "please set OPENCV_DIR with -DOPENCV_DIR=/path/opencv")
endif()
if (WIN32)
find_package(OpenCV REQUIRED PATHS ${OPENCV_DIR}/build/ NO_DEFAULT_PATH)
else ()
find_package(OpenCV REQUIRED PATHS ${OPENCV_DIR}/share/OpenCV NO_DEFAULT_PATH)
endif ()
include_directories(${OpenCV_INCLUDE_DIRS})
find_package(Git REQUIRED)
find_package(Threads REQUIRED)
find_package(CUDA QUIET)
......
......@@ -26,7 +26,7 @@ ExternalProject_Add(
extern_zlib
${EXTERNAL_PROJECT_LOG_ARGS}
GIT_REPOSITORY "https://github.com/madler/zlib.git"
GIT_TAG "v1.2.8"
GIT_TAG "v1.2.9"
PREFIX ${ZLIB_SOURCES_DIR}
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
......@@ -54,7 +54,7 @@ ELSE(WIN32)
SET(ZLIB_LIBRARIES "${ZLIB_INSTALL_DIR}/lib/libz.a" CACHE FILEPATH "zlib library." FORCE)
ENDIF(WIN32)
ADD_LIBRARY(zlib STATIC IMPORTED GLOBAL)
#ADD_LIBRARY(zlib STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET zlib PROPERTY IMPORTED_LOCATION ${ZLIB_LIBRARIES})
ADD_DEPENDENCIES(zlib extern_zlib)
......
......@@ -54,10 +54,10 @@ message ModelToolkitConf { repeated EngineDesc engines = 1; };
// resource conf
message ResourceConf {
required string model_toolkit_path = 1;
required string model_toolkit_file = 2;
optional string general_model_path = 3;
optional string general_model_file = 4;
repeated string model_toolkit_path = 1;
repeated string model_toolkit_file = 2;
repeated string general_model_path = 3;
repeated string general_model_file = 4;
optional string cube_config_path = 5;
optional string cube_config_file = 6;
optional int32 cube_quant_bits = 7; // set 0 if no quant.
......
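Note: the four fields above change from required/optional to repeated, so a single ResourceConf can now describe several models (for example an OCR detection model followed by a recognition model), one entry per InferOp. A minimal sketch of how these repeated fields might be filled from Python follows; the import path, op names, and file names are illustrative assumptions, not part of this commit.

# Sketch: populate the repeated ResourceConf fields, one entry per InferOp.
# The generated protobuf module is assumed to be importable as below.
from paddle_serving_server.proto import server_configure_pb2 as server_sdk

resource_conf = server_sdk.ResourceConf()
for op_name in ["general_infer_0", "general_infer_1"]:  # hypothetical op names
    resource_conf.model_toolkit_path.extend(["workdir"])
    resource_conf.model_toolkit_file.extend(["{}/model_toolkit.prototxt".format(op_name)])
    resource_conf.general_model_path.extend(["workdir"])
    resource_conf.general_model_file.extend(["{}/general_model.prototxt".format(op_name)])
print(resource_conf)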
......@@ -207,7 +207,7 @@ class PredictorClient {
void init_gflags(std::vector<std::string> argv);
int init(const std::string& client_conf);
int init(const std::vector<std::string> &client_conf);
void set_predictor_conf(const std::string& conf_path,
const std::string& conf_file);
......@@ -227,6 +227,10 @@ class PredictorClient {
const std::vector<std::string>& int_feed_name,
const std::vector<std::vector<int>>& int_shape,
const std::vector<std::vector<int>>& int_lod_slot_batch,
const std::vector<std::vector<std::string>>& string_feed_batch,
const std::vector<std::string>& string_feed_name,
const std::vector<std::vector<int>>& string_shape,
const std::vector<std::vector<int>>& string_lod_slot_batch,
const std::vector<std::string>& fetch_name,
PredictorRes& predict_res_batch, // NOLINT
const int& pid,
......
......@@ -28,7 +28,7 @@ using baidu::paddle_serving::predictor::general_model::Response;
using baidu::paddle_serving::predictor::general_model::Tensor;
using baidu::paddle_serving::predictor::general_model::FeedInst;
using baidu::paddle_serving::predictor::general_model::FetchInst;
enum ProtoDataType { P_INT64, P_FLOAT32, P_INT32, P_STRING };
std::once_flag gflags_init_flag;
namespace py = pybind11;
......@@ -56,12 +56,12 @@ void PredictorClient::init_gflags(std::vector<std::string> argv) {
});
}
int PredictorClient::init(const std::string &conf_file) {
int PredictorClient::init(const std::vector<std::string> &conf_file) {
try {
GeneralModelConfig model_config;
if (configure::read_proto_conf(conf_file.c_str(), &model_config) != 0) {
if (configure::read_proto_conf(conf_file[0].c_str(), &model_config) != 0) {
LOG(ERROR) << "Failed to load general model config"
<< ", file path: " << conf_file;
<< ", file path: " << conf_file[0];
return -1;
}
......@@ -69,9 +69,7 @@ int PredictorClient::init(const std::string &conf_file) {
_fetch_name_to_idx.clear();
_shape.clear();
int feed_var_num = model_config.feed_var_size();
int fetch_var_num = model_config.fetch_var_size();
VLOG(2) << "feed var num: " << feed_var_num
<< "fetch_var_num: " << fetch_var_num;
VLOG(2) << "feed var num: " << feed_var_num;
for (int i = 0; i < feed_var_num; ++i) {
_feed_name_to_idx[model_config.feed_var(i).alias_name()] = i;
VLOG(2) << "feed alias name: " << model_config.feed_var(i).alias_name()
......@@ -90,6 +88,16 @@ int PredictorClient::init(const std::string &conf_file) {
_shape.push_back(tmp_feed_shape);
}
if (conf_file.size() > 1) {
model_config.Clear();
if (configure::read_proto_conf(conf_file[conf_file.size()-1].c_str(), &model_config) != 0) {
LOG(ERROR) << "Failed to load general model config"
<< ", file path: " << conf_file[conf_file.size()-1];
return -1;
}
}
int fetch_var_num = model_config.fetch_var_size();
VLOG(2) << "fetch_var_num: " << fetch_var_num;
for (int i = 0; i < fetch_var_num; ++i) {
_fetch_name_to_idx[model_config.fetch_var(i).alias_name()] = i;
VLOG(2) << "fetch [" << i << "]"
......@@ -146,11 +154,16 @@ int PredictorClient::numpy_predict(
const std::vector<std::string> &int_feed_name,
const std::vector<std::vector<int>> &int_shape,
const std::vector<std::vector<int>> &int_lod_slot_batch,
const std::vector<std::vector<std::string>>& string_feed_batch,
const std::vector<std::string>& string_feed_name,
const std::vector<std::vector<int>>& string_shape,
const std::vector<std::vector<int>>& string_lod_slot_batch,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res_batch,
const int &pid,
const uint64_t log_id) {
int batch_size = std::max(float_feed_batch.size(), int_feed_batch.size());
int batch_size = std::max( float_feed_batch.size(), int_feed_batch.size() );
batch_size = batch_size > string_feed_batch.size() ? batch_size : string_feed_batch.size();
VLOG(2) << "batch size: " << batch_size;
predict_res_batch.clear();
Timer timeline;
......@@ -165,6 +178,7 @@ int PredictorClient::numpy_predict(
VLOG(2) << "fetch general model predictor done.";
VLOG(2) << "float feed name size: " << float_feed_name.size();
VLOG(2) << "int feed name size: " << int_feed_name.size();
VLOG(2) << "string feed name size: " << string_feed_name.size();
VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
Request req;
req.set_log_id(log_id);
......@@ -178,6 +192,7 @@ int PredictorClient::numpy_predict(
FeedInst *inst = req.add_insts();
std::vector<py::array_t<float>> float_feed = float_feed_batch[bi];
std::vector<py::array_t<int64_t>> int_feed = int_feed_batch[bi];
std::vector<std::string> string_feed = string_feed_batch[bi];
for (auto &name : float_feed_name) {
tensor_vec.push_back(inst->add_tensor_array());
}
......@@ -186,12 +201,13 @@ int PredictorClient::numpy_predict(
tensor_vec.push_back(inst->add_tensor_array());
}
VLOG(2) << "batch [" << bi << "] int_feed_name and float_feed_name "
<< "prepared";
for (auto &name : string_feed_name) {
tensor_vec.push_back(inst->add_tensor_array());
}
VLOG(2) << "batch [" << bi << "] " << "prepared";
int vec_idx = 0;
VLOG(2) << "tensor_vec size " << tensor_vec.size() << " float shape "
<< float_shape.size();
for (auto &name : float_feed_name) {
int idx = _feed_name_to_idx[name];
Tensor *tensor = tensor_vec[idx];
......@@ -203,7 +219,7 @@ int PredictorClient::numpy_predict(
for (uint32_t j = 0; j < float_lod_slot_batch[vec_idx].size(); ++j) {
tensor->add_lod(float_lod_slot_batch[vec_idx][j]);
}
tensor->set_elem_type(1);
tensor->set_elem_type(P_FLOAT32);
const int float_shape_size = float_shape[vec_idx].size();
switch (float_shape_size) {
case 4: {
......@@ -266,7 +282,7 @@ int PredictorClient::numpy_predict(
}
tensor->set_elem_type(_type[idx]);
if (_type[idx] == 0) {
if (_type[idx] == P_INT64) {
VLOG(2) << "prepare int feed " << name << " shape size "
<< int_shape[vec_idx].size();
} else {
......@@ -282,7 +298,7 @@ int PredictorClient::numpy_predict(
for (ssize_t j = 0; j < int_array.shape(1); j++) {
for (ssize_t k = 0; k < int_array.shape(2); k++) {
for (ssize_t l = 0; l < int_array.shape(3); l++) {
if (_type[idx] == 0) {
if (_type[idx] == P_INT64) {
tensor->add_int64_data(int_array(i, j, k, l));
} else {
tensor->add_int_data(int_array(i, j, k, l));
......@@ -298,7 +314,7 @@ int PredictorClient::numpy_predict(
for (ssize_t i = 0; i < int_array.shape(0); i++) {
for (ssize_t j = 0; j < int_array.shape(1); j++) {
for (ssize_t k = 0; k < int_array.shape(2); k++) {
if (_type[idx] == 0) {
if (_type[idx] == P_INT64) {
tensor->add_int64_data(int_array(i, j, k));
} else {
tensor->add_int_data(int_array(i, j, k));
......@@ -312,7 +328,7 @@ int PredictorClient::numpy_predict(
auto int_array = int_feed[vec_idx].unchecked<2>();
for (ssize_t i = 0; i < int_array.shape(0); i++) {
for (ssize_t j = 0; j < int_array.shape(1); j++) {
if (_type[idx] == 0) {
if (_type[idx] == P_INT64) {
tensor->add_int64_data(int_array(i, j));
} else {
tensor->add_int_data(int_array(i, j));
......@@ -324,7 +340,7 @@ int PredictorClient::numpy_predict(
case 1: {
auto int_array = int_feed[vec_idx].unchecked<1>();
for (ssize_t i = 0; i < int_array.shape(0); i++) {
if (_type[idx] == 0) {
if (_type[idx] == P_INT64) {
tensor->add_int64_data(int_array(i));
} else {
tensor->add_int_data(int_array(i));
......@@ -338,6 +354,38 @@ int PredictorClient::numpy_predict(
VLOG(2) << "batch [" << bi << "] "
<< "int feed value prepared";
vec_idx = 0;
for (auto &name : string_feed_name) {
int idx = _feed_name_to_idx[name];
Tensor *tensor = tensor_vec[idx];
for (uint32_t j = 0; j < string_shape[vec_idx].size(); ++j) {
tensor->add_shape(string_shape[vec_idx][j]);
}
for (uint32_t j = 0; j < string_lod_slot_batch[vec_idx].size(); ++j) {
tensor->add_lod(string_lod_slot_batch[vec_idx][j]);
}
tensor->set_elem_type(P_STRING);
const int string_shape_size = string_shape[vec_idx].size();
// string_shape[vec_idx] is fixed to [1] because numpy has no string dtype;
// strings are passed via vector<vector<string>> instead.
if (string_shape_size != 1) {
LOG(ERROR) << "string_shape should be 1-D, but received: " << string_shape_size;
return -1;
}
switch (string_shape_size) {
case 1: {
tensor->add_data(string_feed[vec_idx]);
break;
}
}
vec_idx++;
}
VLOG(2) << "batch [" << bi << "] "
<< "string feed value prepared";
}
int64_t preprocess_end = timeline.TimeStampUS();
......@@ -397,19 +445,19 @@ int PredictorClient::numpy_predict(
for (auto &name : fetch_name) {
// int idx = _fetch_name_to_idx[name];
if (_fetch_name_to_type[name] == 0) {
if (_fetch_name_to_type[name] == P_INT64) {
VLOG(2) << "ferch var " << name << "type int64";
int size = output.insts(0).tensor_array(idx).int64_data_size();
model._int64_value_map[name] = std::vector<int64_t>(
output.insts(0).tensor_array(idx).int64_data().begin(),
output.insts(0).tensor_array(idx).int64_data().begin() + size);
} else if (_fetch_name_to_type[name] == 1) {
} else if (_fetch_name_to_type[name] == P_FLOAT32) {
VLOG(2) << "fetch var " << name << "type float";
int size = output.insts(0).tensor_array(idx).float_data_size();
model._float_value_map[name] = std::vector<float>(
output.insts(0).tensor_array(idx).float_data().begin(),
output.insts(0).tensor_array(idx).float_data().begin() + size);
} else if (_fetch_name_to_type[name] == 2) {
} else if (_fetch_name_to_type[name] == P_INT32) {
VLOG(2) << "fetch var " << name << "type int32";
int size = output.insts(0).tensor_array(idx).int_data_size();
model._int32_value_map[name] = std::vector<int32_t>(
......
......@@ -78,7 +78,7 @@ PYBIND11_MODULE(serving_client, m) {
self.init_gflags(argv);
})
.def("init",
[](PredictorClient &self, const std::string &conf) {
[](PredictorClient &self, const std::vector<std::string> &conf) {
return self.init(conf);
})
.def("set_predictor_conf",
......@@ -107,6 +107,10 @@ PYBIND11_MODULE(serving_client, m) {
const std::vector<std::string> &int_feed_name,
const std::vector<std::vector<int>> &int_shape,
const std::vector<std::vector<int>> &int_lod_slot_batch,
const std::vector<std::vector<std::string>>& string_feed_batch,
const std::vector<std::string>& string_feed_name,
const std::vector<std::vector<int>>& string_shape,
const std::vector<std::vector<int>>& string_lod_slot_batch,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res_batch,
const int &pid,
......@@ -119,6 +123,10 @@ PYBIND11_MODULE(serving_client, m) {
int_feed_name,
int_shape,
int_lod_slot_batch,
string_feed_batch,
string_feed_name,
string_shape,
string_lod_slot_batch,
fetch_name,
predict_res_batch,
pid,
......
include_directories(SYSTEM ${CMAKE_CURRENT_LIST_DIR}/../../)
include(op/CMakeLists.txt)
include(proto/CMakeLists.txt)
add_executable(serving ${serving_srcs})
add_dependencies(serving pdcodegen fluid_cpu_engine pdserving paddle_fluid cube-api utils)
if (WITH_GPU)
......@@ -27,7 +28,7 @@ endif()
target_link_libraries(serving -Wl,--whole-archive fluid_cpu_engine
-Wl,--no-whole-archive)
target_link_libraries(serving ${OpenCV_LIBS})
target_link_libraries(serving paddle_fluid ${paddle_depend_libs})
target_link_libraries(serving brpc)
target_link_libraries(serving protobuf)
......@@ -35,6 +36,7 @@ target_link_libraries(serving pdserving)
target_link_libraries(serving cube-api)
target_link_libraries(serving utils)
if(WITH_GPU)
target_link_libraries(serving ${CUDA_LIBRARIES})
endif()
......
FILE(GLOB op_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp ${CMAKE_CURRENT_LIST_DIR}/../../predictor/tools/quant.cpp)
FILE(GLOB op_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp ${CMAKE_CURRENT_LIST_DIR}/../../predictor/tools/quant.cpp ${CMAKE_CURRENT_LIST_DIR}/../../predictor/tools/ocrtools/*.cpp)
LIST(APPEND serving_srcs ${op_srcs})
......@@ -117,8 +117,9 @@ int GeneralDistKVQuantInferOp::inference() {
std::unordered_map<int, int> in_out_map;
baidu::paddle_serving::predictor::Resource &resource =
baidu::paddle_serving::predictor::Resource::instance();
// TODO(HexToString): temporary addition; details still to be worked out.
std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config();
resource.get_general_model_config()[0];
int cube_quant_bits = resource.get_cube_quant_bits();
size_t EMBEDDING_SIZE = 0;
if (cube_quant_bits == 0) {
......
......@@ -37,6 +37,7 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralInferOp::inference() {
VLOG(2) << "Going to run inference";
std::cout<<"I am GeneralInferOp"<<std::endl;
const std::vector<std::string> pre_node_names = pre_names();
if (pre_node_names.size() != 1) {
LOG(ERROR) << "This op(" << op_name()
......
......@@ -32,7 +32,7 @@ using baidu::paddle_serving::predictor::general_model::Tensor;
using baidu::paddle_serving::predictor::general_model::Request;
using baidu::paddle_serving::predictor::general_model::FeedInst;
using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
enum ProtoDataType { P_INT64, P_FLOAT32, P_INT32 };
enum ProtoDataType { P_INT64, P_FLOAT32, P_INT32, P_STRING };
int conf_check(const Request *req,
const std::shared_ptr<PaddleGeneralModelConfig> &model_config) {
int var_num = req->insts(0).tensor_array_size();
......@@ -76,7 +76,7 @@ int GeneralReaderOp::inference() {
int input_var_num = 0;
std::vector<int64_t> elem_type;
std::vector<int64_t> elem_size;
std::vector<int64_t> capacity;
std::vector<int64_t> databuf_size;
GeneralBlob *res = mutable_data<GeneralBlob>();
TensorVector *out = &(res->tensor_vector);
......@@ -98,8 +98,9 @@ int GeneralReaderOp::inference() {
baidu::paddle_serving::predictor::Resource::instance();
VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
// Use the first InferOp's model_config as the ReaderOp's model_config by default.
std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config();
resource.get_general_model_config()[0];
// TODO(guru4elephant): how to do conditional check?
/*
......@@ -114,135 +115,114 @@ int GeneralReaderOp::inference() {
elem_type.resize(var_num);
elem_size.resize(var_num);
capacity.resize(var_num);
databuf_size.resize(var_num);
// prepare basic information for input
// specify the memory needed for output tensor_vector
// fill the data into output general_blob
int data_len = 0;
for (int i = 0; i < var_num; ++i) {
paddle::PaddleTensor lod_tensor;
const Tensor &tensor = req->insts(0).tensor_array(i);
data_len = 0;
elem_type[i] = req->insts(0).tensor_array(i).elem_type();
VLOG(2) << "var[" << i << "] has elem type: " << elem_type[i];
if (elem_type[i] == P_INT64) { // int64
elem_size[i] = sizeof(int64_t);
lod_tensor.dtype = paddle::PaddleDType::INT64;
data_len = tensor.int64_data_size();
} else if (elem_type[i] == P_FLOAT32) {
elem_size[i] = sizeof(float);
lod_tensor.dtype = paddle::PaddleDType::FLOAT32;
data_len = tensor.float_data_size();
} else if (elem_type[i] == P_INT32) {
elem_size[i] = sizeof(int32_t);
lod_tensor.dtype = paddle::PaddleDType::INT32;
data_len = tensor.int_data_size();
} else if (elem_type[i] == P_STRING) {
// Strings are carried as paddle::PaddleDType::UINT8.
elem_size[i] = sizeof(uint8_t);
lod_tensor.dtype = paddle::PaddleDType::UINT8;
// For vector<string>, databuf_size != data_size() * sizeof(char); sum the byte length of every string instead.
for (int idx = 0; idx < tensor.data_size(); idx++) {
data_len += tensor.data()[idx].length();
}
}
// implement lod tensor here
// only support 1-D lod
// TODO:support 2-D lod
if (req->insts(0).tensor_array(i).lod_size() > 0) {
VLOG(2) << "(logid=" << log_id << ") var[" << i << "] is lod_tensor";
lod_tensor.lod.resize(1);
for (int k = 0; k < req->insts(0).tensor_array(i).lod_size(); ++k) {
lod_tensor.lod[0].push_back(req->insts(0).tensor_array(i).lod(k));
}
capacity[i] = 1;
for (int k = 0; k < req->insts(0).tensor_array(i).shape_size(); ++k) {
int dim = req->insts(0).tensor_array(i).shape(k);
VLOG(2) << "(logid=" << log_id << ") shape for var[" << i
<< "]: " << dim;
capacity[i] *= dim;
lod_tensor.shape.push_back(dim);
}
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is tensor, capacity: " << capacity[i];
} else {
capacity[i] = 1;
for (int k = 0; k < req->insts(0).tensor_array(i).shape_size(); ++k) {
int dim = req->insts(0).tensor_array(i).shape(k);
VLOG(2) << "(logid=" << log_id << ") shape for var[" << i
<< "]: " << dim;
capacity[i] *= dim;
lod_tensor.shape.push_back(dim);
}
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is tensor, capacity: " << capacity[i];
}
lod_tensor.name = model_config->_feed_name[i];
out->push_back(lod_tensor);
}
// specify the memory needed for output tensor_vector
int tensor_size = 0;
int data_len = 0;
for (int i = 0; i < var_num; ++i) {
if (out->at(i).lod.size() == 1) {
tensor_size = 0;
const Tensor &tensor = req->insts(0).tensor_array(i);
data_len = 0;
if (tensor.int64_data_size() > 0) {
data_len = tensor.int64_data_size();
} else if (tensor.float_data_size() > 0) {
data_len = tensor.float_data_size();
} else if (tensor.int_data_size() > 0) {
data_len = tensor.int_data_size();
}
VLOG(2) << "(logid=" << log_id << ") tensor size for var[" << i
<< "]: " << data_len;
tensor_size += data_len;
int cur_len = out->at(i).lod[0].back();
VLOG(2) << "(logid=" << log_id << ") current len: " << cur_len;
int sample_len = 0;
if (tensor.shape_size() == 1) {
sample_len = data_len;
} else {
sample_len = tensor.shape(0);
}
VLOG(2) << "(logid=" << log_id << ") new len: " << cur_len + sample_len;
out->at(i).data.Resize(tensor_size * elem_size[i]);
VLOG(2) << "(logid=" << log_id << ") tensor size for var[" << i
<< "]: " << data_len;
databuf_size[i] = data_len * elem_size[i];
out->at(i).data.Resize(data_len * elem_size[i]);
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is lod_tensor and len=" << out->at(i).lod[0].back();
} else {
out->at(i).data.Resize(capacity[i] * elem_size[i]);
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is tensor and capacity=" << capacity[i];
}
}
// fill the data into output general_blob
int offset = 0;
int elem_num = 0;
for (int i = 0; i < var_num; ++i) {
if (elem_type[i] == P_INT64) {
int64_t *dst_ptr = static_cast<int64_t *>(out->at(i).data.data());
VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
<< "] is " << req->insts(0).tensor_array(i).int64_data(0);
offset = 0;
elem_num = req->insts(0).tensor_array(i).int64_data_size();
if (!dst_ptr) {
LOG(ERROR) << "dst_ptr is nullptr";
return -1;
}
int elem_num = req->insts(0).tensor_array(i).int64_data_size();
for (int k = 0; k < elem_num; ++k) {
dst_ptr[offset + k] = req->insts(0).tensor_array(i).int64_data(k);
dst_ptr[k] = req->insts(0).tensor_array(i).int64_data(k);
}
} else if (elem_type[i] == P_FLOAT32) {
float *dst_ptr = static_cast<float *>(out->at(i).data.data());
VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
<< "] is " << req->insts(0).tensor_array(i).float_data(0);
offset = 0;
elem_num = req->insts(0).tensor_array(i).float_data_size();
if (!dst_ptr) {
LOG(ERROR) << "dst_ptr is nullptr";
return -1;
}
//memcpy(dst_ptr,req->insts(0).tensor_array(i).float_data(),databuf_size[i]);
int elem_num = req->insts(0).tensor_array(i).float_data_size();
for (int k = 0; k < elem_num; ++k) {
dst_ptr[offset + k] = req->insts(0).tensor_array(i).float_data(k);
dst_ptr[k] = req->insts(0).tensor_array(i).float_data(k);
}
} else if (elem_type[i] == P_INT32) {
int32_t *dst_ptr = static_cast<int32_t *>(out->at(i).data.data());
VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
<< "] is " << req->insts(0).tensor_array(i).int_data(0);
offset = 0;
elem_num = req->insts(0).tensor_array(i).int_data_size();
if (!dst_ptr) {
LOG(ERROR) << "dst_ptr is nullptr";
return -1;
}
int elem_num = req->insts(0).tensor_array(i).int_data_size();
for (int k = 0; k < elem_num; ++k) {
dst_ptr[k] = req->insts(0).tensor_array(i).int_data(k);
}
} else if (elem_type[i] == P_STRING) {
std::string *dst_ptr = static_cast<std::string *>(out->at(i).data.data());
VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
<< "] is " << req->insts(0).tensor_array(i).data(0);
if (!dst_ptr) {
LOG(ERROR) << "dst_ptr is nullptr";
return -1;
}
int elem_num = req->insts(0).tensor_array(i).data_size();
for (int k = 0; k < elem_num; ++k) {
dst_ptr[offset + k] = req->insts(0).tensor_array(i).int_data(k);
dst_ptr[k] = req->insts(0).tensor_array(i).data(k);
}
}
}
......
......@@ -63,8 +63,9 @@ int GeneralResponseOp::inference() {
baidu::paddle_serving::predictor::Resource::instance();
VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
// Use the last InferOp's model_config as the ResponseOp's model_config by default.
std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config();
resource.get_general_model_config().back();
VLOG(2) << "(logid=" << log_id
<< ") max body size : " << brpc::fLU64::FLAGS_max_body_size;
......
......@@ -73,7 +73,7 @@ int GeneralTextReaderOp::inference() {
VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config();
resource.get_general_model_config()[0];
VLOG(2) << "(logid=" << log_id << ") print general model config done.";
......
......@@ -58,7 +58,7 @@ int GeneralTextResponseOp::inference() {
VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config();
resource.get_general_model_config().back();
std::vector<int> fetch_index;
fetch_index.resize(req->fetch_var_names_size());
......
......@@ -608,6 +608,10 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<FluidFamilyCore> {
for(int i =0; i< tensorVector_in_pointer->size();++i){
auto lod_tensor_in = core->GetInputHandle((*tensorVector_in_pointer)[i].name);
lod_tensor_in->SetLoD((*tensorVector_in_pointer)[i].lod);
std::cout<< "i am thomas young and i want to know the in info name : "<<(*tensorVector_in_pointer)[i].name
<<",shapesize:" <<(*tensorVector_in_pointer)[i].shape.size()<<"shape :";;
for (auto l = 0; l != (*tensorVector_in_pointer)[i].shape.size(); ++l) std::cout << (*tensorVector_in_pointer)[i].shape[l] << " ,";
std::cout<< std::endl;
lod_tensor_in->Reshape((*tensorVector_in_pointer)[i].shape);
void* origin_data = (*tensorVector_in_pointer)[i].data.data();
//Because the core needs to determine the size of memory space according to the data type passed in.
......@@ -648,6 +652,10 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<FluidFamilyCore> {
for (int i = 0; i < outnames.size(); ++i){
auto lod_tensor_out = core->GetOutputHandle(outnames[i]);
output_shape = lod_tensor_out->shape();
std::cout<< "i am thomas young and i want to know the out info name : "<<outnames[i]
<<",shapesize:" <<output_shape.size()<<"shape :";
for (auto l = 0; l != output_shape.size(); ++l) std::cout << output_shape[l] << " ,";
std::cout<< std::endl;
out_num = std::accumulate(output_shape.begin(), output_shape.end(), 1, std::multiplies<int>());
dataType = lod_tensor_out->type();
if (dataType == paddle::PaddleDType::FLOAT32) {
......@@ -659,6 +667,7 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<FluidFamilyCore> {
}
float* data_out = reinterpret_cast<float*>(databuf_data);
lod_tensor_out->CopyToCpu(data_out);
std::cout<< "the out num: "<<out_num<<" value = "<< data_out[0] <<" ,"<<std::endl;
databuf_char = reinterpret_cast<char*>(data_out);
}else if (dataType == paddle::PaddleDType::INT64) {
databuf_size = out_num*sizeof(int64_t);
......
......@@ -42,8 +42,8 @@ DynamicResource::~DynamicResource() {}
int DynamicResource::initialize() { return 0; }
std::shared_ptr<PaddleGeneralModelConfig> Resource::get_general_model_config() {
return _config;
std::vector<std::shared_ptr<PaddleGeneralModelConfig> > Resource::get_general_model_config() {
return _configs;
}
void Resource::print_general_model_config(
......@@ -149,19 +149,13 @@ int Resource::initialize(const std::string& path, const std::string& file) {
#endif
if (FLAGS_enable_model_toolkit) {
int err = 0;
std::string model_toolkit_path = resource_conf.model_toolkit_path();
if (err != 0) {
LOG(ERROR) << "read model_toolkit_path failed, path[" << path
<< "], file[" << file << "]";
return -1;
}
std::string model_toolkit_file = resource_conf.model_toolkit_file();
if (err != 0) {
LOG(ERROR) << "read model_toolkit_file failed, path[" << path
<< "], file[" << file << "]";
return -1;
}
size_t model_toolkit_num = resource_conf.model_toolkit_path_size();
for (size_t mi = 0; mi < model_toolkit_num; ++mi) {
std::string model_toolkit_path = resource_conf.model_toolkit_path(mi);
std::string model_toolkit_file = resource_conf.model_toolkit_file(mi);
if (InferManager::instance().proc_initialize(
model_toolkit_path.c_str(), model_toolkit_file.c_str()) != 0) {
LOG(ERROR) << "failed proc initialize modeltoolkit, config: "
......@@ -175,6 +169,7 @@ int Resource::initialize(const std::string& path, const std::string& file) {
<< model_toolkit_path << "/" << model_toolkit_file;
}
}
}
if (THREAD_KEY_CREATE(&_tls_bspec_key, dynamic_resource_deleter) != 0) {
LOG(ERROR) << "unable to create tls_bthread_key of thrd_data";
......@@ -231,14 +226,12 @@ int Resource::general_model_initialize(const std::string& path,
LOG(ERROR) << "Failed initialize resource from: " << path << "/" << file;
return -1;
}
int err = 0;
std::string general_model_path = resource_conf.general_model_path();
std::string general_model_file = resource_conf.general_model_file();
if (err != 0) {
LOG(ERROR) << "read general_model_path failed, path[" << path << "], file["
<< file << "]";
return -1;
}
size_t general_model_num = resource_conf.general_model_path_size();
for (size_t gi = 0; gi < general_model_num; ++gi) {
std::string general_model_path = resource_conf.general_model_path(gi);
std::string general_model_file = resource_conf.general_model_file(gi);
GeneralModelConfig model_config;
if (configure::read_proto_conf(general_model_path.c_str(),
......@@ -248,8 +241,7 @@ int Resource::general_model_initialize(const std::string& path,
<< "/" << general_model_file;
return -1;
}
_config.reset(new PaddleGeneralModelConfig());
auto _config = std::make_shared<PaddleGeneralModelConfig>();
int feed_var_num = model_config.feed_var_size();
VLOG(2) << "load general model config";
VLOG(2) << "feed var num: " << feed_var_num;
......@@ -306,6 +298,8 @@ int Resource::general_model_initialize(const std::string& path,
}
}
}
_configs.push_back(std::move(_config));
}
return 0;
}
......
......@@ -94,7 +94,7 @@ class Resource {
int finalize();
std::shared_ptr<PaddleGeneralModelConfig> get_general_model_config();
std::vector<std::shared_ptr<PaddleGeneralModelConfig> > get_general_model_config();
void print_general_model_config(
const std::shared_ptr<PaddleGeneralModelConfig>& config);
......@@ -107,7 +107,7 @@ class Resource {
private:
int thread_finalize() { return 0; }
std::shared_ptr<PaddleGeneralModelConfig> _config;
std::vector<std::shared_ptr<PaddleGeneralModelConfig> > _configs;
std::string cube_config_fullpath;
int cube_quant_bits;  // 0 if no quant
......
......@@ -126,7 +126,7 @@ int main(int argc, char** argv) {
return 0;
}
google::ParseCommandLineFlags(&argc, &argv, true);
//google::ParseCommandLineFlags(&argc, &argv, true);
g_change_server_port();
......@@ -202,6 +202,7 @@ int main(int argc, char** argv) {
}
VLOG(2) << "Succ call pthread worker start function";
// This is not used by any code path and could be removed.
if (Resource::instance().general_model_initialize(FLAGS_resource_path,
FLAGS_resource_file) != 0) {
LOG(ERROR) << "Failed to initialize general model conf: "
......
......@@ -28,8 +28,13 @@ test_reader = paddle.batch(
batch_size=1)
for data in test_reader():
new_data = np.zeros((1, 1, 13)).astype("float32")
new_data = np.zeros((1, 13)).astype("float32")
print('testclient.py data:', data[0][0])
print('testclient.py data shape:', data[0][0].shape)
new_data[0] = data[0][0]
print('testclient.py new_data:', new_data)
print('testclient.py new_data[0]:', new_data[0])
print('testclient.py new_data shape:', new_data.shape)
fetch_map = client.predict(
feed={"x": new_data}, fetch=["price"], batch=True)
print("{} {}".format(fetch_map["price"][0], data[0][1][0]))
......
......@@ -34,8 +34,10 @@ from .proto import multi_lang_general_model_service_pb2_grpc
int64_type = 0
float32_type = 1
int32_type = 2
bytes_type = 3
int_type = set([int64_type, int32_type])
float_type = set([float32_type])
string_type = set([bytes_type])
class _NOPProfiler(object):
......@@ -140,9 +142,14 @@ class Client(object):
self.predictorres_constructor = PredictorRes
def load_client_config(self, path):
if isinstance(path, str):
path_list = [path]
elif isinstance(path, list):
path_list = path
from .serving_client import PredictorClient
model_conf = m_config.GeneralModelConfig()
f = open(path, 'r')
f = open(path_list[0], 'r')
model_conf = google.protobuf.text_format.Merge(
str(f.read()), model_conf)
......@@ -151,21 +158,18 @@ class Client(object):
# get feed shapes, feed types
# map feed names to index
self.client_handle_ = PredictorClient()
self.client_handle_.init(path)
self.client_handle_.init(path_list)
if "FLAGS_max_body_size" not in os.environ:
os.environ["FLAGS_max_body_size"] = str(512 * 1024 * 1024)
read_env_flags = ["profile_client", "profile_server", "max_body_size"]
self.client_handle_.init_gflags([sys.argv[
0]] + ["--tryfromenv=" + ",".join(read_env_flags)])
self.feed_names_ = [var.alias_name for var in model_conf.feed_var]
self.fetch_names_ = [var.alias_name for var in model_conf.fetch_var]
self.feed_names_to_idx_ = {}
self.fetch_names_to_type_ = {}
self.fetch_names_to_idx_ = {}
self.lod_tensor_set = set()
self.feed_tensor_len = {}
self.key = None
for i, var in enumerate(model_conf.feed_var):
self.feed_names_to_idx_[var.alias_name] = i
self.feed_types_[var.alias_name] = var.feed_type
......@@ -178,6 +182,15 @@ class Client(object):
for dim in self.feed_shapes_[var.alias_name]:
counter *= dim
self.feed_tensor_len[var.alias_name] = counter
if len(path_list) > 1:
model_conf = m_config.GeneralModelConfig()
f = open(path_list[-1], 'r')
model_conf = google.protobuf.text_format.Merge(
str(f.read()), model_conf)
self.fetch_names_ = [var.alias_name for var in model_conf.fetch_var]
self.fetch_names_to_type_ = {}
self.fetch_names_to_idx_ = {}
for i, var in enumerate(model_conf.fetch_var):
self.fetch_names_to_idx_[var.alias_name] = i
self.fetch_names_to_type_[var.alias_name] = var.fetch_type
......@@ -185,6 +198,7 @@ class Client(object):
self.lod_tensor_set.add(var.alias_name)
return
def add_variant(self, tag, cluster, variant_weight):
if self.predictor_sdk_ is None:
self.predictor_sdk_ = SDKConfig()
......@@ -288,13 +302,17 @@ class Client(object):
raise ValueError("Feed only accepts dict and list of dict")
int_slot_batch = []
float_slot_batch = []
int_feed_names = []
float_feed_names = []
int_shape = []
int_lod_slot_batch = []
float_slot_batch = []
float_feed_names = []
float_lod_slot_batch = []
float_shape = []
string_slot_batch = []
string_feed_names = []
string_lod_slot_batch = []
string_shape = []
fetch_names = []
counter = 0
......@@ -311,9 +329,11 @@ class Client(object):
for i, feed_i in enumerate(feed_batch):
int_slot = []
float_slot = []
int_lod_slot = []
float_slot = []
float_lod_slot = []
string_slot = []
string_lod_slot = []
for key in feed_i:
if ".lod" not in key and key not in self.feed_names_:
raise ValueError("Wrong feed name: {}.".format(key))
......@@ -368,10 +388,25 @@ class Client(object):
else:
float_slot.append(feed_i[key])
self.all_numpy_input = False
# If the input is a string, the feed is not a numpy array.
elif self.feed_types_[key] in string_type:
if i == 0:
string_feed_names.append(key)
string_shape.append(self.feed_shapes_[key])
if "{}.lod".format(key) in feed_i:
string_lod_slot_batch.append(feed_i["{}.lod".format(
key)])
else:
string_lod_slot_batch.append([])
string_slot.append(feed_i[key])
self.has_numpy_input = True
int_slot_batch.append(int_slot)
float_slot_batch.append(float_slot)
int_lod_slot_batch.append(int_lod_slot)
float_slot_batch.append(float_slot)
float_lod_slot_batch.append(float_lod_slot)
string_slot_batch.append(string_slot)
string_lod_slot_batch.append(string_lod_slot)
self.profile_.record('py_prepro_1')
self.profile_.record('py_client_infer_0')
......@@ -381,7 +416,8 @@ class Client(object):
res = self.client_handle_.numpy_predict(
float_slot_batch, float_feed_names, float_shape,
float_lod_slot_batch, int_slot_batch, int_feed_names, int_shape,
int_lod_slot_batch, fetch_names, result_batch_handle, self.pid,
int_lod_slot_batch, string_slot_batch, string_feed_names, string_shape,
string_lod_slot_batch, fetch_names, result_batch_handle, self.pid,
log_id)
elif self.has_numpy_input == False:
raise ValueError(
......
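A minimal usage sketch of the updated Python client follows. The config paths, endpoint, and feed/fetch names are hypothetical; per the changes above, load_client_config also accepts a list (feed vars are read from the first config, fetch vars from the last), and a string-typed input is passed as a plain Python string with shape [1], since numpy has no string dtype.

from paddle_serving_client import Client
import base64

client = Client()
# Hypothetical client config paths; with a list, feed vars come from the first
# entry and fetch vars from the last entry.
client.load_client_config([
    "ocr_det_client/serving_client_conf.prototxt",
    "ocr_rec_client/serving_client_conf.prototxt",
])
client.connect(["127.0.0.1:9292"])

# "image" and "res" are hypothetical feed/fetch names; the base64 payload is
# only an example of a string-typed feed value.
image_str = base64.b64encode(open("test.jpg", "rb").read()).decode("utf-8")
fetch_map = client.predict(feed={"image": image_str}, fetch=["res"], batch=True)
print(fetch_map)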
......@@ -48,7 +48,8 @@ class OpMaker(object):
"general_single_kv": "GeneralSingleKVOp",
"general_dist_kv_infer": "GeneralDistKVInferOp",
"general_dist_kv_quant_infer": "GeneralDistKVQuantInferOp",
"general_copy": "GeneralCopyOp"
"general_copy": "GeneralCopyOp",
"general_YSL":"GeneralYSLOp",
}
self.node_name_suffix_ = collections.defaultdict(int)
......@@ -136,16 +137,17 @@ class Server(object):
def __init__(self):
self.server_handle_ = None
self.infer_service_conf = None
self.model_toolkit_conf = None
self.model_toolkit_conf = []  # one per InferOp (Engine <-> OP)
self.resource_conf = None
self.memory_optimization = False
self.ir_optimization = False
self.model_conf = None
self.workflow_fn = "workflow.prototxt"
self.resource_fn = "resource.prototxt"
self.infer_service_fn = "infer_service.prototxt"
self.model_toolkit_fn = "model_toolkit.prototxt"
self.general_model_config_fn = "general_model.prototxt"
self.model_conf = collections.OrderedDict()  # maps node name -> serving_server_conf.prototxt content (feed/fetch info) for multi-model workflows
self.workflow_fn = "workflow.prototxt"  # one per Service (Workflow -> Op)
self.resource_fn = "resource.prototxt"  # one per Service; records model_toolkit_fn and general_model_config_fn
self.infer_service_fn = "infer_service.prototxt"  # one per Service (Service -> Workflow)
self.model_toolkit_fn = []  # e.g. ["general_infer_0/model_toolkit.prototxt"]; one per InferOp (Engine <-> OP)
self.general_model_config_fn = []  # e.g. ["general_infer_0/general_model.prototxt"]; one per InferOp (feed/fetch <-> OP)
self.subdirectory = []  # one per InferOp; name == node.name == engine.name
self.cube_config_fn = "cube.conf"
self.workdir = ""
self.max_concurrency = 0
......@@ -160,10 +162,10 @@ class Server(object):
self.encryption_model = False
self.product_name = None
self.container_id = None
self.model_config_paths = None # for multi-model in a workflow
self.model_config_paths = collections.OrderedDict()  # maps node name -> serving_server_conf.prototxt path (feed/fetch info) for multi-model workflows
def get_fetch_list(self):
fetch_names = [var.alias_name for var in self.model_conf.fetch_var]
def get_fetch_list(self, infer_node_idx=-1):
fetch_names = [var.alias_name for var in list(self.model_conf.values())[infer_node_idx].fetch_var]
return fetch_names
def set_max_concurrency(self, concurrency):
......@@ -218,8 +220,7 @@ class Server(object):
def _prepare_engine(self, model_config_paths, device):
if self.model_toolkit_conf == None:
self.model_toolkit_conf = server_sdk.ModelToolkitConf()
self.model_toolkit_conf = []
for engine_name, model_config_path in model_config_paths.items():
engine = server_sdk.EngineDesc()
engine.name = engine_name
......@@ -249,8 +250,8 @@ class Server(object):
engine.type = "FLUID_GPU_ANALYSIS_ENCRYPT"
else:
engine.type = "FLUID_GPU_ANALYSIS" + suffix
self.model_toolkit_conf.engines.extend([engine])
self.model_toolkit_conf.append(server_sdk.ModelToolkitConf())
self.model_toolkit_conf[-1].engines.extend([engine])
def _prepare_infer_service(self, port):
if self.infer_service_conf == None:
......@@ -264,9 +265,10 @@ class Server(object):
def _prepare_resource(self, workdir, cube_conf):
self.workdir = workdir
if self.resource_conf == None:
with open("{}/{}".format(workdir, self.general_model_config_fn),
for idx, op_general_model_config_fn in enumerate(self.general_model_config_fn):
with open("{}/{}".format(workdir, op_general_model_config_fn),
"w") as fout:
fout.write(str(self.model_conf))
fout.write(str(list(self.model_conf.values())[idx]))
self.resource_conf = server_sdk.ResourceConf()
for workflow in self.workflow_conf.workflows:
for node in workflow.nodes:
......@@ -280,10 +282,11 @@ class Server(object):
shutil.copy(cube_conf, workdir)
if "quant" in node.name:
self.resource_conf.cube_quant_bits = 8
self.resource_conf.model_toolkit_path = workdir
self.resource_conf.model_toolkit_file = self.model_toolkit_fn
self.resource_conf.general_model_path = workdir
self.resource_conf.general_model_file = self.general_model_config_fn
self.resource_conf.model_toolkit_path.extend([workdir])
self.resource_conf.model_toolkit_file.extend([self.model_toolkit_fn[idx]])
self.resource_conf.general_model_path.extend([workdir])
self.resource_conf.general_model_file.extend([op_general_model_config_fn])
# TODO: figure out the meaning of product_name and container_id.
if self.product_name != None:
self.resource_conf.auth_product_name = self.product_name
if self.container_id != None:
......@@ -293,54 +296,52 @@ class Server(object):
with open(filepath, "w") as fout:
fout.write(str(pb_obj))
def load_model_config(self, model_config_paths):
def load_model_config(self, model_config_paths_args):
# At present, Serving needs to configure the model path in
# the resource.prototxt file to determine the input and output
# format of the workflow. To ensure that the input and output
# of multiple models are the same.
workflow_oi_config_path = None
if isinstance(model_config_paths, str):
if isinstance(model_config_paths_args, list):
# If there is only one model path, use the default infer_op.
# Because there are several infer_op type, we need to find
# it from workflow_conf.
default_engine_names = [
'general_infer_0', 'general_dist_kv_infer_0',
'general_dist_kv_quant_infer_0'
default_engine_types = [
'GeneralInferOp', 'GeneralDistKVInferOp',
'GeneralDistKVQuantInferOp', 'GeneralYSLOp',
]
engine_name = None
model_config_paths_list_idx = 0
for node in self.workflow_conf.workflows[0].nodes:
if node.name in default_engine_names:
engine_name = node.name
break
if engine_name is None:
if node.type in default_engine_types:
if node.name is None:
raise Exception(
"You have set the engine_name of Op. Please use the form {op: model_path} to configure model path"
)
self.model_config_paths = {engine_name: model_config_paths}
workflow_oi_config_path = self.model_config_paths[engine_name]
elif isinstance(model_config_paths, dict):
self.model_config_paths = {}
for node_str, path in model_config_paths.items():
f = open("{}/serving_server_conf.prototxt".format(
model_config_paths_args[model_config_paths_list_idx]), 'r')
self.model_conf[node.name] = google.protobuf.text_format.Merge(str(f.read()), m_config.GeneralModelConfig())
self.model_config_paths[node.name] = model_config_paths_args[model_config_paths_list_idx]
self.general_model_config_fn.append(node.name+"/general_model.prototxt")
self.model_toolkit_fn.append(node.name+"/model_toolkit.prototxt")
self.subdirectory.append(node.name)
model_config_paths_list_idx += 1
if model_config_paths_list_idx == len(model_config_paths_args):
break
elif isinstance(model_config_paths_args, dict):
self.model_config_paths = collections.OrderedDict()
for node_str, path in model_config_paths_args.items():
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.model_config_paths[node.name] = path
print("You have specified multiple model paths, please ensure "
"that the input and output of multiple models are the same.")
workflow_oi_config_path = list(self.model_config_paths.items())[0][
1]
f = open("{}/serving_server_conf.prototxt".format(path), 'r')
self.model_conf[node.name] = google.protobuf.text_format.Merge(
str(f.read()), m_config.GeneralModelConfig())
else:
raise Exception("The type of model_config_paths must be str or "
"dict({op: model_path}), not {}.".format(
type(model_config_paths)))
self.model_conf = m_config.GeneralModelConfig()
f = open(
"{}/serving_server_conf.prototxt".format(workflow_oi_config_path),
'r')
self.model_conf = google.protobuf.text_format.Merge(
str(f.read()), self.model_conf)
# check config here
# print config here
type(model_config_paths_args)))
def use_mkl(self, flag):
self.mkl_flag = flag
......@@ -416,7 +417,9 @@ class Server(object):
os.system("mkdir {}".format(workdir))
else:
os.system("mkdir {}".format(workdir))
os.system("touch {}/fluid_time_file".format(workdir))
for subdir in self.subdirectory:
os.system("mkdir {}/{}".format(workdir, subdir))
os.system("touch {}/{}/fluid_time_file".format(workdir, subdir))
if not self.port_is_available(port):
raise SystemExit("Port {} is already used".format(port))
......@@ -427,14 +430,18 @@ class Server(object):
self.workdir = workdir
infer_service_fn = "{}/{}".format(workdir, self.infer_service_fn)
workflow_fn = "{}/{}".format(workdir, self.workflow_fn)
resource_fn = "{}/{}".format(workdir, self.resource_fn)
model_toolkit_fn = "{}/{}".format(workdir, self.model_toolkit_fn)
self._write_pb_str(infer_service_fn, self.infer_service_conf)
workflow_fn = "{}/{}".format(workdir, self.workflow_fn)
self._write_pb_str(workflow_fn, self.workflow_conf)
resource_fn = "{}/{}".format(workdir, self.resource_fn)
self._write_pb_str(resource_fn, self.resource_conf)
self._write_pb_str(model_toolkit_fn, self.model_toolkit_conf)
for idx,single_model_toolkit_fn in enumerate(self.model_toolkit_fn):
model_toolkit_fn = "{}/{}".format(workdir, single_model_toolkit_fn)
self._write_pb_str(model_toolkit_fn, self.model_toolkit_conf[idx])
def port_is_available(self, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
......@@ -483,267 +490,3 @@ class Server(object):
print("Going to Run Command")
print(command)
os.system(command)
class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
MultiLangGeneralModelServiceServicer):
def __init__(self, model_config_path, is_multi_model, endpoints):
self.is_multi_model_ = is_multi_model
self.model_config_path_ = model_config_path
self.endpoints_ = endpoints
with open(self.model_config_path_) as f:
self.model_config_str_ = str(f.read())
self._parse_model_config(self.model_config_str_)
self._init_bclient(self.model_config_path_, self.endpoints_)
def _init_bclient(self, model_config_path, endpoints, timeout_ms=None):
from paddle_serving_client import Client
self.bclient_ = Client()
if timeout_ms is not None:
self.bclient_.set_rpc_timeout_ms(timeout_ms)
self.bclient_.load_client_config(model_config_path)
self.bclient_.connect(endpoints)
def _parse_model_config(self, model_config_str):
model_conf = m_config.GeneralModelConfig()
model_conf = google.protobuf.text_format.Merge(model_config_str,
model_conf)
self.feed_names_ = [var.alias_name for var in model_conf.feed_var]
self.feed_types_ = {}
self.feed_shapes_ = {}
self.fetch_names_ = [var.alias_name for var in model_conf.fetch_var]
self.fetch_types_ = {}
self.lod_tensor_set_ = set()
for i, var in enumerate(model_conf.feed_var):
self.feed_types_[var.alias_name] = var.feed_type
self.feed_shapes_[var.alias_name] = var.shape
if var.is_lod_tensor:
self.lod_tensor_set_.add(var.alias_name)
for i, var in enumerate(model_conf.fetch_var):
self.fetch_types_[var.alias_name] = var.fetch_type
if var.is_lod_tensor:
self.lod_tensor_set_.add(var.alias_name)
def _flatten_list(self, nested_list):
for item in nested_list:
if isinstance(item, (list, tuple)):
for sub_item in self._flatten_list(item):
yield sub_item
else:
yield item
def _unpack_inference_request(self, request):
feed_names = list(request.feed_var_names)
fetch_names = list(request.fetch_var_names)
is_python = request.is_python
log_id = request.log_id
feed_batch = []
for feed_inst in request.insts:
feed_dict = {}
for idx, name in enumerate(feed_names):
var = feed_inst.tensor_array[idx]
v_type = self.feed_types_[name]
data = None
if is_python:
if v_type == 0: # int64
data = np.frombuffer(var.data, dtype="int64")
elif v_type == 1: # float32
data = np.frombuffer(var.data, dtype="float32")
elif v_type == 2: # int32
data = np.frombuffer(var.data, dtype="int32")
else:
raise Exception("error type.")
else:
if v_type == 0: # int64
data = np.array(list(var.int64_data), dtype="int64")
elif v_type == 1: # float32
data = np.array(list(var.float_data), dtype="float32")
elif v_type == 2: # int32
data = np.array(list(var.int_data), dtype="int32")
else:
raise Exception("error type.")
data.shape = list(feed_inst.tensor_array[idx].shape)
feed_dict[name] = data
if len(var.lod) > 0:
feed_dict["{}.lod".format(name)] = var.lod
feed_batch.append(feed_dict)
return feed_batch, fetch_names, is_python, log_id
def _pack_inference_response(self, ret, fetch_names, is_python):
resp = multi_lang_general_model_service_pb2.InferenceResponse()
if ret is None:
resp.err_code = 1
return resp
results, tag = ret
resp.tag = tag
resp.err_code = 0
if not self.is_multi_model_:
results = {'general_infer_0': results}
for model_name, model_result in results.items():
model_output = multi_lang_general_model_service_pb2.ModelOutput()
inst = multi_lang_general_model_service_pb2.FetchInst()
for idx, name in enumerate(fetch_names):
tensor = multi_lang_general_model_service_pb2.Tensor()
v_type = self.fetch_types_[name]
if is_python:
tensor.data = model_result[name].tobytes()
else:
if v_type == 0: # int64
tensor.int64_data.extend(model_result[name].reshape(-1)
.tolist())
elif v_type == 1: # float32
tensor.float_data.extend(model_result[name].reshape(-1)
.tolist())
elif v_type == 2: # int32
tensor.int_data.extend(model_result[name].reshape(-1)
.tolist())
else:
raise Exception("error type.")
tensor.shape.extend(list(model_result[name].shape))
if "{}.lod".format(name) in model_result:
tensor.lod.extend(model_result["{}.lod".format(name)]
.tolist())
inst.tensor_array.append(tensor)
model_output.insts.append(inst)
model_output.engine_name = model_name
resp.outputs.append(model_output)
return resp
def SetTimeout(self, request, context):
# This process and the Inference process cannot run at the same time.
# For performance reasons, no thread lock is added for now.
timeout_ms = request.timeout_ms
self._init_bclient(self.model_config_path_, self.endpoints_, timeout_ms)
resp = multi_lang_general_model_service_pb2.SimpleResponse()
resp.err_code = 0
return resp
def Inference(self, request, context):
feed_batch, fetch_names, is_python, log_id = \
self._unpack_inference_request(request)
ret = self.bclient_.predict(
feed=feed_batch,
fetch=fetch_names,
batch=True,
need_variant_tag=True,
log_id=log_id)
return self._pack_inference_response(ret, fetch_names, is_python)
def GetClientConfig(self, request, context):
resp = multi_lang_general_model_service_pb2.GetClientConfigResponse()
resp.client_config_str = self.model_config_str_
return resp
class MultiLangServer(object):
def __init__(self):
self.bserver_ = Server()
self.worker_num_ = 4
self.body_size_ = 64 * 1024 * 1024
self.concurrency_ = 100000
self.is_multi_model_ = False # for model ensemble
def set_max_concurrency(self, concurrency):
self.concurrency_ = concurrency
self.bserver_.set_max_concurrency(concurrency)
def set_num_threads(self, threads):
self.worker_num_ = threads
self.bserver_.set_num_threads(threads)
def set_max_body_size(self, body_size):
self.bserver_.set_max_body_size(body_size)
if body_size >= self.body_size_:
self.body_size_ = body_size
else:
print(
"max_body_size is less than default value, will use default value in service."
)
def use_encryption_model(self, flag=False):
self.encryption_model = flag
def set_port(self, port):
self.gport_ = port
def set_reload_interval(self, interval):
self.bserver_.set_reload_interval(interval)
def set_op_sequence(self, op_seq):
self.bserver_.set_op_sequence(op_seq)
def set_op_graph(self, op_graph):
self.bserver_.set_op_graph(op_graph)
def set_memory_optimize(self, flag=False):
self.bserver_.set_memory_optimize(flag)
def set_ir_optimize(self, flag=False):
self.bserver_.set_ir_optimize(flag)
def set_op_sequence(self, op_seq):
self.bserver_.set_op_sequence(op_seq)
def use_mkl(self, flag):
self.bserver_.use_mkl(flag)
def load_model_config(self, server_config_paths, client_config_path=None):
self.bserver_.load_model_config(server_config_paths)
if client_config_path is None:
if isinstance(server_config_paths, dict):
self.is_multi_model_ = True
client_config_path = '{}/serving_server_conf.prototxt'.format(
list(server_config_paths.items())[0][1])
else:
client_config_path = '{}/serving_server_conf.prototxt'.format(
server_config_paths)
self.bclient_config_path_ = client_config_path
def prepare_server(self,
workdir=None,
port=9292,
device="cpu",
cube_conf=None):
if not self._port_is_available(port):
raise SystemExit("Prot {} is already used".format(port))
default_port = 12000
self.port_list_ = []
for i in range(1000):
if default_port + i != port and self._port_is_available(default_port
+ i):
self.port_list_.append(default_port + i)
break
self.bserver_.prepare_server(
workdir=workdir,
port=self.port_list_[0],
device=device,
cube_conf=cube_conf)
self.set_port(port)
def _launch_brpc_service(self, bserver):
bserver.run_server()
def _port_is_available(self, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
return result != 0
def run_server(self):
p_bserver = Process(
target=self._launch_brpc_service, args=(self.bserver_, ))
p_bserver.start()
options = [('grpc.max_send_message_length', self.body_size_),
('grpc.max_receive_message_length', self.body_size_)]
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=self.worker_num_),
options=options,
maximum_concurrent_rpcs=self.concurrency_)
multi_lang_general_model_service_pb2_grpc.add_MultiLangGeneralModelServiceServicer_to_server(
MultiLangServerServiceServicer(
self.bclient_config_path_, self.is_multi_model_,
["0.0.0.0:{}".format(self.port_list_[0])]), server)
server.add_insecure_port('[::]:{}'.format(self.gport_))
server.start()
p_bserver.join()
server.wait_for_termination()
......@@ -37,7 +37,7 @@ def parse_args(): # pylint: disable=doc-string-missing
parser.add_argument(
"--thread", type=int, default=10, help="Concurrency of server")
parser.add_argument(
"--model", type=str, default="", help="Model for serving")
"--model", type=str, default="", nargs="+", help="Model for serving")
parser.add_argument(
"--port", type=int, default=9292, help="Port the server")
parser.add_argument(
......@@ -106,15 +106,24 @@ def start_standard_model(serving_port): # pylint: disable=doc-string-missing
import paddle_serving_server as serving
op_maker = serving.OpMaker()
read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer')
general_response_op = op_maker.create('general_response')
op_seq_maker = serving.OpSeqMaker()
read_op = op_maker.create('general_reader')
op_seq_maker.add_op(read_op)
for idx, single_model in enumerate(model):
infer_op_name = "general_infer"
if len(model) == 2 and idx == 0:
infer_op_name = "general_YSL"
else:
infer_op_name = "general_infer"
general_infer_op = op_maker.create(infer_op_name)
op_seq_maker.add_op(general_infer_op)
general_response_op = op_maker.create('general_response')
op_seq_maker.add_op(general_response_op)
server = None
if use_multilang:
server = serving.MultiLangServer()
......@@ -167,7 +176,7 @@ class MainService(BaseHTTPRequestHandler):
return (key == cur_key)
def start(self, post_data):
post_data = json.loads(post_data)
post_data = json.loads(post_data.decode('utf-8'))
global p_flag
if not p_flag:
if args.use_encryption_model:
......
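With --model now taking multiple paths (nargs="+") and start_standard_model adding one infer op per model, an equivalent two-model pipeline can be built programmatically. A minimal sketch, assuming hypothetical model directories and the op names registered above (general_YSL for the first infer op when two models are given):

import paddle_serving_server as serving

op_maker = serving.OpMaker()
op_seq_maker = serving.OpSeqMaker()
op_seq_maker.add_op(op_maker.create('general_reader'))
op_seq_maker.add_op(op_maker.create('general_YSL'))    # first infer op (e.g. OCR det)
op_seq_maker.add_op(op_maker.create('general_infer'))  # second infer op (e.g. OCR rec)
op_seq_maker.add_op(op_maker.create('general_response'))

server = serving.Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
# Hypothetical model directories; load_model_config now accepts a list,
# one server config directory per infer op, in DAG order.
server.load_model_config(["ocr_det_model", "ocr_rec_model"])
server.prepare_server(workdir="workdir", port=9292, device="cpu")
server.run_server()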
......@@ -170,7 +170,7 @@ class MainService(BaseHTTPRequestHandler):
return (key == cur_key)
def start(self, post_data):
post_data = json.loads(post_data)
post_data = json.loads(post_data.decode('utf-8'))
global p_flag
if not p_flag:
if args.use_encryption_model:
......