Commit f0e5a0bd authored by barrierye

update code

Parent 28368f7d
@@ -39,11 +39,10 @@ namespace baidu {
 namespace paddle_serving {
 namespace general_model {
-class PredictorRes {
+class ModelRes {
  public:
-  PredictorRes() {}
-  ~PredictorRes() {}
+  ModelRes() {}
+  ~ModelRes() {}
  public:
   const std::vector<std::vector<int64_t>>& get_int64_by_name(
       const std::string& name) {
@@ -53,14 +52,33 @@ class PredictorRes {
       const std::string& name) {
     return _float_map[name];
   }
+ public:
+  std::map<std::string, std::vector<std::vector<int64_t>>> _int64_map;
+  std::map<std::string, std::vector<std::vector<float>>> _float_map;
+};
+
+class PredictorRes {
+ public:
+  PredictorRes() {}
+  ~PredictorRes() {}
+ public:
+  void clear() { _models.clear(); }
+  const std::vector<std::vector<int64_t>>& get_int64_by_name(
+      const int model_idx, const std::string& name) {
+    return _models[model_idx].get_int64_by_name(name);
+  }
+  const std::vector<std::vector<float>>& get_float_by_name(
+      const int model_idx, const std::string& name) {
+    return _models[model_idx].get_float_by_name(name);
+  }
   void set_variant_tag(const std::string& variant_tag) {
     _variant_tag = variant_tag;
   }
   const std::string& variant_tag() { return _variant_tag; }
+  int models_num() { return _models.size(); }
+  std::vector<ModelRes> _models;
- public:
-  std::map<std::string, std::vector<std::vector<int64_t>>> _int64_map;
-  std::map<std::string, std::vector<std::vector<float>>> _float_map;
  private:
   std::string _variant_tag;
...
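For orientation, here is a minimal usage sketch of the reworked client-side result API: results are now grouped per model, so a caller first asks how many models responded and then fetches each variable by (model index, name). The helper name, the fetch variable string, and the iostream printing are illustrative assumptions, not part of this commit.

#include <iostream>
#include <string>
#include <vector>
// Assumes the PredictorRes/ModelRes definitions from the header above.

// Illustrative helper: print every value of one float fetch variable,
// for every model stored in a filled PredictorRes.
void dump_float_fetch(baidu::paddle_serving::general_model::PredictorRes& res,
                      const std::string& fetch_name) {  // e.g. "prediction"
  for (int m = 0; m < res.models_num(); ++m) {
    const std::vector<std::vector<float>>& rows =
        res.get_float_by_name(m, fetch_name);
    for (size_t b = 0; b < rows.size(); ++b) {  // instance index within the batch
      for (float v : rows[b]) {
        std::cout << "model " << m << " inst " << b << " : " << v << "\n";
      }
    }
  }
}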
@@ -139,8 +139,7 @@ int PredictorClient::predict(const std::vector<std::vector<float>> &float_feed,
                              const std::vector<std::string> &fetch_name,
                              PredictorRes &predict_res,
                              const int &pid) {  // NOLINT
-  predict_res._int64_map.clear();
-  predict_res._float_map.clear();
+  predict_res.clear();
   Timer timeline;
   int64_t preprocess_start = timeline.TimeStampUS();
   _api.thrd_clear();
@@ -215,30 +214,37 @@ int PredictorClient::predict(const std::vector<std::vector<float>> &float_feed,
     VLOG(2) << "predict done.";
     client_infer_end = timeline.TimeStampUS();
     postprocess_start = client_infer_end;
+    // several model outputs
+    uint32_t model_num = res.outputs_size();
+    predict_res._models.resize(model_num);
+    for (uint32_t m_idx = 0; m_idx < model_num; ++m_idx) {
+      VLOG(2) << "process model output index: " << m_idx;
+      auto output = res.outputs(m_idx);
     for (auto &name : fetch_name) {
       int idx = _fetch_name_to_idx[name];
       VLOG(2) << "fetch name: " << name;
       if (_fetch_name_to_type[name] == 0) {
-        int len = res.insts(0).tensor_array(idx).int64_data_size();
+        int len = output.insts(0).tensor_array(idx).int64_data_size();
         VLOG(2) << "fetch tensor : " << name << " type: int64 len : " << len;
-        predict_res._int64_map[name].resize(1);
-        predict_res._int64_map[name][0].resize(len);
+        predict_res._models[m_idx]._int64_map[name].resize(1);
+        predict_res._models[m_idx]._int64_map[name][0].resize(len);
         for (int i = 0; i < len; ++i) {
-          predict_res._int64_map[name][0][i] =
-              res.insts(0).tensor_array(idx).int64_data(i);
+          predict_res._models[m_idx]._int64_map[name][0][i] =
+              output.insts(0).tensor_array(idx).int64_data(i);
        }
      } else if (_fetch_name_to_type[name] == 1) {
-        int len = res.insts(0).tensor_array(idx).float_data_size();
+        int len = output.insts(0).tensor_array(idx).float_data_size();
         VLOG(2) << "fetch tensor : " << name << " type: float32 len : " << len;
-        predict_res._float_map[name].resize(1);
-        predict_res._float_map[name][0].resize(len);
+        predict_res._models[m_idx]._float_map[name].resize(1);
+        predict_res._models[m_idx]._float_map[name][0].resize(len);
         for (int i = 0; i < len; ++i) {
-          predict_res._float_map[name][0][i] =
-              res.insts(0).tensor_array(idx).float_data(i);
+          predict_res._models[m_idx]._float_map[name][0][i] =
+              output.insts(0).tensor_array(idx).float_data(i);
+        }
       }
     }
+      postprocess_end = timeline.TimeStampUS();
   }
-    postprocess_end = timeline.TimeStampUS();
   }
   if (FLAGS_profile_client) {
@@ -249,7 +255,7 @@ int PredictorClient::predict(const std::vector<std::vector<float>> &float_feed,
             << "prepro_1:" << preprocess_end << " "
             << "client_infer_0:" << client_infer_start << " "
             << "client_infer_1:" << client_infer_end << " ";
+    //TODO: multi-model
     if (FLAGS_profile_server) {
       int op_num = res.profile_time_size() / 2;
       for (int i = 0; i < op_num; ++i) {
@@ -276,8 +282,7 @@ int PredictorClient::batch_predict(
     const int &pid) {
   int batch_size = std::max(float_feed_batch.size(), int_feed_batch.size());
-  predict_res_batch._int64_map.clear();
-  predict_res_batch._float_map.clear();
+  predict_res_batch.clear();
   Timer timeline;
   int64_t preprocess_start = timeline.TimeStampUS();
@@ -294,7 +299,7 @@ int PredictorClient::batch_predict(
   for (auto &name : fetch_name) {
     req.add_fetch_var_names(name);
   }
+  //
   for (int bi = 0; bi < batch_size; bi++) {
     VLOG(2) << "prepare batch " << bi;
     std::vector<Tensor *> tensor_vec;
@@ -371,34 +376,40 @@ int PredictorClient::batch_predict(
   } else {
     client_infer_end = timeline.TimeStampUS();
     postprocess_start = client_infer_end;
+    uint32_t model_num = res.outputs_size();
+    predict_res_batch._models.resize(model_num);
+    for (uint32_t m_idx = 0; m_idx < model_num; ++m_idx) {
+      VLOG(2) << "process model output index: " << m_idx;
+      auto output = res.outputs(m_idx);
     for (auto &name : fetch_name) {
-      predict_res_batch._int64_map[name].resize(batch_size);
-      predict_res_batch._float_map[name].resize(batch_size);
+      predict_res_batch._models[m_idx]._int64_map[name].resize(batch_size);
+      predict_res_batch._models[m_idx]._float_map[name].resize(batch_size);
     }
     for (int bi = 0; bi < batch_size; bi++) {
       for (auto &name : fetch_name) {
         int idx = _fetch_name_to_idx[name];
-        int len = res.insts(bi).tensor_array(idx).data_size();
+        int len = output.insts(bi).tensor_array(idx).data_size();
         if (_fetch_name_to_type[name] == 0) {
-          int len = res.insts(bi).tensor_array(idx).int64_data_size();
+          int len = output.insts(bi).tensor_array(idx).int64_data_size();
           VLOG(2) << "fetch tensor : " << name << " type: int64 len : " << len;
-          predict_res_batch._int64_map[name][bi].resize(len);
+          predict_res_batch._models[m_idx]._int64_map[name][bi].resize(len);
           VLOG(2) << "fetch name " << name << " index " << idx << " first data "
-                  << res.insts(bi).tensor_array(idx).int64_data(0);
+                  << output.insts(bi).tensor_array(idx).int64_data(0);
           for (int i = 0; i < len; ++i) {
-            predict_res_batch._int64_map[name][bi][i] =
-                res.insts(bi).tensor_array(idx).int64_data(i);
+            predict_res_batch._models[m_idx]._int64_map[name][bi][i] =
+                output.insts(bi).tensor_array(idx).int64_data(i);
          }
        } else if (_fetch_name_to_type[name] == 1) {
-          int len = res.insts(bi).tensor_array(idx).float_data_size();
+          int len = output.insts(bi).tensor_array(idx).float_data_size();
           VLOG(2) << "fetch tensor : " << name
                   << " type: float32 len : " << len;
-          predict_res_batch._float_map[name][bi].resize(len);
+          predict_res_batch._models[m_idx]._float_map[name][bi].resize(len);
           VLOG(2) << "fetch name " << name << " index " << idx << " first data "
-                  << res.insts(bi).tensor_array(idx).float_data(0);
+                  << output.insts(bi).tensor_array(idx).float_data(0);
           for (int i = 0; i < len; ++i) {
-            predict_res_batch._float_map[name][bi][i] =
-                res.insts(bi).tensor_array(idx).float_data(i);
+            predict_res_batch._models[m_idx]._float_map[name][bi][i] =
+                output.insts(bi).tensor_array(idx).float_data(i);
+          }
        }
      }
    }
   }
@@ -414,7 +425,7 @@ int PredictorClient::batch_predict(
             << "prepro_1:" << preprocess_end << " "
             << "client_infer_0:" << client_infer_start << " "
             << "client_infer_1:" << client_infer_end << " ";
+    //TODO: multi-models
     if (FLAGS_profile_server) {
       int op_num = res.profile_time_size() / 2;
       for (int i = 0; i < op_num; ++i) {
...
@@ -31,17 +31,19 @@ PYBIND11_MODULE(serving_client, m) {
   py::class_<PredictorRes>(m, "PredictorRes", py::buffer_protocol())
       .def(py::init())
       .def("get_int64_by_name",
-           [](PredictorRes &self, std::string &name) {
-             return self.get_int64_by_name(name);
+           [](PredictorRes &self, int model_idx, std::string &name) {
+             return self.get_int64_by_name(model_idx, name);
           },
           py::return_value_policy::reference)
       .def("get_float_by_name",
-           [](PredictorRes &self, std::string &name) {
-             return self.get_float_by_name(name);
+           [](PredictorRes &self, int model_idx, std::string &name) {
+             return self.get_float_by_name(model_idx, name);
           },
           py::return_value_policy::reference)
       .def("variant_tag",
-           [](PredictorRes &self) { return self.variant_tag(); });
+           [](PredictorRes &self) { return self.variant_tag(); })
+      .def("models_num",
+           [](PredictorRes &self) {return self.models_num(); });
   py::class_<PredictorClient>(m, "PredictorClient", py::buffer_protocol())
       .def(py::init())
...
@@ -37,40 +37,41 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
 int GeneralInferOp::inference() {
   VLOG(2) << "Going to run inference";
-  //const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
-  VLOG(2) << "try to get output_blob";
+  // const GeneralBlob *input_blob =
+  //     get_depend_argument<GeneralBlob>(pre_name());
   GeneralBlob *output_blob = mutable_data<GeneralBlob>();
-  VLOG(2) << "finish get output_blob";
+  fprintf(stderr, "[output] blob address %x\n", output_blob);
   TensorVector *out = &output_blob->tensor_vector;
-  VLOG(2) << "finish get *out";
   const std::vector<std::string> pre_node_names = pre_names();
   VLOG(2) << "pre node names size: " << pre_node_names.size();
   TensorVector input;
   int batch_size = 0;
   const GeneralBlob *input_blob;
-  for (int i = 0; i < (int)pre_node_names.size(); ++i) {
-    VLOG(2) << "pre names[" << i << "]: "
-            << pre_node_names[i];
-    input_blob =
-        get_depend_argument<GeneralBlob>(pre_node_names[i]);
-    fprintf(stderr, "input blob address %x\n", input_blob);
+  for (uint32_t i = 0; i < pre_node_names.size(); ++i) {
+    VLOG(2) << "pre names[" << i << "]: " << pre_node_names[i];
+    input_blob = get_depend_argument<GeneralBlob>(pre_node_names[i]);
     if (!input_blob) {
-      LOG(ERROR) << "Failed mutable depended argument, op:" << pre_node_names[i];
+      LOG(ERROR) << "Failed mutable depended argument, op:"
+                 << pre_node_names[i];
       return -1;
     }
+    fprintf(stderr, "[input] blob address %x\n", input_blob);
     batch_size = input_blob->GetBatchSize();
     VLOG(2) << "batch size of input: " << batch_size;
-    for (int j = 0; j < input_blob->tensor_vector.size(); ++j) {
-      VLOG(2) << "input tensor[" << j << "]: "
-              << input_blob->tensor_vector[j].name;
+    for (uint32_t j = 0; j < input_blob->tensor_vector.size(); ++j) {
+      VLOG(2) << "input tensor[" << j
+              << "]: " << input_blob->tensor_vector[j].name;
       input.push_back(input_blob->tensor_vector[j]);
-      VLOG(2) << "add an input tensor name: " << input_blob->tensor_vector[j].name;
+      VLOG(2) << "add an input tensor name: "
+              << input_blob->tensor_vector[j].name;
     }
   }
+  VLOG(2) << "get output blob done.";
   const TensorVector *in = &input;
+  VLOG(2) << "get input done.";
   batch_size = 1;
   VLOG(2) << "infer batch size: " << batch_size;
@@ -81,7 +82,7 @@ int GeneralInferOp::inference() {
   timeline.Start();
   VLOG(2) << "input of op " << op_name();
-  for (int i = 0; i < in->size(); ++i) {
+  for (uint32_t i = 0; i < in->size(); ++i) {
     VLOG(2) << in->at(i).name;
   }
@@ -94,7 +95,7 @@ int GeneralInferOp::inference() {
   }
   VLOG(2) << "output of op " << op_name();
-  for (int i = 0; i < out->size(); ++i) {
+  for (uint32_t i = 0; i < out->size(); ++i) {
     VLOG(2) << out->at(i).name;
   }
...
@@ -80,6 +80,7 @@ int GeneralReaderOp::inference() {
   std::vector<int64_t> capacity;
   GeneralBlob *res = mutable_data<GeneralBlob>();
+  fprintf(stderr, "[reader] out blob address %x\n", res);
   TensorVector *out = &res->tensor_vector;
   res->SetBatchSize(batch_size);
...
@@ -33,6 +33,7 @@ using baidu::paddle_serving::predictor::general_model::Tensor;
 using baidu::paddle_serving::predictor::general_model::Response;
 using baidu::paddle_serving::predictor::general_model::Request;
 using baidu::paddle_serving::predictor::general_model::FetchInst;
+using baidu::paddle_serving::predictor::general_model::ModelOutput;
 using baidu::paddle_serving::predictor::InferManager;
 using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
@@ -40,22 +41,12 @@ int GeneralResponseOp::inference() {
   const std::vector<std::string> pre_node_names = pre_names();
   VLOG(2) << "pre node names size: " << pre_node_names.size();
-  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_node_names[0]);
-  if (!input_blob) {
-    LOG(ERROR) << "Failed mutable depended argument, op: " << pre_node_names[0];
-    return -1;
-  }
-  const TensorVector *in = &input_blob->tensor_vector;
-  int batch_size = input_blob->GetBatchSize();
-  VLOG(2) << "input batch size: " << batch_size;
   const Request *req = dynamic_cast<const Request *>(get_request_message());
+  // response inst with only fetch_var_names
+  Response *res = mutable_data<Response>();
   Timer timeline;
   // double response_time = 0.0;
   // timeline.Start();
   int64_t start = timeline.TimeStampUS();
@@ -74,11 +65,26 @@ int GeneralResponseOp::inference() {
         model_config->_fetch_alias_name_to_index[req->fetch_var_names(i)];
   }
-  // response inst with only fetch_var_names
-  Response *res = mutable_data<Response>();
+  const GeneralBlob *input_blob;
+  for (uint32_t i = 0; i < pre_node_names.size(); ++i) {
+    VLOG(2) << "pre names[" << i << "]: "
+            << pre_node_names[i] << " ("
+            << pre_node_names.size() << ")";
+    input_blob = get_depend_argument<GeneralBlob>(pre_node_names[i]);
+    fprintf(stderr, "input(%s) blob address %x\n", pre_node_names[i].c_str(), input_blob);
+    if (!input_blob) {
+      LOG(ERROR) << "Failed mutable depended argument, op: " << pre_node_names[0];
+      return -1;
+    }
+    const TensorVector *in = &input_blob->tensor_vector;
+    int batch_size = input_blob->GetBatchSize();
+    VLOG(2) << "input batch size: " << batch_size;
+    //TODO
+    ModelOutput *output = res->add_outputs();
   for (int i = 0; i < batch_size; ++i) {
-    FetchInst *fetch_inst = res->add_insts();
+    FetchInst *fetch_inst = output->add_insts();
     for (auto &idx : fetch_index) {
       Tensor *tensor = fetch_inst->add_tensor_array();
       // currently only response float tensor or lod_tensor
@@ -108,7 +114,7 @@ int GeneralResponseOp::inference() {
         for (int j = 0; j < batch_size; ++j) {
           for (int k = in->at(idx).lod[0][j]; k < in->at(idx).lod[0][j + 1];
                k++) {
-            FetchInst *fetch_p = res->mutable_insts(j);
+            FetchInst *fetch_p = output->mutable_insts(j);
             fetch_p->mutable_tensor_array(var_idx)->add_int64_data(data_ptr[k]);
           }
         }
@@ -117,14 +123,14 @@ int GeneralResponseOp::inference() {
         if (var_size == batch_size) {
           for (int j = 0; j < batch_size; ++j) {
             for (int k = j * cap; k < (j + 1) * cap; ++k) {
-              FetchInst *fetch_p = res->mutable_insts(j);
+              FetchInst *fetch_p = output->mutable_insts(j);
               fetch_p->mutable_tensor_array(var_idx)->add_int64_data(
                   data_ptr[k]);
             }
           }
         } else {
           for (int j = 0; j < batch_size; ++j) {
-            FetchInst *fetch_p = res->mutable_insts(j);
+            FetchInst *fetch_p = output->mutable_insts(j);
             fetch_p->mutable_tensor_array(var_idx)->add_int64_data(data_ptr[0]);
           }
         }
@@ -136,7 +142,7 @@ int GeneralResponseOp::inference() {
         for (int j = 0; j < batch_size; ++j) {
           for (int k = in->at(idx).lod[0][j]; k < in->at(idx).lod[0][j + 1];
                k++) {
-            FetchInst *fetch_p = res->mutable_insts(j);
+            FetchInst *fetch_p = output->mutable_insts(j);
             fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[k]);
           }
         }
@@ -145,14 +151,14 @@ int GeneralResponseOp::inference() {
         if (var_size == batch_size) {
           for (int j = 0; j < batch_size; ++j) {
             for (int k = j * cap; k < (j + 1) * cap; ++k) {
-              FetchInst *fetch_p = res->mutable_insts(j);
+              FetchInst *fetch_p = output->mutable_insts(j);
               fetch_p->mutable_tensor_array(var_idx)->add_float_data(
                   data_ptr[k]);
             }
           }
         } else {
           for (int j = 0; j < batch_size; ++j) {
-            FetchInst *fetch_p = res->mutable_insts(j);
+            FetchInst *fetch_p = output->mutable_insts(j);
             fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[0]);
           }
         }
@@ -160,12 +166,17 @@ int GeneralResponseOp::inference() {
       var_idx++;
     }
   }
+  }
   if (req->profile_server()) {
     int64_t end = timeline.TimeStampUS();
+    for (uint32_t i = 0; i < pre_node_names.size(); ++i) {
+      input_blob = get_depend_argument<GeneralBlob>(pre_node_names[i]);
       VLOG(2) << "p size for input blob: " << input_blob->p_size;
+      ModelOutput* output = res->mutable_outputs(i);
       for (int i = 0; i < input_blob->p_size; ++i) {
-        res->add_profile_time(input_blob->time_stamp[i]);
+        output->add_profile_time(input_blob->time_stamp[i]);
+      }
     }
     // TODO(guru4elephant): find more elegant way to do this
     res->add_profile_time(start);
...
@@ -39,13 +39,17 @@ int GeneralTextResponseOp::inference() {
   const std::vector<std::string> pre_node_names = pre_names();
   VLOG(2) << "pre node names size: " << pre_node_names.size();
-  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_node_names[0]);
+  const GeneralBlob *input_blob =
+      get_depend_argument<GeneralBlob>(pre_node_names[0]);
   if (!input_blob) {
     LOG(ERROR) << "Failed mutable depended argument, op: " << pre_node_names[0];
     return -1;
   }
+  LOG(ERROR) << "Error!";
+  return -1;
+  /*
   const TensorVector *in = &input_blob->tensor_vector;
   int batch_size = input_blob->GetBatchSize();
@@ -127,7 +131,7 @@ int GeneralTextResponseOp::inference() {
   // TODO(guru4elephant): find more elegant way to do this
   res->add_profile_time(start);
   res->add_profile_time(end);
-  }
+  }*/
   return 0;
 }
...
@@ -39,10 +39,15 @@ message Request {
 };
 message Response {
-  repeated FetchInst insts = 1;
+  repeated ModelOutput outputs = 1;
   repeated int64 profile_time = 2;
 };
+message ModelOutput {
+  repeated FetchInst insts = 1;
+  repeated int64 profile_time = 2;
+}
 service GeneralModelService {
   rpc inference(Request) returns (Response);
   rpc debug(Request) returns (Response);
...
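With the Response message now wrapping each model's result in a ModelOutput, a reader of the response walks one extra level before reaching the FetchInst tensors. A minimal sketch using the C++ classes generated from this .proto (the generated header include, the choice of tensor_array(0), and the printing are illustrative assumptions, not code from this commit):

#include <cstdio>
// Assumes the protobuf-generated header for the service above is included.
using baidu::paddle_serving::predictor::general_model::Response;

// Illustrative only: walk Response -> ModelOutput -> FetchInst -> Tensor.
// tensor_array(0) assumes the first fetch variable carries float data.
void walk_response(const Response& res) {
  for (int m = 0; m < res.outputs_size(); ++m) {
    const auto& output = res.outputs(m);
    for (int b = 0; b < output.insts_size(); ++b) {
      const auto& tensor = output.insts(b).tensor_array(0);
      for (int i = 0; i < tensor.float_data_size(); ++i) {
        printf("model %d inst %d value %f\n", m, b, tensor.float_data(i));
      }
    }
  }
}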
@@ -92,7 +92,8 @@ int DagView::init(Dag* dag, const std::string& service_name) {
       vnode->op = op;
       // Add depends
       for (auto it = vnode->conf->depends.begin();
-           it != vnode->conf->depends.end(); ++it) {
+           it != vnode->conf->depends.end();
+           ++it) {
         std::string pre_node_name = it->first;
         VLOG(2) << "add op pre name: \n"
                 << "current op name: " << vnode->op->op_name()
@@ -103,11 +104,11 @@ int DagView::init(Dag* dag, const std::string& service_name) {
     }
     // TODO(guru4elephant): this seems buggy, please review later
     /*if (si > 0) {*/
-    //VLOG(2) << "set op pre name: \n"
+    // VLOG(2) << "set op pre name: \n"
     //<< "current op name: " << vstage->nodes.back()->op->op_name()
     //<< " previous op name: "
     //<< _view[si - 1]->nodes.back()->op->op_name();
-    //vstage->nodes.back()->op->set_pre_node_name(
+    // vstage->nodes.back()->op->set_pre_node_name(
     //_view[si - 1]->nodes.back()->op->op_name());
     /*}*/
     _view.push_back(vstage);
...
@@ -28,6 +28,7 @@
 #include "core/predictor/framework/manager.h"
 #include "core/predictor/framework/resource.h"
 #include "core/predictor/framework/service_manager.h"
+#define BLOG(fmt, ...) printf("[%s:%s]:%d "fmt"\n", __FILE__, __FUNCTION__, __LINE__, ##__VA_ARGS__)
 namespace baidu {
 namespace paddle_serving {
@@ -85,6 +86,7 @@ int ServerManager::start_and_wait() {
   boost::unordered_map<std::string, Service*>::iterator it;
   for (it = _format_services.begin(); it != _format_services.end(); it++) {
+    BLOG("\n\nservice name: %s", it->first.c_str());
     if (_server.AddService(it->second, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
       LOG(ERROR) << "Failed to add service of format:" << it->first << "!";
       return -1;
...
@@ -139,6 +139,7 @@ const std::string& InferService::name() const { return _infer_service_format; }
 int InferService::inference(const google::protobuf::Message* request,
                             google::protobuf::Message* response,
                             butil::IOBufBuilder* debug_os) {
+  BLOG("\n=====> start to inference");
   TRACEPRINTF("start to inference");
   // when funtion call begins, framework will reset
   // thread local variables&resources automatically.
...
@@ -16,7 +16,9 @@
 #include <string>
 #include "core/predictor/common/inner_common.h"
 #include "core/predictor/framework/predictor_metric.h"  // PredictorMetric
-#define BLOG(fmt, ...) printf("[%s:%s]:%d "fmt"\n", __FILE__, __FUNCTION__, __LINE__, ##__VA_ARGS__)
+#define BLOG(fmt, ...) \
+  printf( \
+      "[%s:%s]:%d " fmt "\n", __FILE__, __FUNCTION__, __LINE__, ##__VA_ARGS__)
 namespace baidu {
 namespace paddle_serving {
...
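A note on this macro, since the same BLOG definition is pasted into several files in this commit: it is a plain printf wrapper that prefixes file, function, and line, relying on adjacent string-literal concatenation to splice the caller's format string in. The reformatted version above mainly adds whitespace, which also keeps the literal and fmt as clearly separate tokens (a string literal glued directly to an identifier can be parsed as a user-defined-literal suffix in C++11 and newer). An illustrative call and its rough expansion, not taken from the commit:

// Illustrative call site (name is an assumed std::string):
BLOG("service name: %s", name.c_str());
// Roughly expands to (adjacent string literals concatenate):
// printf("[%s:%s]:%d " "service name: %s" "\n",
//        __FILE__, __FUNCTION__, __LINE__, name.c_str());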
@@ -19,6 +19,9 @@
 #include "core/predictor/framework/channel.h"
 #include "core/predictor/framework/op_repository.h"
 #include "core/predictor/framework/predictor_metric.h"  // PredictorMetric
+#include <cstdlib>
+#define BLOG(fmt, ...) printf("[%s:%s]:%d "fmt"\n", __FILE__, __FUNCTION__, __LINE__, ##__VA_ARGS__)
+#include <stdexcept>
 namespace baidu {
 namespace paddle_serving {
@@ -94,10 +97,6 @@ class Op {
   template <typename T>
   T* mutable_data() {
     Channel* channel = mutable_channel();
-    LOG(INFO) << "succ to get channel!";
-    auto x = (dynamic_cast<OpChannel<T>*>(channel))->data();
-    LOG(INFO) << "succ to x!";
-    return x;
     return (dynamic_cast<OpChannel<T>*>(channel))->data();
   }
@@ -136,7 +135,7 @@ class Op {
   const std::string& full_name() const { return _full_name; }
-  //const std::string& pre_name() const { return _pre_node_name; }
+  // const std::string& pre_name() const { return _pre_node_name; }
   const std::vector<std::string>& pre_names() const { return _pre_node_names; }
   void set_full_name(const std::string full_name) { _full_name = full_name; }
@@ -207,7 +206,7 @@ class Op {
   Bus* _bus;
   Dag* _dag;
   uint32_t _id;
-  //std::string _pre_node_name; // only for sequential execution
+  // std::string _pre_node_name;  // only for sequential execution
   std::vector<std::string> _pre_node_names;  // for dag execution
   std::string _name;
   std::string _full_name;  // service_workflow_stageindex_opname
@@ -231,20 +230,19 @@ class OpWithChannel : public Op {
   // ---------- Implements ----------
   Channel* mutable_channel() {
-    LOG(INFO) << "op->mutable_data";
     if (_channel != NULL) {
-      LOG(INFO) << "op->mutable_data: return _channel";
+      LOG(INFO) << "op->mutable_data: _channel != NULL";
       return _channel;
     }
-    LOG(INFO) << "op->mutable_data: _channel == NULL";
-    _channel = butil::get_object<ChannelType>();
+    LOG(INFO) << "try to get_object: _channel";
+    //_channel = butil::get_object<ChannelType>();
+    //LOG(INFO) << butil::describe_objects<ChannelType>();
+    _channel = new ChannelType();
     if (!_channel) {
-      LOG(INFO) << "op->mutable_data: fail to get _channel";
       LOG(ERROR) << "Failed mutable channel of type:" << typeid(T).name();
       return NULL;
     }
-    LOG(INFO) << "op->mutable_data: succ to get _channel";
     _channel->init(this->id(), this->name());
     return _channel;
   }
@@ -252,10 +250,15 @@ class OpWithChannel : public Op {
   const Channel* get_channel() const { return _channel; }
   int release_channel() {
+    LOG(INFO) << "=====> _chaneel deinit";
     if (_channel) {
       _channel->deinit();
-      butil::return_object<ChannelType>(_channel);
+      delete _channel;
     }
+    /*if (_channel) {*/
+    //_channel->deinit();
+    //butil::return_object<ChannelType>(_channel);
+    /*}*/
     _channel = NULL;
     return 0;
...
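The visible effect of the OpWithChannel change is that the channel no longer comes from brpc's thread-local object pool (butil::get_object / butil::return_object) but from plain new / delete; the commented-out pooled path left in place suggests a temporary debugging measure, trading reuse for a guaranteed fresh object per request. A minimal sketch of the two acquisition paths, assuming butil/object_pool.h and a default-constructible ChannelType (illustrative, not the commit's code):

#include "butil/object_pool.h"  // butil::get_object / butil::return_object

template <typename ChannelType>
ChannelType* acquire_channel(bool use_pool) {
  if (use_pool) {
    // Pooled path (previous behavior): may hand back a recycled instance,
    // so any per-request state must be reset before reuse.
    return butil::get_object<ChannelType>();
  }
  // Heap path (this commit): always a freshly constructed object,
  // released later with `delete` instead of butil::return_object.
  return new ChannelType();
}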
@@ -32,6 +32,7 @@
 #include "core/predictor/framework/server.h"
 #include "core/predictor/framework/service.h"
 #include "core/predictor/framework/workflow.h"
+#define BLOG(fmt, ...) printf("[%s:%s]:%d "fmt"\n", __FILE__, __FUNCTION__, __LINE__, ##__VA_ARGS__)
 using baidu::paddle_serving::predictor::ServerManager;
 using baidu::paddle_serving::predictor::WorkflowManager;
@@ -217,6 +218,7 @@ int main(int argc, char** argv) {
   FLAGS_stderrthreshold = 3;
 #endif
+  BLOG("\nServerManager::instance().start_and_wait()\n");
   if (ServerManager::instance().start_and_wait() != 0) {
     LOG(ERROR) << "Failed start server and wait!";
     return -1;
...
@@ -39,10 +39,15 @@ message Request {
 };
 message Response {
-  repeated FetchInst insts = 1;
+  repeated ModelOutput outputs = 1;
   repeated int64 profile_time = 2;
 };
+message ModelOutput {
+  repeated FetchInst insts = 1;
+  repeated int64 profile_time = 2;
+}
 service GeneralModelService {
   rpc inference(Request) returns (Response);
   rpc debug(Request) returns (Response);
...
@@ -15,6 +15,7 @@
 from paddle_serving_client import Client
 from imdb_reader import IMDBDataset
 import sys
+import time
 client = Client()
 client.load_client_config('imdb_bow_client_conf/serving_client_conf.prototxt')
@@ -26,12 +27,13 @@ client.connect(["127.0.0.1:9393"])
 imdb_dataset = IMDBDataset()
 imdb_dataset.load_resource('imdb.vocab')
-for i in range(40):
+for i in range(500):
     line = 'i am very sad | 0'
     word_ids, label = imdb_dataset.get_words_and_label(line)
     feed = {"words": word_ids}
     fetch = ["acc", "cost", "prediction"]
     fetch_map = client.predict(feed=feed, fetch=fetch)
     print("{} {}".format(i, fetch_map["prediction"][1]))
-exit(0)
+    # time.sleep(1)
+# exit(0)
 print('0.633530199528')
@@ -36,7 +36,8 @@ op_seq_maker.add_op(response_op, dependent_nodes=[add_op])
 server = Server()
 server.set_op_sequence(op_seq_maker.get_op_sequence())
 # server.load_model_config(sys.argv[1])
-model_configs = {'g1': 'imdb_bow_model', 'g2': 'imdb_cnn_model'}
+model_configs = {'g1': 'imdb_bow_model', 'g2': 'imdb_bow_model'}
+# model_configs = {'g1': 'imdb_bow_model', 'g2': 'imdb_cnn_model'}
 server.load_model_config(model_configs)
 server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
 server.run_server()