提交 ab645d8c 编写于 作者: B barrierye

get engine name in Client

上级 6797faff
......@@ -27,6 +27,9 @@
#include "core/sdk-cpp/general_model_service.pb.h"
#include "core/sdk-cpp/include/common.h"
#include "core/sdk-cpp/include/predictor_sdk.h"
#define BLOG(fmt, ...) \
printf( \
"[%s:%s]:%d " fmt "\n", __FILE__, __FUNCTION__, __LINE__, ##__VA_ARGS__)
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
......@@ -42,8 +45,19 @@ namespace general_model {
class ModelRes {
public:
ModelRes() {}
ModelRes(const ModelRes& res) {
_engine_name = res._engine_name;
_int64_map.insert(res._int64_map.begin(), res._int64_map.end());
_float_map.insert(res._float_map.begin(), res._float_map.end());
}
ModelRes(ModelRes&& res) {
_engine_name = std::move(res._engine_name);
_int64_map.insert(std::make_move_iterator(std::begin(res._int64_map)),
std::make_move_iterator(std::end(res._int64_map)));
_float_map.insert(std::make_move_iterator(std::begin(res._float_map)),
std::make_move_iterator(std::end(res._float_map)));
}
~ModelRes() {}
public:
const std::vector<std::vector<int64_t>>& get_int64_by_name(
const std::string& name) {
return _int64_map[name];
......@@ -55,19 +69,18 @@ class ModelRes {
void set_engine_name(const std::string& engine_name) {
_engine_name = engine_name;
}
const std::string& engine_name() {
return engine_name;
}
ModelRes& operator = (ModelRes&& res) {
std::cout << "move ++++++++>";
const std::string& engine_name() { return _engine_name; }
ModelRes& operator=(ModelRes&& res) {
if (this != &res) {
_int64_map = res._int64_map;
_float_map = res._float_map;
res._int64_map = nullptr;
res._float_map = nullptr;
_engine_name = std::move(res._engine_name);
_int64_map.insert(std::make_move_iterator(std::begin(res._int64_map)),
std::make_move_iterator(std::end(res._int64_map)));
_float_map.insert(std::make_move_iterator(std::begin(res._float_map)),
std::make_move_iterator(std::end(res._float_map)));
}
return *this;
}
public:
std::string _engine_name;
std::map<std::string, std::vector<std::vector<int64_t>>> _int64_map;
......@@ -82,7 +95,7 @@ class PredictorRes {
public:
void clear() {
_models.clear();
_engine_names.clear();
_engine_names.clear();
}
const std::vector<std::vector<int64_t>>& get_int64_by_name(
const int model_idx, const std::string& name) {
......@@ -94,16 +107,13 @@ class PredictorRes {
}
void add_model_res(ModelRes&& res) {
_engine_names.push_back(res.engine_name());
_models.emplace_back(res);
_models.emplace_back(std::move(res));
}
void set_variant_tag(const std::string& variant_tag) {
_variant_tag = variant_tag;
}
const std::string& variant_tag() { return _variant_tag; }
int model_num() {return _models.size();}
const std::vector<std::string>& get_engine_names() {
return _engine_names;
}
const std::vector<std::string>& get_engine_names() { return _engine_names; }
private:
std::vector<ModelRes> _models;
......
......@@ -219,35 +219,39 @@ int PredictorClient::predict(const std::vector<std::vector<float>> &float_feed,
postprocess_start = client_infer_end;
// multi-model output
uint32_t model_num = res.outputs_size();
predict_res._models.resize(model_num);
// predict_res._models.resize(model_num);
for (uint32_t m_idx = 0; m_idx < model_num; ++m_idx) {
VLOG(2) << "process model output index: " << m_idx;
auto output = res.outputs(m_idx);
ModelRes model;
model.set_engine_name(output.engine_name());
for (auto &name : fetch_name) {
int idx = _fetch_name_to_idx[name];
VLOG(2) << "fetch name: " << name;
if (_fetch_name_to_type[name] == 0) {
int len = output.insts(0).tensor_array(idx).int64_data_size();
VLOG(2) << "fetch tensor : " << name << " type: int64 len : " << len;
predict_res._models[m_idx]._int64_map[name].resize(1);
predict_res._models[m_idx]._int64_map[name][0].resize(len);
model._int64_map[name].resize(1);
model._int64_map[name][0].resize(len);
for (int i = 0; i < len; ++i) {
predict_res._models[m_idx]._int64_map[name][0][i] =
model._int64_map[name][0][i] =
output.insts(0).tensor_array(idx).int64_data(i);
}
} else if (_fetch_name_to_type[name] == 1) {
int len = output.insts(0).tensor_array(idx).float_data_size();
VLOG(2) << "fetch tensor : " << name << " type: float32 len : " << len;
predict_res._models[m_idx]._float_map[name].resize(1);
predict_res._models[m_idx]._float_map[name][0].resize(len);
VLOG(2) << "fetch tensor : " << name
<< " type: float32 len : " << len;
model._float_map[name].resize(1);
model._float_map[name][0].resize(len);
for (int i = 0; i < len; ++i) {
predict_res._models[m_idx]._float_map[name][0][i] =
model._float_map[name][0][i] =
output.insts(0).tensor_array(idx).float_data(i);
}
}
//TODO
// TODO
postprocess_end = timeline.TimeStampUS();
}
predict_res.add_model_res(std::move(model));
}
}
......@@ -259,7 +263,7 @@ int PredictorClient::predict(const std::vector<std::vector<float>> &float_feed,
<< "prepro_1:" << preprocess_end << " "
<< "client_infer_0:" << client_infer_start << " "
<< "client_infer_1:" << client_infer_end << " ";
//TODO: multi-model
// TODO: multi-model
if (FLAGS_profile_server) {
int op_num = res.profile_time_size() / 2;
for (int i = 0; i < op_num; ++i) {
......@@ -303,7 +307,7 @@ int PredictorClient::batch_predict(
for (auto &name : fetch_name) {
req.add_fetch_var_names(name);
}
for (int bi = 0; bi < batch_size; bi++) {
VLOG(2) << "prepare batch " << bi;
std::vector<Tensor *> tensor_vec;
......@@ -382,13 +386,15 @@ int PredictorClient::batch_predict(
postprocess_start = client_infer_end;
uint32_t model_num = res.outputs_size();
predict_res_batch._models.resize(model_num);
// predict_res_batch._models.resize(model_num);
for (uint32_t m_idx = 0; m_idx < model_num; ++m_idx) {
VLOG(2) << "process model output index: " << m_idx;
auto output = res.outputs(m_idx);
ModelRes model;
model.set_engine_name(output.engine_name());
for (auto &name : fetch_name) {
predict_res_batch._models[m_idx]._int64_map[name].resize(batch_size);
predict_res_batch._models[m_idx]._float_map[name].resize(batch_size);
model._int64_map[name].resize(batch_size);
model._float_map[name].resize(batch_size);
}
VLOG(2) << "response batch size " << output.insts_size();
VLOG(2) << "response var nmae " << output.insts(0).tensor_array_size();
......@@ -398,29 +404,33 @@ int PredictorClient::batch_predict(
int len = output.insts(bi).tensor_array(idx).data_size();
if (_fetch_name_to_type[name] == 0) {
int len = output.insts(bi).tensor_array(idx).int64_data_size();
VLOG(2) << "fetch tensor : " << name << " type: int64 len : " << len;
predict_res_batch._models[m_idx]._int64_map[name][bi].resize(len);
VLOG(2) << "fetch name " << name << " index " << idx << " first data "
VLOG(2) << "fetch tensor : " << name
<< " type: int64 len : " << len;
model._int64_map[name][bi].resize(len);
VLOG(2) << "fetch name " << name << " index " << idx
<< " first data "
<< output.insts(bi).tensor_array(idx).int64_data(0);
for (int i = 0; i < len; ++i) {
predict_res_batch._models[m_idx]._int64_map[name][bi][i] =
model._int64_map[name][bi][i] =
output.insts(bi).tensor_array(idx).int64_data(i);
}
} else if (_fetch_name_to_type[name] == 1) {
int len = output.insts(bi).tensor_array(idx).float_data_size();
VLOG(2) << "fetch tensor : " << name
<< " type: float32 len : " << len;
predict_res_batch._models[m_idx]._float_map[name][bi].resize(len);
VLOG(2) << "fetch name " << name << " index " << idx << " first data "
model._float_map[name][bi].resize(len);
VLOG(2) << "fetch name " << name << " index " << idx
<< " first data "
<< output.insts(bi).tensor_array(idx).float_data(0);
for (int i = 0; i < len; ++i) {
predict_res_batch._models[m_idx]._float_map[name][bi][i] =
model._float_map[name][bi][i] =
output.insts(bi).tensor_array(idx).float_data(i);
}
}
idx += 1;
}
}
predict_res_batch.add_model_res(std::move(model));
postprocess_end = timeline.TimeStampUS();
}
}
......@@ -433,7 +443,7 @@ int PredictorClient::batch_predict(
<< "prepro_1:" << preprocess_end << " "
<< "client_infer_0:" << client_infer_start << " "
<< "client_infer_1:" << client_infer_end << " ";
//TODO: multi-models
// TODO: multi-models
if (FLAGS_profile_server) {
int op_num = res.profile_time_size() / 2;
for (int i = 0; i < op_num; ++i) {
......
......@@ -40,10 +40,9 @@ PYBIND11_MODULE(serving_client, m) {
return self.get_float_by_name(model_idx, name);
},
py::return_value_policy::reference)
.def("variant_tag",
[](PredictorRes &self) { return self.variant_tag(); })
.def("model_num",
[](PredictorRes &self) {return self.model_num(); });
.def("variant_tag", [](PredictorRes &self) { return self.variant_tag(); })
.def("get_engine_names",
[](PredictorRes &self) { return self.get_engine_names(); });
py::class_<PredictorClient>(m, "PredictorClient", py::buffer_protocol())
.def(py::init())
......
......@@ -37,8 +37,11 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralInferOp::inference() {
VLOG(2) << "Going to run inference";
const std::vector<std::string> pre_node_names = pre_names();
if (pre_node_names.size() != 1) {
LOG(ERROR) << "This op(" << op_name() <<") can only have one predecessor op, but received " << pre_node_names.size();
LOG(ERROR) << "This op(" << op_name()
<< ") can only have one predecessor op, but received "
<< pre_node_names.size();
return -1;
}
const std::string pre_name = pre_node_names[0];
......@@ -65,8 +68,9 @@ int GeneralInferOp::inference() {
int64_t start = timeline.TimeStampUS();
timeline.Start();
if (InferManager::instance().infer(GENERAL_MODEL_NAME, in, out, batch_size)) {
LOG(ERROR) << "Failed do infer in fluid model: " << GENERAL_MODEL_NAME;
if (InferManager::instance().infer(
engine_name().c_str(), in, out, batch_size)) {
LOG(ERROR) << "Failed do infer in fluid model: " << engine_name().c_str();
return -1;
}
......
......@@ -40,7 +40,7 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralResponseOp::inference() {
const std::vector<std::string> pre_node_names = pre_names();
VLOG(2) << "pre node names size: " << pre_node_names.size();
const Request *req = dynamic_cast<const Request *>(get_request_message());
// response inst with only fetch_var_names
Response *res = mutable_data<Response>();
......@@ -67,13 +67,14 @@ int GeneralResponseOp::inference() {
const GeneralBlob *input_blob;
for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
VLOG(2) << "pre names[" << pi << "]: "
<< pre_node_names[pi] << " ("
const std::string &pre_name = pre_node_names[pi];
VLOG(2) << "pre names[" << pi << "]: " << pre_name << " ("
<< pre_node_names.size() << ")";
input_blob = get_depend_argument<GeneralBlob>(pre_node_names[pi]);
fprintf(stderr, "input(%s) blob address %x\n", pre_node_names[pi].c_str(), input_blob);
input_blob = get_depend_argument<GeneralBlob>(pre_name);
// fprintf(stderr, "input(%s) blob address %x\n", pre_names.c_str(),
// input_blob);
if (!input_blob) {
LOG(ERROR) << "Failed mutable depended argument, op: " << pre_node_names[pi];
LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name;
return -1;
}
......@@ -82,6 +83,8 @@ int GeneralResponseOp::inference() {
VLOG(2) << "input batch size: " << batch_size;
ModelOutput *output = res->add_outputs();
output->set_engine_name(
pre_name); // To get the order of model return values
for (int i = 0; i < batch_size; ++i) {
FetchInst *fetch_inst = output->add_insts();
for (auto &idx : fetch_index) {
......@@ -114,7 +117,8 @@ int GeneralResponseOp::inference() {
for (int k = in->at(idx).lod[0][j]; k < in->at(idx).lod[0][j + 1];
k++) {
FetchInst *fetch_p = output->mutable_insts(j);
fetch_p->mutable_tensor_array(var_idx)->add_int64_data(data_ptr[k]);
fetch_p->mutable_tensor_array(var_idx)->add_int64_data(
data_ptr[k]);
}
}
} else {
......@@ -130,7 +134,8 @@ int GeneralResponseOp::inference() {
} else {
for (int j = 0; j < batch_size; ++j) {
FetchInst *fetch_p = output->mutable_insts(j);
fetch_p->mutable_tensor_array(var_idx)->add_int64_data(data_ptr[0]);
fetch_p->mutable_tensor_array(var_idx)->add_int64_data(
data_ptr[0]);
}
}
}
......@@ -142,7 +147,8 @@ int GeneralResponseOp::inference() {
for (int k = in->at(idx).lod[0][j]; k < in->at(idx).lod[0][j + 1];
k++) {
FetchInst *fetch_p = output->mutable_insts(j);
fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[k]);
fetch_p->mutable_tensor_array(var_idx)->add_float_data(
data_ptr[k]);
}
}
} else {
......@@ -158,7 +164,8 @@ int GeneralResponseOp::inference() {
} else {
for (int j = 0; j < batch_size; ++j) {
FetchInst *fetch_p = output->mutable_insts(j);
fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[0]);
fetch_p->mutable_tensor_array(var_idx)->add_float_data(
data_ptr[0]);
}
}
}
......@@ -169,10 +176,10 @@ int GeneralResponseOp::inference() {
if (req->profile_server()) {
int64_t end = timeline.TimeStampUS();
for (uint32_t i = 0; i< pre_node_names.size(); ++i) {
for (uint32_t i = 0; i < pre_node_names.size(); ++i) {
input_blob = get_depend_argument<GeneralBlob>(pre_node_names[i]);
VLOG(2) << "p size for input blob: " << input_blob->p_size;
ModelOutput* output = res->mutable_outputs(i);
ModelOutput *output = res->mutable_outputs(i);
for (int i = 0; i < input_blob->p_size; ++i) {
output->add_profile_time(input_blob->time_stamp[i]);
}
......
......@@ -37,9 +37,11 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralTextResponseOp::inference() {
VLOG(2) << "Going to run inference";
//TODO: multi-predecessor
const std::vector<std::string> pre_node_names = pre_names();
if (pre_node_names.size() != 1) {
LOG(ERROR) << "This op(" << op_name() <<") can only have one predecessor op, but received " << pre_node_names.size();
LOG(ERROR) << "This op(" << op_name()
<< ") can only have one predecessor op, but received "
<< pre_node_names.size();
return -1;
}
const std::string pre_name = pre_node_names[0];
......@@ -51,6 +53,8 @@ int GeneralTextResponseOp::inference() {
return -1;
}
// TODO: multi-predecessor
/*
const TensorVector *in = &input_blob->tensor_vector;
int batch_size = input_blob->GetBatchSize();
......@@ -133,7 +137,7 @@ int GeneralTextResponseOp::inference() {
res->add_profile_time(start);
res->add_profile_time(end);
}
*/
return 0;
}
DEFINE_OP(GeneralTextResponseOp);
......
......@@ -46,6 +46,7 @@ message Response {
message ModelOutput {
repeated FetchInst insts = 1;
repeated int64 profile_time = 2;
optional string engine_name = 3;
}
service GeneralModelService {
......
......@@ -46,6 +46,7 @@ message Response {
message ModelOutput {
repeated FetchInst insts = 1;
repeated int64 profile_time = 2;
optional string engine_name = 3;
}
service GeneralModelService {
......
......@@ -247,8 +247,8 @@ class Client(object):
return None
multi_result_map_batch = []
model_num = result_batch.model_num()
for mi in range(model_num):
model_engine_names = result_batch.get_engine_names()
for mi, engine_name in enumerate(model_engine_names):
result_map_batch = []
result_map = {}
for i, name in enumerate(fetch_names):
......@@ -263,21 +263,23 @@ class Client(object):
result_map_batch.append(single_result)
multi_result_map_batch.append(result_map_batch)
if model_num == 1:
ret = None
if len(model_engine_names) == 1:
if batch_size == 1:
return [multi_result_map_batch[0][0], self.result_handle_.variant_tag()
] if need_variant_tag else multi_result_map_batch[0][0]
ret = multi_result_map_batch[0][0]
else:
return [multi_result_map_batch[0], self.result_handle_.variant_tag()
] if need_variant_tag else multi_result_map_batch[0]
ret = multi_result_map_batch[0]
else:
ret = {}
if batch_size == 1:
multi_result_map = [result_map_batch[0] for result_map_batch in multi_result_map_batch]
return [multi_result_map, self.result_handle_.variant_tag()
] if need_variant_tag else multi_result_map
for mi, result_map_batch in enumerate(multi_result_map_batch):
ret[model_engine_names[mi]] = result_map_batch[0]
else:
return [multi_result_map_batch, self.result_handle_.variant_tag()
] if need_variant_tag else multi_result_map_batch
for mi, result_map_batch in enumerate(multi_result_map_batch):
ret[model_engine_names[mi]] = result_map_batch
return [ret, self.result_handle_.variant_tag()
] if need_variant_tag else ret
def release(self):
self.client_handle_.destroy_predictor()
self.client_handle_ = None
......@@ -21,6 +21,7 @@ import socket
import paddle_serving_server as paddle_serving_server
from .version import serving_server_version
from contextlib import closing
import collections
class OpMaker(object):
......@@ -36,18 +37,37 @@ class OpMaker(object):
"general_dist_kv_quant_infer": "GeneralDistKVQuantInferOp",
"general_copy": "GeneralCopyOp"
}
self.node_name_suffix_ = collections.defaultdict(int)
# currently, inputs and outputs are not used
# when we have OpGraphMaker, inputs and outputs are necessary
def create(self, node_type, node_name=None, inputs=[], outputs=[]):
def create(self, node_type, engine_name=None, inputs=[], outputs=[]):
if node_type not in self.op_dict:
raise Exception("Op type {} is not supported right now".format(
node_type))
node = server_sdk.DAGNode()
node.name = node_name if node_name is not None else "{}_op".format(
node_type)
# node.name will be used as the infer engine name
if engine_name:
node.name = engine_name
else:
node.name = '{}_{}'.format(node_type,
self.node_name_suffix_[node_type])
self.node_name_suffix_[node_type] += 1
node.type = self.op_dict[node_type]
return node
if inputs:
for dep_node_str in inputs:
dep_node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(dep_node_str, dep_node)
dep = server_sdk.DAGNodeDependency()
dep.name = dep_node.name
dep.mode = "RO"
node.dependencies.extend([dep])
# Because the return value will be used as the key value of the
# dict, and the proto object is variable which cannot be hashed,
# so it is processed into a string. This has little effect on
# overall efficiency.
return google.protobuf.text_format.MessageToString(node)
class OpSeqMaker(object):
......@@ -56,19 +76,25 @@ class OpSeqMaker(object):
self.workflow.name = "workflow1"
self.workflow.workflow_type = "Sequence"
def add_op(self, node, dependent_nodes=None):
if dependent_nodes is None:
if len(self.workflow.nodes) >= 1:
def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
if len(node.dependencies) > 1:
raise Exception(
'Set more than one predecessor for op in OpSeqMaker is not allowed.'
)
if len(self.workflow.nodes) >= 1:
if len(node.dependencies) == 0:
dep = server_sdk.DAGNodeDependency()
dep.name = self.workflow.nodes[-1].name
dep.mode = "RO"
node.dependencies.extend([dep])
else:
for dep_node in dependent_nodes:
dep = server_sdk.DAGNodeDependency()
dep.name = dep_node.name
dep.mode = "RO"
node.dependencies.extend([dep])
elif len(node.dependencies) == 1:
if node.dependencies[0].name != self.workflow.nodes[-1].name:
raise Exception(
'You must add op in order in OpSeqMaker. The previous op is {}, but the current op is followed by {}.'.
format(node.dependencies[0].name, self.workflow.nodes[
-1].name))
self.workflow.nodes.extend([node])
def get_op_sequence(self):
......@@ -77,6 +103,24 @@ class OpSeqMaker(object):
return workflow_conf
class OpGraphMaker(object):
def __init__(self):
self.workflow = server_sdk.Workflow()
self.workflow.name = "workflow1"
# Currently, SDK only supports "Sequence"
self.workflow.workflow_type = "Sequence"
def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.workflow.nodes.extend([node])
def get_op_graph(self):
workflow_conf = server_sdk.WorkflowConf()
workflow_conf.workflows.extend([self.workflow])
return workflow_conf
class Server(object):
def __init__(self):
self.server_handle_ = None
......@@ -100,7 +144,7 @@ class Server(object):
self.cur_path = os.getcwd()
self.use_local_bin = False
self.mkl_flag = False
self.model_config_paths = None
self.model_config_paths = None # for multi-model in a workflow
def set_max_concurrency(self, concurrency):
self.max_concurrency = concurrency
......@@ -117,6 +161,9 @@ class Server(object):
def set_op_sequence(self, op_seq):
self.workflow_conf = op_seq
def set_op_graph(self, op_graph):
self.workflow_conf = op_graph
def set_memory_optimize(self, flag=False):
self.memory_optimization = flag
......@@ -129,12 +176,6 @@ class Server(object):
if self.model_toolkit_conf == None:
self.model_toolkit_conf = server_sdk.ModelToolkitConf()
if isinstance(model_config_paths, str):
model_config_paths = {"general_infer_op": model_config_paths}
elif not isinstance(model_config_paths, dict):
raise Exception("model_config_paths can not be {}".format(
type(model_config_paths)))
for engine_name, model_config_path in model_config_paths.items():
engine = server_sdk.EngineDesc()
engine.name = engine_name
......@@ -188,11 +229,33 @@ class Server(object):
fout.write(str(pb_obj))
def load_model_config(self, model_config_paths):
self.model_config_paths = model_config_paths
path = model_config_paths.items()[0][1]
self.model_config_path = path
# At present, Serving needs to configure the model path in
# the resource.prototxt file to determine the input and output
# format of the workflow. To ensure that the input and output
# of multiple models are the same
workflow_oi_config_path = None
if isinstance(model_config_paths, str):
# the default engine name is "general_infer"
self.model_config_paths = {"general_infer_0": model_config_paths}
workflow_oi_config_path = self.model_config_paths["general_infer_0"]
elif isinstance(model_config_paths, dict):
self.model_config_paths = {}
for node_str, path in model_config_paths.items():
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.model_config_paths[node.name] = path
print("You have specified multiple model paths, please ensure "
"that the input and output of multiple models are the same.")
workflow_oi_config_path = self.model_config_paths.items()[0][1]
else:
raise Exception("The type of model_config_paths must be str or "
"dict({op: model_path}), not {}.".format(
type(model_config_paths)))
self.model_conf = m_config.GeneralModelConfig()
f = open("{}/serving_server_conf.prototxt".format(path), 'r')
f = open(
"{}/serving_server_conf.prototxt".format(workflow_oi_config_path),
'r')
self.model_conf = google.protobuf.text_format.Merge(
str(f.read()), self.model_conf)
# check config here
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册