Commit 05782454 authored by B barrierye

Merge branch 'pipeline-auto-batch' of https://github.com/barrierye/Serving into pipeline-auto-batch

@@ -37,6 +37,7 @@ message InferenceRequest {
  repeated string feed_var_names = 2;
  repeated string fetch_var_names = 3;
  required bool is_python = 4 [ default = false ];
+ required uint64 log_id = 5 [ default = 0 ];
};
message InferenceResponse {
......
@@ -227,7 +227,8 @@ class PredictorClient {
      const std::vector<std::vector<int>>& int_shape,
      const std::vector<std::string>& fetch_name,
      PredictorRes& predict_res_batch,  // NOLINT
-     const int& pid);
+     const int& pid,
+     const uint64_t log_id);
  int numpy_predict(
      const std::vector<std::vector<py::array_t<float>>>& float_feed_batch,
@@ -238,7 +239,8 @@ class PredictorClient {
      const std::vector<std::vector<int>>& int_shape,
      const std::vector<std::string>& fetch_name,
      PredictorRes& predict_res_batch,  // NOLINT
-     const int& pid);
+     const int& pid,
+     const uint64_t log_id);
 private:
  PredictorApi _api;
......
@@ -144,7 +144,8 @@ int PredictorClient::batch_predict(
    const std::vector<std::vector<int>> &int_shape,
    const std::vector<std::string> &fetch_name,
    PredictorRes &predict_res_batch,
-   const int &pid) {
+   const int &pid,
+   const uint64_t log_id) {
  int batch_size = std::max(float_feed_batch.size(), int_feed_batch.size());
  predict_res_batch.clear();
@@ -162,6 +163,7 @@ int PredictorClient::batch_predict(
  VLOG(2) << "int feed name size: " << int_feed_name.size();
  VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
  Request req;
+ req.set_log_id(log_id);
  for (auto &name : fetch_name) {
    req.add_fetch_var_names(name);
  }
@@ -356,7 +358,8 @@ int PredictorClient::numpy_predict(
    const std::vector<std::vector<int>> &int_shape,
    const std::vector<std::string> &fetch_name,
    PredictorRes &predict_res_batch,
-   const int &pid) {
+   const int &pid,
+   const uint64_t log_id) {
  int batch_size = std::max(float_feed_batch.size(), int_feed_batch.size());
  VLOG(2) << "batch size: " << batch_size;
  predict_res_batch.clear();
@@ -374,6 +377,7 @@ int PredictorClient::numpy_predict(
  VLOG(2) << "int feed name size: " << int_feed_name.size();
  VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
  Request req;
+ req.set_log_id(log_id);
  for (auto &name : fetch_name) {
    req.add_fetch_var_names(name);
  }
......
@@ -107,7 +107,8 @@ PYBIND11_MODULE(serving_client, m) {
             const std::vector<std::vector<int>> &int_shape,
             const std::vector<std::string> &fetch_name,
             PredictorRes &predict_res_batch,
-            const int &pid) {
+            const int &pid,
+            const uint64_t log_id) {
            return self.batch_predict(float_feed_batch,
                                      float_feed_name,
                                      float_shape,
@@ -116,7 +117,8 @@ PYBIND11_MODULE(serving_client, m) {
                                      int_shape,
                                      fetch_name,
                                      predict_res_batch,
-                                     pid);
+                                     pid,
+                                     log_id);
          },
          py::call_guard<py::gil_scoped_release>())
      .def("numpy_predict",
@@ -131,7 +133,8 @@ PYBIND11_MODULE(serving_client, m) {
             const std::vector<std::vector<int>> &int_shape,
             const std::vector<std::string> &fetch_name,
             PredictorRes &predict_res_batch,
-            const int &pid) {
+            const int &pid,
+            const uint64_t log_id) {
            return self.numpy_predict(float_feed_batch,
                                      float_feed_name,
                                      float_shape,
@@ -140,7 +143,8 @@ PYBIND11_MODULE(serving_client, m) {
                                      int_shape,
                                      fetch_name,
                                      predict_res_batch,
-                                     pid);
+                                     pid,
+                                     log_id);
          },
          py::call_guard<py::gil_scoped_release>());
}
......
@@ -35,6 +35,7 @@ struct GeneralBlob {
  std::vector<paddle::PaddleTensor> tensor_vector;
  int64_t time_stamp[20];
  int p_size = 0;
+ uint64_t _log_id = -1;  // for logging
  int _batch_size;
@@ -46,9 +47,11 @@ struct GeneralBlob {
    tensor_vector.clear();
  }
- int SetBatchSize(int batch_size) { _batch_size = batch_size; }
+ void SetBatchSize(int batch_size) { _batch_size = batch_size; }
+ void SetLogId(uint64_t log_id) { _log_id = log_id; }
  int GetBatchSize() const { return _batch_size; }
+ uint64_t GetLogId() const { return _log_id; }
  std::string ShortDebugString() const { return "Not implemented!"; }
};
......
@@ -47,22 +47,25 @@ int GeneralInferOp::inference() {
  const std::string pre_name = pre_node_names[0];
  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name);
- VLOG(2) << "Get precedent op name: " << pre_name;
+ uint64_t log_id = input_blob->GetLogId();
+ VLOG(2) << "(logid=" << log_id << ") Get precedent op name: " << pre_name;
  GeneralBlob *output_blob = mutable_data<GeneralBlob>();
+ output_blob->SetLogId(log_id);
  if (!input_blob) {
-   LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name;
+   LOG(ERROR) << "(logid=" << log_id
+              << ") Failed mutable depended argument, op:" << pre_name;
    return -1;
  }
  const TensorVector *in = &input_blob->tensor_vector;
  TensorVector *out = &output_blob->tensor_vector;
  int batch_size = input_blob->GetBatchSize();
- VLOG(2) << "input batch size: " << batch_size;
+ VLOG(2) << "(logid=" << log_id << ") input batch size: " << batch_size;
  output_blob->SetBatchSize(batch_size);
- VLOG(2) << "infer batch size: " << batch_size;
+ VLOG(2) << "(logid=" << log_id << ") infer batch size: " << batch_size;
  Timer timeline;
  int64_t start = timeline.TimeStampUS();
@@ -70,7 +73,8 @@ int GeneralInferOp::inference() {
  if (InferManager::instance().infer(
          engine_name().c_str(), in, out, batch_size)) {
-   LOG(ERROR) << "Failed do infer in fluid model: " << engine_name().c_str();
+   LOG(ERROR) << "(logid=" << log_id
+              << ") Failed do infer in fluid model: " << engine_name().c_str();
    return -1;
  }
......
@@ -72,6 +72,7 @@ int conf_check(const Request *req,
int GeneralReaderOp::inference() {
  // reade request from client
  const Request *req = dynamic_cast<const Request *>(get_request_message());
+ uint64_t log_id = req->log_id();
  int batch_size = req->insts_size();
  int input_var_num = 0;
@@ -83,25 +84,28 @@ int GeneralReaderOp::inference() {
  TensorVector *out = &res->tensor_vector;
  res->SetBatchSize(batch_size);
+ res->SetLogId(log_id);
  if (!res) {
-   LOG(ERROR) << "Failed get op tls reader object output";
+   LOG(ERROR) << "(logid=" << log_id
+              << ") Failed get op tls reader object output";
  }
  Timer timeline;
  int64_t start = timeline.TimeStampUS();
  int var_num = req->insts(0).tensor_array_size();
- VLOG(2) << "var num: " << var_num;
+ VLOG(2) << "(logid=" << log_id << ") var num: " << var_num;
- VLOG(2) << "start to call load general model_conf op";
+ VLOG(2) << "(logid=" << log_id
+         << ") start to call load general model_conf op";
  baidu::paddle_serving::predictor::Resource &resource =
      baidu::paddle_serving::predictor::Resource::instance();
- VLOG(2) << "get resource pointer done.";
+ VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
  std::shared_ptr<PaddleGeneralModelConfig> model_config =
      resource.get_general_model_config();
- VLOG(2) << "print general model config done.";
+ VLOG(2) << "(logid=" << log_id << ") print general model config done.";
  // TODO(guru4elephant): how to do conditional check?
  /*
@@ -122,7 +126,8 @@ int GeneralReaderOp::inference() {
  for (int i = 0; i < var_num; ++i) {
    paddle::PaddleTensor lod_tensor;
    elem_type[i] = req->insts(0).tensor_array(i).elem_type();
-   VLOG(2) << "var[" << i << "] has elem type: " << elem_type[i];
+   VLOG(2) << "(logid=" << log_id << ") var[" << i
+           << "] has elem type: " << elem_type[i];
    if (elem_type[i] == 0) {  // int64
      elem_size[i] = sizeof(int64_t);
      lod_tensor.dtype = paddle::PaddleDType::INT64;
@@ -137,17 +142,19 @@ int GeneralReaderOp::inference() {
    if (model_config->_is_lod_feed[i]) {
      lod_tensor.lod.resize(1);
      lod_tensor.lod[0].push_back(0);
-     VLOG(2) << "var[" << i << "] is lod_tensor";
+     VLOG(2) << "(logid=" << log_id << ") var[" << i << "] is lod_tensor";
    } else {
      lod_tensor.shape.push_back(batch_size);
      capacity[i] = 1;
      for (int k = 0; k < req->insts(0).tensor_array(i).shape_size(); ++k) {
        int dim = req->insts(0).tensor_array(i).shape(k);
-       VLOG(2) << "shape for var[" << i << "]: " << dim;
+       VLOG(2) << "(logid=" << log_id << ") shape for var[" << i
+               << "]: " << dim;
        capacity[i] *= dim;
        lod_tensor.shape.push_back(dim);
      }
-     VLOG(2) << "var[" << i << "] is tensor, capacity: " << capacity[i];
+     VLOG(2) << "(logid=" << log_id << ") var[" << i
+             << "] is tensor, capacity: " << capacity[i];
    }
    lod_tensor.name = model_config->_feed_name[i];
    out->push_back(lod_tensor);
@@ -167,11 +174,12 @@ int GeneralReaderOp::inference() {
    } else if (tensor.int_data_size() > 0) {
      data_len = tensor.int_data_size();
    }
-   VLOG(2) << "tensor size for var[" << i << "]: " << data_len;
+   VLOG(2) << "(logid=" << log_id << ") tensor size for var[" << i
+           << "]: " << data_len;
    tensor_size += data_len;
    int cur_len = out->at(i).lod[0].back();
-   VLOG(2) << "current len: " << cur_len;
+   VLOG(2) << "(logid=" << log_id << ") current len: " << cur_len;
    int sample_len = 0;
    if (tensor.shape_size() == 1) {
@@ -180,7 +188,7 @@ int GeneralReaderOp::inference() {
      sample_len = tensor.shape(0);
    }
    out->at(i).lod[0].push_back(cur_len + sample_len);
-   VLOG(2) << "new len: " << cur_len + sample_len;
+   VLOG(2) << "(logid=" << log_id << ") new len: " << cur_len + sample_len;
  }
  out->at(i).data.Resize(tensor_size * elem_size[i]);
  out->at(i).shape = {out->at(i).lod[0].back()};
@@ -190,11 +198,11 @@ int GeneralReaderOp::inference() {
  if (out->at(i).shape.size() == 1) {
    out->at(i).shape.push_back(1);
  }
- VLOG(2) << "var[" << i
+ VLOG(2) << "(logid=" << log_id << ") var[" << i
          << "] is lod_tensor and len=" << out->at(i).lod[0].back();
} else {
  out->at(i).data.Resize(batch_size * capacity[i] * elem_size[i]);
- VLOG(2) << "var[" << i
+ VLOG(2) << "(logid=" << log_id << ") var[" << i
          << "] is tensor and capacity=" << batch_size * capacity[i];
}
}
@@ -203,8 +211,8 @@ int GeneralReaderOp::inference() {
  for (int i = 0; i < var_num; ++i) {
    if (elem_type[i] == 0) {
      int64_t *dst_ptr = static_cast<int64_t *>(out->at(i).data.data());
-     VLOG(2) << "first element data in var[" << i << "] is "
-             << req->insts(0).tensor_array(i).int64_data(0);
+     VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
+             << "] is " << req->insts(0).tensor_array(i).int64_data(0);
      int offset = 0;
      for (int j = 0; j < batch_size; ++j) {
        int elem_num = req->insts(j).tensor_array(i).int64_data_size();
@@ -219,8 +227,8 @@ int GeneralReaderOp::inference() {
      }
    } else if (elem_type[i] == 1) {
      float *dst_ptr = static_cast<float *>(out->at(i).data.data());
-     VLOG(2) << "first element data in var[" << i << "] is "
-             << req->insts(0).tensor_array(i).float_data(0);
+     VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
+             << "] is " << req->insts(0).tensor_array(i).float_data(0);
      int offset = 0;
      for (int j = 0; j < batch_size; ++j) {
        int elem_num = req->insts(j).tensor_array(i).float_data_size();
@@ -235,8 +243,8 @@ int GeneralReaderOp::inference() {
      }
    } else if (elem_type[i] == 2) {
      int32_t *dst_ptr = static_cast<int32_t *>(out->at(i).data.data());
-     VLOG(2) << "first element data in var[" << i << "] is "
-             << req->insts(0).tensor_array(i).int_data(0);
+     VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
+             << "] is " << req->insts(0).tensor_array(i).int_data(0);
      int offset = 0;
      for (int j = 0; j < batch_size; ++j) {
        int elem_num = req->insts(j).tensor_array(i).int_data_size();
@@ -252,7 +260,7 @@ int GeneralReaderOp::inference() {
    }
  }
- VLOG(2) << "output size: " << out->size();
+ VLOG(2) << "(logid=" << log_id << ") output size: " << out->size();
  timeline.Pause();
  int64_t end = timeline.TimeStampUS();
@@ -260,7 +268,7 @@ int GeneralReaderOp::inference() {
  AddBlobInfo(res, start);
  AddBlobInfo(res, end);
- VLOG(2) << "read data from client success";
+ VLOG(2) << "(logid=" << log_id << ") read data from client success";
  return 0;
}
DEFINE_OP(GeneralReaderOp);
......
@@ -75,10 +75,12 @@ int GeneralResponseOp::inference() {
    VLOG(2) << "pre names[" << pi << "]: " << pre_name << " ("
            << pre_node_names.size() << ")";
    input_blob = get_depend_argument<GeneralBlob>(pre_name);
+   uint64_t curr_logid = input_blob->GetLogId();
    // fprintf(stderr, "input(%s) blob address %x\n", pre_names.c_str(),
    // input_blob);
    if (!input_blob) {
-     LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name;
+     LOG(ERROR) << "(logid=" << curr_logid
+                << ") Failed mutable depended argument, op: " << pre_name;
      return -1;
    }
@@ -92,17 +94,19 @@ int GeneralResponseOp::inference() {
    for (auto &idx : fetch_index) {
      Tensor *tensor = fetch_inst->add_tensor_array();
      if (model_config->_is_lod_fetch[idx]) {
-       VLOG(2) << "out[" << idx << "] " << model_config->_fetch_name[idx]
-               << " is lod_tensor";
+       VLOG(2) << "(logid=" << curr_logid << ") out[" << idx << "] "
+               << model_config->_fetch_name[idx] << " is lod_tensor";
        for (int k = 0; k < in->at(idx).shape.size(); ++k) {
-         VLOG(2) << "shape[" << k << "]: " << in->at(idx).shape[k];
+         VLOG(2) << "(logid=" << curr_logid << ") shape[" << k
+                 << "]: " << in->at(idx).shape[k];
          tensor->add_shape(in->at(idx).shape[k]);
        }
      } else {
-       VLOG(2) << "out[" << idx << "] " << model_config->_fetch_name[idx]
-               << " is tensor";
+       VLOG(2) << "(logid=" << curr_logid << ") out[" << idx << "] "
+               << model_config->_fetch_name[idx] << " is tensor";
        for (int k = 0; k < in->at(idx).shape.size(); ++k) {
-         VLOG(2) << "shape[" << k << "]: " << in->at(idx).shape[k];
+         VLOG(2) << "(logid=" << curr_logid << ") shape[" << k
+                 << "]: " << in->at(idx).shape[k];
          tensor->add_shape(in->at(idx).shape[k]);
        }
      }
@@ -119,8 +123,8 @@ int GeneralResponseOp::inference() {
      auto dtype = in->at(idx).dtype;
      if (dtype == paddle::PaddleDType::INT64) {
-       VLOG(2) << "Prepare int64 var [" << model_config->_fetch_name[idx]
-               << "].";
+       VLOG(2) << "(logid=" << curr_logid << ") Prepare int64 var ["
+               << model_config->_fetch_name[idx] << "].";
        int64_t *data_ptr = static_cast<int64_t *>(in->at(idx).data.data());
        // from
        // https://stackoverflow.com/questions/15499641/copy-a-stdvector-to-a-repeated-field-from-protobuf-with-memcpy
@@ -130,16 +134,16 @@ int GeneralResponseOp::inference() {
        fetch_p->mutable_tensor_array(var_idx)->mutable_int64_data()->Swap(
            &tmp_data);
      } else if (dtype == paddle::PaddleDType::FLOAT32) {
-       VLOG(2) << "Prepare float var [" << model_config->_fetch_name[idx]
-               << "].";
+       VLOG(2) << "(logid=" << curr_logid << ") Prepare float var ["
+               << model_config->_fetch_name[idx] << "].";
        float *data_ptr = static_cast<float *>(in->at(idx).data.data());
        google::protobuf::RepeatedField<float> tmp_data(data_ptr,
                                                        data_ptr + cap);
        fetch_p->mutable_tensor_array(var_idx)->mutable_float_data()->Swap(
            &tmp_data);
      } else if (dtype == paddle::PaddleDType::INT32) {
-       VLOG(2) << "Prepare int32 var [" << model_config->_fetch_name[idx]
-               << "].";
+       VLOG(2) << "(logid=" << curr_logid << ")Prepare int32 var ["
+               << model_config->_fetch_name[idx] << "].";
        int32_t *data_ptr = static_cast<int32_t *>(in->at(idx).data.data());
        google::protobuf::RepeatedField<int32_t> tmp_data(data_ptr,
                                                          data_ptr + cap);
@@ -154,7 +158,8 @@ int GeneralResponseOp::inference() {
      }
    }
-   VLOG(2) << "fetch var [" << model_config->_fetch_name[idx] << "] ready";
+   VLOG(2) << "(logid=" << curr_logid << ") fetch var ["
+           << model_config->_fetch_name[idx] << "] ready";
    var_idx++;
  }
}
@@ -167,7 +172,9 @@ int GeneralResponseOp::inference() {
  // a more elegant way.
  for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
    input_blob = get_depend_argument<GeneralBlob>(pre_node_names[pi]);
-   VLOG(2) << "p size for input blob: " << input_blob->p_size;
+   uint64_t curr_logid = input_blob->GetLogId();
+   VLOG(2) << "(logid=" << curr_logid
+           << ") p size for input blob: " << input_blob->p_size;
    int profile_time_idx = -1;
    if (pi == 0) {
      profile_time_idx = 0;
......
@@ -37,6 +37,7 @@ message Request {
  repeated FeedInst insts = 1;
  repeated string fetch_var_names = 2;
  optional bool profile_server = 3 [ default = false ];
+ required uint64 log_id = 4 [ default = 0 ];
};
message Response {
......
@@ -21,6 +21,7 @@ option cc_generic_services = true;
message RequestAndResponse {
  required int32 a = 1;
  required float b = 2;
+ required uint64 log_id = 3 [ default = 0 ];
};
service LoadGeneralModelService {
......
@@ -280,6 +280,7 @@ class PdsCodeGenerator : public CodeGenerator {
        "    baidu::rpc::ClosureGuard done_guard(done);\n"
        "    baidu::rpc::Controller* cntl = \n"
        "        static_cast<baidu::rpc::Controller*>(cntl_base);\n"
+       "    cntl->set_log_id(request->log_id());\n"
        "    ::baidu::paddle_serving::predictor::InferService* svr = \n"
        "        "
        "::baidu::paddle_serving::predictor::InferServiceManager::instance("
@@ -317,6 +318,7 @@ class PdsCodeGenerator : public CodeGenerator {
        "    baidu::rpc::ClosureGuard done_guard(done);\n"
        "    baidu::rpc::Controller* cntl = \n"
        "        static_cast<baidu::rpc::Controller*>(cntl_base);\n"
+       "    cntl->set_log_id(request->log_id());\n"
        "    ::baidu::paddle_serving::predictor::InferService* svr = \n"
        "        "
        "::baidu::paddle_serving::predictor::InferServiceManager::instance("
@@ -1011,6 +1013,7 @@ class PdsCodeGenerator : public CodeGenerator {
        "    brpc::ClosureGuard done_guard(done);\n"
        "    brpc::Controller* cntl = \n"
        "        static_cast<brpc::Controller*>(cntl_base);\n"
+       "    cntl->set_log_id(request->log_id());\n"
        "    ::baidu::paddle_serving::predictor::InferService* svr = \n"
        "        "
        "::baidu::paddle_serving::predictor::InferServiceManager::instance("
@@ -1048,6 +1051,7 @@ class PdsCodeGenerator : public CodeGenerator {
        "    brpc::ClosureGuard done_guard(done);\n"
        "    brpc::Controller* cntl = \n"
        "        static_cast<brpc::Controller*>(cntl_base);\n"
+       "    cntl->set_log_id(request->log_id());\n"
        "    ::baidu::paddle_serving::predictor::InferService* svr = \n"
        "        "
        "::baidu::paddle_serving::predictor::InferServiceManager::instance("
......
@@ -37,6 +37,7 @@ message Request {
  repeated FeedInst insts = 1;
  repeated string fetch_var_names = 2;
  optional bool profile_server = 3 [ default = false ];
+ required uint64 log_id = 4 [ default = 0 ];
};
message Response {
......
@@ -192,14 +192,16 @@ public class Client {
    private InferenceRequest _packInferenceRequest(
            List<HashMap<String, INDArray>> feed_batch,
-           Iterable<String> fetch) throws IllegalArgumentException {
+           Iterable<String> fetch,
+           long log_id) throws IllegalArgumentException {
        List<String> feed_var_names = new ArrayList<String>();
        feed_var_names.addAll(feed_batch.get(0).keySet());
        InferenceRequest.Builder req_builder = InferenceRequest.newBuilder()
            .addAllFeedVarNames(feed_var_names)
            .addAllFetchVarNames(fetch)
-           .setIsPython(false);
+           .setIsPython(false)
+           .setLogId(log_id);
        for (HashMap<String, INDArray> feed_data: feed_batch) {
            FeedInst.Builder inst_builder = FeedInst.newBuilder();
            for (String name: feed_var_names) {
@@ -332,76 +334,151 @@ public class Client {
    public Map<String, INDArray> predict(
            HashMap<String, INDArray> feed,
            Iterable<String> fetch) {
-       return predict(feed, fetch, false);
+       return predict(feed, fetch, false, 0);
+   }
+
+   public Map<String, INDArray> predict(
+           HashMap<String, INDArray> feed,
+           Iterable<String> fetch,
+           long log_id) {
+       return predict(feed, fetch, false, log_id);
    }

    public Map<String, HashMap<String, INDArray>> ensemble_predict(
            HashMap<String, INDArray> feed,
            Iterable<String> fetch) {
-       return ensemble_predict(feed, fetch, false);
+       return ensemble_predict(feed, fetch, false, 0);
+   }
+
+   public Map<String, HashMap<String, INDArray>> ensemble_predict(
+           HashMap<String, INDArray> feed,
+           Iterable<String> fetch,
+           long log_id) {
+       return ensemble_predict(feed, fetch, false, log_id);
    }

    public PredictFuture asyn_predict(
            HashMap<String, INDArray> feed,
            Iterable<String> fetch) {
-       return asyn_predict(feed, fetch, false);
+       return asyn_predict(feed, fetch, false, 0);
+   }
+
+   public PredictFuture asyn_predict(
+           HashMap<String, INDArray> feed,
+           Iterable<String> fetch,
+           long log_id) {
+       return asyn_predict(feed, fetch, false, log_id);
    }

    public Map<String, INDArray> predict(
            HashMap<String, INDArray> feed,
            Iterable<String> fetch,
            Boolean need_variant_tag) {
+       return predict(feed, fetch, need_variant_tag, 0);
+   }
+
+   public Map<String, INDArray> predict(
+           HashMap<String, INDArray> feed,
+           Iterable<String> fetch,
+           Boolean need_variant_tag,
+           long log_id) {
        List<HashMap<String, INDArray>> feed_batch
            = new ArrayList<HashMap<String, INDArray>>();
        feed_batch.add(feed);
-       return predict(feed_batch, fetch, need_variant_tag);
+       return predict(feed_batch, fetch, need_variant_tag, log_id);
    }

    public Map<String, HashMap<String, INDArray>> ensemble_predict(
            HashMap<String, INDArray> feed,
            Iterable<String> fetch,
            Boolean need_variant_tag) {
+       return ensemble_predict(feed, fetch, need_variant_tag, 0);
+   }
+
+   public Map<String, HashMap<String, INDArray>> ensemble_predict(
+           HashMap<String, INDArray> feed,
+           Iterable<String> fetch,
+           Boolean need_variant_tag,
+           long log_id) {
        List<HashMap<String, INDArray>> feed_batch
            = new ArrayList<HashMap<String, INDArray>>();
        feed_batch.add(feed);
-       return ensemble_predict(feed_batch, fetch, need_variant_tag);
+       return ensemble_predict(feed_batch, fetch, need_variant_tag, log_id);
    }

    public PredictFuture asyn_predict(
            HashMap<String, INDArray> feed,
            Iterable<String> fetch,
            Boolean need_variant_tag) {
+       return asyn_predict(feed, fetch, need_variant_tag, 0);
+   }
+
+   public PredictFuture asyn_predict(
+           HashMap<String, INDArray> feed,
+           Iterable<String> fetch,
+           Boolean need_variant_tag,
+           long log_id) {
        List<HashMap<String, INDArray>> feed_batch
            = new ArrayList<HashMap<String, INDArray>>();
        feed_batch.add(feed);
-       return asyn_predict(feed_batch, fetch, need_variant_tag);
+       return asyn_predict(feed_batch, fetch, need_variant_tag, log_id);
    }

    public Map<String, INDArray> predict(
            List<HashMap<String, INDArray>> feed_batch,
            Iterable<String> fetch) {
-       return predict(feed_batch, fetch, false);
+       return predict(feed_batch, fetch, false, 0);
+   }
+
+   public Map<String, INDArray> predict(
+           List<HashMap<String, INDArray>> feed_batch,
+           Iterable<String> fetch,
+           long log_id) {
+       return predict(feed_batch, fetch, false, log_id);
    }

    public Map<String, HashMap<String, INDArray>> ensemble_predict(
            List<HashMap<String, INDArray>> feed_batch,
            Iterable<String> fetch) {
-       return ensemble_predict(feed_batch, fetch, false);
+       return ensemble_predict(feed_batch, fetch, false, 0);
+   }
+
+   public Map<String, HashMap<String, INDArray>> ensemble_predict(
+           List<HashMap<String, INDArray>> feed_batch,
+           Iterable<String> fetch,
+           long log_id) {
+       return ensemble_predict(feed_batch, fetch, false, log_id);
    }

    public PredictFuture asyn_predict(
            List<HashMap<String, INDArray>> feed_batch,
            Iterable<String> fetch) {
-       return asyn_predict(feed_batch, fetch, false);
+       return asyn_predict(feed_batch, fetch, false, 0);
+   }
+
+   public PredictFuture asyn_predict(
+           List<HashMap<String, INDArray>> feed_batch,
+           Iterable<String> fetch,
+           long log_id) {
+       return asyn_predict(feed_batch, fetch, false, log_id);
    }

    public Map<String, INDArray> predict(
            List<HashMap<String, INDArray>> feed_batch,
            Iterable<String> fetch,
            Boolean need_variant_tag) {
+       return predict(feed_batch, fetch, need_variant_tag, 0);
+   }
+
+   public Map<String, INDArray> predict(
+           List<HashMap<String, INDArray>> feed_batch,
+           Iterable<String> fetch,
+           Boolean need_variant_tag,
+           long log_id) {
        try {
            profiler_.record("java_prepro_0");
-           InferenceRequest req = _packInferenceRequest(feed_batch, fetch);
+           InferenceRequest req = _packInferenceRequest(
+               feed_batch, fetch, log_id);
            profiler_.record("java_prepro_1");
            profiler_.record("java_client_infer_0");
@@ -415,7 +492,7 @@ public class Client {
                = new ArrayList<Map.Entry<String, HashMap<String, INDArray>>>(
                    ensemble_result.entrySet());
            if (list.size() != 1) {
-               System.out.format("predict failed: please use ensemble_predict impl.\n");
+               System.out.format("Failed to predict: please use ensemble_predict impl.\n");
                return null;
            }
            profiler_.record("java_postpro_1");
@@ -423,7 +500,7 @@ public class Client {
            return list.get(0).getValue();
        } catch (StatusRuntimeException e) {
-           System.out.format("predict failed: %s\n", e.toString());
+           System.out.format("Failed to predict: %s\n", e.toString());
            return null;
        }
    }
@@ -432,9 +509,18 @@ public class Client {
            List<HashMap<String, INDArray>> feed_batch,
            Iterable<String> fetch,
            Boolean need_variant_tag) {
+       return ensemble_predict(feed_batch, fetch, need_variant_tag, 0);
+   }
+
+   public Map<String, HashMap<String, INDArray>> ensemble_predict(
+           List<HashMap<String, INDArray>> feed_batch,
+           Iterable<String> fetch,
+           Boolean need_variant_tag,
+           long log_id) {
        try {
            profiler_.record("java_prepro_0");
-           InferenceRequest req = _packInferenceRequest(feed_batch, fetch);
+           InferenceRequest req = _packInferenceRequest(
+               feed_batch, fetch, log_id);
            profiler_.record("java_prepro_1");
            profiler_.record("java_client_infer_0");
@@ -449,7 +535,7 @@ public class Client {
            return ensemble_result;
        } catch (StatusRuntimeException e) {
-           System.out.format("predict failed: %s\n", e.toString());
+           System.out.format("Failed to predict: %s\n", e.toString());
            return null;
        }
    }
@@ -458,7 +544,16 @@ public class Client {
            List<HashMap<String, INDArray>> feed_batch,
            Iterable<String> fetch,
            Boolean need_variant_tag) {
-       InferenceRequest req = _packInferenceRequest(feed_batch, fetch);
+       return asyn_predict(feed_batch, fetch, need_variant_tag, 0);
+   }
+
+   public PredictFuture asyn_predict(
+           List<HashMap<String, INDArray>> feed_batch,
+           Iterable<String> fetch,
+           Boolean need_variant_tag,
+           long log_id) {
+       InferenceRequest req = _packInferenceRequest(
+           feed_batch, fetch, log_id);
        ListenableFuture<InferenceResponse> future = futureStub_.inference(req);
        PredictFuture predict_future = new PredictFuture(future,
            (InferenceResponse resp) -> {
......
@@ -37,6 +37,7 @@ message InferenceRequest {
  repeated string feed_var_names = 2;
  repeated string fetch_var_names = 3;
  required bool is_python = 4 [ default = false ];
+ required uint64 log_id = 5 [ default = 0 ];
};
message InferenceResponse {
......
@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
- import paddle_serving_server.pipeline as pipeline
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer
from paddle_serving_server.pipeline.proto import pipeline_service_pb2
@@ -22,12 +21,12 @@ from paddle_serving_app.reader import IMDBDataset
import logging

_LOGGER = logging.getLogger()
- console_handler = pipeline.logger.StreamHandler()
- console_handler.setLevel(logging.INFO)
- console_handler.setFormatter(
+ user_handler = logging.StreamHandler()
+ user_handler.setLevel(logging.INFO)
+ user_handler.setFormatter(
    logging.Formatter(
        "%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s"))
- _LOGGER.addHandler(console_handler)
+ _LOGGER.addHandler(user_handler)

class ImdbRequestOp(RequestOp):
......
@@ -233,7 +233,7 @@ class Client(object):
        #             key))
        pass

-   def predict(self, feed=None, fetch=None, need_variant_tag=False):
+   def predict(self, feed=None, fetch=None, need_variant_tag=False, log_id=0):
        self.profile_.record('py_prepro_0')
        if feed is None or fetch is None:
@@ -319,12 +319,12 @@ class Client(object):
            res = self.client_handle_.numpy_predict(
                float_slot_batch, float_feed_names, float_shape, int_slot_batch,
                int_feed_names, int_shape, fetch_names, result_batch_handle,
-               self.pid)
+               self.pid, log_id)
        elif self.has_numpy_input == False:
            res = self.client_handle_.batch_predict(
                float_slot_batch, float_feed_names, float_shape, int_slot_batch,
                int_feed_names, int_shape, fetch_names, result_batch_handle,
-               self.pid)
+               self.pid, log_id)
        else:
            raise ValueError(
                "Please make sure the inputs are all in list type or all in numpy.array type"
@@ -466,10 +466,11 @@ class MultiLangClient(object):
            if var.is_lod_tensor:
                self.lod_tensor_set_.add(var.alias_name)

-   def _pack_inference_request(self, feed, fetch, is_python):
+   def _pack_inference_request(self, feed, fetch, is_python, log_id):
        req = multi_lang_general_model_service_pb2.InferenceRequest()
        req.fetch_var_names.extend(fetch)
        req.is_python = is_python
+       req.log_id = log_id
        feed_batch = None
        if isinstance(feed, dict):
            feed_batch = [feed]
@@ -602,12 +603,13 @@ class MultiLangClient(object):
                fetch,
                need_variant_tag=False,
                asyn=False,
-               is_python=True):
+               is_python=True,
+               log_id=0):
        if not asyn:
            try:
                self.profile_.record('py_prepro_0')
                req = self._pack_inference_request(
-                   feed, fetch, is_python=is_python)
+                   feed, fetch, is_python=is_python, log_id=log_id)
                self.profile_.record('py_prepro_1')
                self.profile_.record('py_client_infer_0')
@@ -626,7 +628,8 @@ class MultiLangClient(object):
            except grpc.RpcError as e:
                return {"serving_status_code": e.code()}
        else:
-           req = self._pack_inference_request(feed, fetch, is_python=is_python)
+           req = self._pack_inference_request(
+               feed, fetch, is_python=is_python, log_id=log_id)
            call_future = self.stub_.Inference.future(
                req, timeout=self.rpc_timeout_s_)
            return MultiLangPredictFuture(
......
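Usage note (editorial example, not part of this commit): a minimal Python caller-side sketch of the extended predict signature; the config path, endpoint, and feed/fetch names below are placeholders, and a running serving instance is assumed.

# Minimal sketch of passing log_id through the Python client.
# Placeholders: config path, endpoint, and variable names.
from paddle_serving_client import Client

client = Client()
client.load_client_config("serving_client_conf/serving_client_conf.prototxt")  # placeholder path
client.connect(["127.0.0.1:9393"])  # placeholder endpoint

feed = {"words": [1, 2, 3]}  # placeholder feed variable
fetch = ["prediction"]       # placeholder fetch variable

# log_id is forwarded to client_handle_.numpy_predict / batch_predict (see the
# diff above), packed into the brpc Request, and then appears in server-side
# logs as "(logid=...)" so a single request can be traced end to end.
result = client.predict(feed=feed, fetch=fetch, log_id=10000)
print(result)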
@@ -502,6 +502,7 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
        feed_names = list(request.feed_var_names)
        fetch_names = list(request.fetch_var_names)
        is_python = request.is_python
+       log_id = request.log_id
        feed_batch = []
        for feed_inst in request.insts:
            feed_dict = {}
@@ -530,7 +531,7 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
                data.shape = list(feed_inst.tensor_array[idx].shape)
                feed_dict[name] = data
            feed_batch.append(feed_dict)
-       return feed_batch, fetch_names, is_python
+       return feed_batch, fetch_names, is_python, log_id

    def _pack_inference_response(self, ret, fetch_names, is_python):
        resp = multi_lang_general_model_service_pb2.InferenceResponse()
@@ -583,10 +584,13 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
        return resp

    def Inference(self, request, context):
-       feed_dict, fetch_names, is_python = self._unpack_inference_request(
-           request)
+       feed_dict, fetch_names, is_python, log_id = \
+           self._unpack_inference_request(request)
        ret = self.bclient_.predict(
-           feed=feed_dict, fetch=fetch_names, need_variant_tag=True)
+           feed=feed_dict,
+           fetch=fetch_names,
+           need_variant_tag=True,
+           log_id=log_id)
        return self._pack_inference_response(ret, fetch_names, is_python)

    def GetClientConfig(self, request, context):
......
@@ -552,6 +552,7 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
        feed_names = list(request.feed_var_names)
        fetch_names = list(request.fetch_var_names)
        is_python = request.is_python
+       log_id = request.log_id
        feed_batch = []
        for feed_inst in request.insts:
            feed_dict = {}
@@ -580,7 +581,7 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
                data.shape = list(feed_inst.tensor_array[idx].shape)
                feed_dict[name] = data
            feed_batch.append(feed_dict)
-       return feed_batch, fetch_names, is_python
+       return feed_batch, fetch_names, is_python, log_id

    def _pack_inference_response(self, ret, fetch_names, is_python):
        resp = multi_lang_general_model_service_pb2.InferenceResponse()
@@ -633,10 +634,13 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
        return resp

    def Inference(self, request, context):
-       feed_dict, fetch_names, is_python = self._unpack_inference_request(
-           request)
+       feed_dict, fetch_names, is_python, log_id \
+           = self._unpack_inference_request(request)
        ret = self.bclient_.predict(
-           feed=feed_dict, fetch=fetch_names, need_variant_tag=True)
+           feed=feed_dict,
+           fetch=fetch_names,
+           need_variant_tag=True,
+           log_id=log_id)
        return self._pack_inference_response(ret, fetch_names, is_python)

    def GetClientConfig(self, request, context):
......
@@ -16,7 +16,3 @@ from operator import Op, RequestOp, ResponseOp
from pipeline_server import PipelineServer
from pipeline_client import PipelineClient
from analyse import Analyst
- from operator import Op, RequestOp, ResponseOp
- from pipeline_server import PipelineServer
- from pipeline_client import PipelineClient
- from analyse import Analyst
@@ -17,7 +17,7 @@ import copy
import re
import logging

- _LOGGER = logging.getLogger("pipeline.analyse")
+ _LOGGER = logging.getLogger(__name__)

class Analyst(object):
......
@@ -29,7 +29,7 @@ import enum
import os
import copy

- _LOGGER = logging.getLogger("pipeline.channel")
+ _LOGGER = logging.getLogger(__name__)

class ChannelDataEcode(enum.Enum):
@@ -181,6 +181,14 @@ class ChannelData(object):
                os._exit(-1)
        return feed

+   def __cmp__(self, other):
+       if self.id < other.id:
+           return -1
+       elif self.id == other.id:
+           return 0
+       else:
+           return 1

    def __str__(self):
        return "type[{}], ecode[{}], id[{}]".format(
            ChannelDataType(self.datatype).name, self.ecode, self.id)
@@ -222,7 +230,7 @@ class ProcessChannel(object):
        # see more:
        #   - https://bugs.python.org/issue18277
        #   - https://hg.python.org/cpython/rev/860fc6a2bd21
-       self._que = manager.Queue(maxsize=maxsize)
+       self._que = manager.PriorityQueue(maxsize=maxsize)
        self._maxsize = maxsize
        self.name = name
        self._stop = manager.Value('i', 0)
@@ -489,7 +497,7 @@ class ProcessChannel(object):
        self._cv.notify_all()

- class ThreadChannel(Queue.Queue):
+ class ThreadChannel(Queue.PriorityQueue):
    """
    (Thread version)The channel used for communication between Ops.
......
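Editorial illustration (not part of the diff): switching the channels to PriorityQueue, together with the comparison defined on ChannelData, makes channel items come out ordered by their request id. The stand-in Item class below is an assumption used only to show that ordering; note that on Python 3 a PriorityQueue relies on __lt__ rather than the __cmp__ method shown above.

# Simplified illustration of id-based ordering in a PriorityQueue.
# Item is a stand-in for ChannelData, not the real class.
import queue

class Item(object):
    def __init__(self, id):
        self.id = id

    def __lt__(self, other):
        # Python 3 ordering hook; mirrors the intent of ChannelData.__cmp__.
        return self.id < other.id

q = queue.PriorityQueue()
for data_id in (7, 3, 5):
    q.put(Item(data_id))

print([q.get().id for _ in range(3)])  # -> [3, 5, 7]: the lowest id is dequeued first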
@@ -24,19 +24,20 @@ else:
    raise Exception("Error Python version")

import os
import logging
+ import collections

from .operator import Op, RequestOp, ResponseOp, VirtualOp
from .channel import (ThreadChannel, ProcessChannel, ChannelData,
                      ChannelDataEcode, ChannelDataType, ChannelStopError)
from .profiler import TimeProfiler, PerformanceTracer
- from .util import NameGenerator
+ from .util import NameGenerator, ThreadIdGenerator, PipelineProcSyncManager
from .proto import pipeline_service_pb2

- _LOGGER = logging.getLogger("pipeline.dag")
+ _LOGGER = logging.getLogger(__name__)

class DAGExecutor(object):
-   def __init__(self, response_op, server_conf):
+   def __init__(self, response_op, server_conf, worker_idx):
        build_dag_each_worker = server_conf["build_dag_each_worker"]
        server_worker_num = server_conf["worker_num"]
        dag_conf = server_conf["dag"]
@@ -74,9 +75,17 @@ class DAGExecutor(object):
        if self._tracer is not None:
            self._tracer.start()

-       self._id_lock = threading.Lock()
-       self._id_counter = 0
-       self._reset_max_id = 1000000000000000000
+       # generate id: data_id == request_id == log_id
+       base_counter = 0
+       gen_id_step = 1
+       if build_dag_each_worker:
+           base_counter = worker_idx
+           gen_id_step = server_worker_num
+       self._id_generator = ThreadIdGenerator(
+           max_id=1000000000000000000,
+           base_counter=base_counter,
+           step=gen_id_step)

        self._cv_pool = {}
        self._cv_for_cv_pool = threading.Condition()
        self._fetch_buffer = {}
@@ -98,13 +107,7 @@ class DAGExecutor(object):
        _LOGGER.info("[DAG Executor] Stop")

    def _get_next_data_id(self):
-       data_id = None
-       with self._id_lock:
-           if self._id_counter >= self._reset_max_id:
-               _LOGGER.info("[DAG Executor] Reset request id")
-               self._id_counter -= self._reset_max_id
-           data_id = self._id_counter
-           self._id_counter += 1
+       data_id = self._id_generator.next()
        cond_v = threading.Condition()
        with self._cv_for_cv_pool:
            self._cv_pool[data_id] = cond_v
@@ -282,12 +285,14 @@ class DAGExecutor(object):
        end_call = self._profiler.record("call_{}#DAG_1".format(data_id))

        if self._tracer is not None:
-           if resp_channeldata.ecode == ChannelDataEcode.OK.value:
-               trace_buffer.put(("DAG", "call_{}".format(data_id), True,
-                                 end_call - start_call))
-           else:
-               trace_buffer.put(("DAG", "call_{}".format(data_id), False,
-                                 end_call - start_call))
+           trace_buffer.put({
+               "name": "DAG",
+               "id": data_id,
+               "succ": resp_channeldata.ecode == ChannelDataEcode.OK.value,
+               "actions": {
+                   "call_{}".format(data_id): end_call - start_call,
+               },
+           })

        profile_str = self._profiler.gen_profile_str()
        if self._server_use_profile:
@@ -330,7 +335,7 @@ class DAG(object):
        self._build_dag_each_worker = build_dag_each_worker
        self._tracer = tracer
        if not self._is_thread_op:
-           self._manager = multiprocessing.Manager()
+           self._manager = PipelineProcSyncManager()
        _LOGGER.info("[DAG] Succ init")

    def get_use_ops(self, response_op):
@@ -554,7 +559,8 @@ class DAG(object):
        self._pack_func = pack_func
        self._unpack_func = unpack_func

-       self._tracer.set_channels(self._channels)
+       if self._tracer is not None:
+           self._tracer.set_channels(self._channels)

        return self._input_channel, self._output_channel, self._pack_func, self._unpack_func
......
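Editorial sketch (not part of the diff): the new ThreadIdGenerator is imported from .util but its implementation is outside this commit; the class below is only a plausible, hypothetical reconstruction consistent with the max_id, base_counter, and step arguments used in DAGExecutor, where step equals the worker count so that each worker process emits a disjoint stream of ids.

# Hypothetical sketch of a thread-safe id generator matching the constructor
# arguments used above (max_id, base_counter, step). Not the real util code.
import threading

class ThreadIdGenerator(object):
    def __init__(self, max_id, base_counter=0, step=1):
        self._max_id = max_id
        self._counter = base_counter
        self._step = step           # = worker_num, so workers emit disjoint ids
        self._lock = threading.Lock()

    def next(self):
        with self._lock:
            next_id = self._counter
            self._counter += self._step
            if self._counter >= self._max_id:
                self._counter -= self._max_id  # wrap around instead of growing forever
        return next_id

# Example: two workers with step=2 produce interleaved, non-colliding ids.
gen_a = ThreadIdGenerator(max_id=10**18, base_counter=0, step=2)
gen_b = ThreadIdGenerator(max_id=10**18, base_counter=1, step=2)
print(gen_a.next(), gen_b.next(), gen_a.next(), gen_b.next())  # 0 1 2 3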
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
import logging.handlers import logging.config
import os import os
...@@ -25,66 +25,52 @@ class SectionLevelFilter(object): ...@@ -25,66 +25,52 @@ class SectionLevelFilter(object):
return logRecord.levelno in self._levels return logRecord.levelno in self._levels
class OutOfMouduleFilter(object):
def __init__(self, out_names):
self._out_names = out_names
def filter(self, logRecord):
return logRecord.name not in self._out_names
class OutOfMouduleAndSectionLevelFilter(object):
def __init__(self, out_names, levels):
self._out_names = out_names
self._levels = levels
def filter(self, logRecord):
if logRecord.name in self._out_names:
return False
return logRecord.levelno in self._levels
class StreamHandler(logging.StreamHandler):
def __init__(self, *args, **kwargs):
super(StreamHandler, self).__init__(*args, **kwargs)
self.addFilter(OutOfMouduleFilter(["pipeline.profiler"]))
log_dir = "PipelineServingLogs" log_dir = "PipelineServingLogs"
if not os.path.exists(log_dir): if not os.path.exists(log_dir):
os.makedirs(log_dir) os.makedirs(log_dir)
# root logger
_LOGGER = logging.getLogger()
_LOGGER.setLevel(logging.DEBUG)

formatter = logging.Formatter(
    "%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s")

# info and warn
file_info = logging.handlers.RotatingFileHandler(
    os.path.join(log_dir, "INFO.log"))
file_info.addFilter(OutOfMouduleFilter(["pipeline.profiler"]))
file_info.addFilter(SectionLevelFilter([logging.INFO, logging.WARNING]))
file_info.setFormatter(formatter)

# err and critical
file_err = logging.handlers.RotatingFileHandler(
    os.path.join(log_dir, "ERROR.log"))
file_err.addFilter(OutOfMouduleFilter(["pipeline.profiler"]))
file_err.setLevel(logging.ERROR)
file_err.setFormatter(formatter)

_LOGGER.addHandler(file_info)
_LOGGER.addHandler(file_err)

# tracer logger
_TRACER = logging.getLogger("pipeline.profiler")
_TRACER.setLevel(logging.INFO)
_TRACER.addFilter(logging.Filter("pipeline.profiler"))

# tracer
tracer_formatter = logging.Formatter("%(asctime)s %(message)s")
file_trace = logging.handlers.RotatingFileHandler(
    os.path.join(log_dir, "TRACE.log"))
file_trace.setFormatter(tracer_formatter)
_TRACER.addHandler(file_trace)
logger_config = {
    "version": 1,
    "formatters": {
        "normal_fmt": {
            "format":
            "%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s",
        },
        "tracer_fmt": {
            "format": "%(asctime)s %(message)s",
        },
    },
    "handlers": {
        "f_pipeline.log": {
            "class": "logging.FileHandler",
            "level": "INFO",
            "formatter": "normal_fmt",
            "filename": os.path.join(log_dir, "pipeline.log"),
        },
        "f_pipeline.log.wf": {
            "class": "logging.FileHandler",
            "level": "WARNING",
            "formatter": "normal_fmt",
            "filename": os.path.join(log_dir, "pipeline.log.wf"),
        },
        "f_tracer.log": {
            "class": "logging.FileHandler",
            "level": "INFO",
            "formatter": "tracer_fmt",
            "filename": os.path.join(log_dir, "pipeline.tracer"),
        },
    },
    "loggers": {
        # propagate = True
        ".".join(__name__.split(".")[:-1] + ["profiler"]): {
            "level": "INFO",
            "handlers": ["f_tracer.log"],
        },
    },
    "root": {
        "level": "DEBUG",
        "handlers": ["f_pipeline.log", "f_pipeline.log.wf"],
    },
}
logging.config.dictConfig(logger_config)
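Usage sketch for the new dictConfig layout (the package prefix is an assumption here; it is whatever prefix __name__ resolves to at import time). Ordinary pipeline modules log through their module logger and reach the root handlers; the profiler logger has its own file handler and, since propagate is left at True, its records also reach the root handlers.

import logging

# Assumption for illustration: the package prefix resolves to "pipeline",
# so the tracer logger configured above is "pipeline.profiler".
_LOGGER = logging.getLogger("pipeline.operator")    # propagates to root -> pipeline.log / pipeline.log.wf
_TRACER = logging.getLogger("pipeline.profiler")    # also handled by f_tracer.log -> pipeline.tracer

_LOGGER.info("written to PipelineServingLogs/pipeline.log")
_LOGGER.warning("written to pipeline.log and pipeline.log.wf")
_TRACER.info("written to PipelineServingLogs/pipeline.tracer")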
...@@ -22,8 +22,15 @@ import logging ...@@ -22,8 +22,15 @@ import logging
import func_timeout import func_timeout
import os import os
import sys import sys
import collections
import numpy as np import numpy as np
from numpy import * from numpy import *
if sys.version_info.major == 2:
import Queue
elif sys.version_info.major == 3:
import queue as Queue
else:
raise Exception("Error Python version")
from .proto import pipeline_service_pb2 from .proto import pipeline_service_pb2
from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode, from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode,
...@@ -32,7 +39,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode, ...@@ -32,7 +39,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode,
from .util import NameGenerator from .util import NameGenerator
from .profiler import UnsafeTimeProfiler as TimeProfiler from .profiler import UnsafeTimeProfiler as TimeProfiler
_LOGGER = logging.getLogger("pipeline.operator") _LOGGER = logging.getLogger(__name__)
_op_name_gen = NameGenerator("Op") _op_name_gen = NameGenerator("Op")
...@@ -127,7 +134,7 @@ class Op(object): ...@@ -127,7 +134,7 @@ class Op(object):
fetch_names): fetch_names):
if self.with_serving == False: if self.with_serving == False:
_LOGGER.info("Op({}) has no client (and it also do not " _LOGGER.info("Op({}) has no client (and it also do not "
"run the process function".format(self.name)) "run the process function)".format(self.name))
return None return None
if client_type == 'brpc': if client_type == 'brpc':
client = Client() client = Client()
...@@ -199,7 +206,7 @@ class Op(object): ...@@ -199,7 +206,7 @@ class Op(object):
(_, input_dict), = input_dicts.items() (_, input_dict), = input_dicts.items()
return input_dict return input_dict
def process(self, feed_batch): def process(self, feed_batch, typical_logid):
err, err_info = ChannelData.check_batch_npdata(feed_batch) err, err_info = ChannelData.check_batch_npdata(feed_batch)
if err != 0: if err != 0:
_LOGGER.critical( _LOGGER.critical(
...@@ -207,7 +214,7 @@ class Op(object): ...@@ -207,7 +214,7 @@ class Op(object):
"preprocess func.".format(err_info))) "preprocess func.".format(err_info)))
os._exit(-1) os._exit(-1)
call_result = self.client.predict( call_result = self.client.predict(
feed=feed_batch, fetch=self._fetch_names) feed=feed_batch, fetch=self._fetch_names, log_id=typical_logid)
if isinstance(self.client, MultiLangClient): if isinstance(self.client, MultiLangClient):
if call_result is None or call_result["serving_status_code"] != 0: if call_result is None or call_result["serving_status_code"] != 0:
return None return None
...@@ -294,8 +301,8 @@ class Op(object): ...@@ -294,8 +301,8 @@ class Op(object):
def _run_preprocess(self, parsed_data_dict, op_info_prefix): def _run_preprocess(self, parsed_data_dict, op_info_prefix):
_LOGGER.debug("{} Running preprocess".format(op_info_prefix)) _LOGGER.debug("{} Running preprocess".format(op_info_prefix))
preped_data_dict = {} preped_data_dict = collections.OrderedDict()
err_channeldata_dict = {} err_channeldata_dict = collections.OrderedDict()
for data_id, parsed_data in parsed_data_dict.items(): for data_id, parsed_data in parsed_data_dict.items():
preped_data, error_channeldata = None, None preped_data, error_channeldata = None, None
try: try:
...@@ -326,68 +333,132 @@ class Op(object): ...@@ -326,68 +333,132 @@ class Op(object):
def _run_process(self, preped_data_dict, op_info_prefix): def _run_process(self, preped_data_dict, op_info_prefix):
_LOGGER.debug("{} Running process".format(op_info_prefix)) _LOGGER.debug("{} Running process".format(op_info_prefix))
midped_data_dict = {} midped_data_dict = collections.OrderedDict()
err_channeldata_dict = {} err_channeldata_dict = collections.OrderedDict()
if self.with_serving: if self.with_serving:
data_ids = preped_data_dict.keys() data_ids = preped_data_dict.keys()
feed_batch = [preped_data_dict[data_id] for data_id in data_ids] typical_logid = data_ids[0]
if len(data_ids) != 1:
for data_id in data_ids:
_LOGGER.info(
"(logid={}) {} During access to PaddleServingService,"
" we selected logid={} (from batch: {}) as a "
"representative for logging.".format(
data_id, op_info_prefix, typical_logid, data_ids))
# combine samples to batch
one_input = preped_data_dict[data_ids[0]]
feed_batch = []
input_offset = None
if isinstance(one_input, dict):
# sample input
feed_batch = [preped_data_dict[data_id] for data_id in data_ids]
input_offset = list(range(len(data_ids) + 1))
elif isinstance(one_input, list):
# batch input
input_offset = [0]
for data_id in data_ids:
batch_input = preped_data_dict[data_id]
offset = input_offset[-1] + len(batch_input)
feed_batch += batch_input
input_offset.append(offset)
else:
_LOGGER.critical(
"{} Failed to process: expect input type is dict(sample"
" input) or list(batch input), but get {}".format(
op_info_prefix, type(one_input)))
os._exit(-1)
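The offset bookkeeping above is easiest to see with a toy example (request ids and sample values are invented; list(...) is used around keys() so that indexing data_ids[0] also works under Python 3). For sample inputs each request contributes one element, so input_offset is simply 0..N; for batch inputs each request contributes len(batch_input) elements, and input_offset records the cumulative boundaries used later to split the merged prediction back per request.

# Sketch of the offset bookkeeping, assuming three requests with batch (list) inputs.
preped_data_dict = {
    101: ["a0", "a1"],        # request 101 carries 2 samples
    102: ["b0"],              # request 102 carries 1 sample
    103: ["c0", "c1", "c2"],  # request 103 carries 3 samples
}
data_ids = list(preped_data_dict.keys())

feed_batch = []
input_offset = [0]
for data_id in data_ids:
    batch_input = preped_data_dict[data_id]
    feed_batch += batch_input
    input_offset.append(input_offset[-1] + len(batch_input))

print(feed_batch)    # ['a0', 'a1', 'b0', 'c0', 'c1', 'c2']
print(input_offset)  # [0, 2, 3, 6]
# feed_batch[input_offset[i]:input_offset[i+1]] belongs to data_ids[i]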
midped_batch = None midped_batch = None
ecode = ChannelDataEcode.OK.value ecode = ChannelDataEcode.OK.value
if self._timeout <= 0: if self._timeout <= 0:
try: try:
midped_batch = self.process(feed_batch) midped_batch = self.process(feed_batch, typical_logid)
except Exception as e: except Exception as e:
ecode = ChannelDataEcode.UNKNOW.value ecode = ChannelDataEcode.UNKNOW.value
error_info = "{} Failed to process(batch: {}): {}".format( error_info = "(logid={}) {} Failed to process(batch: {}): {}".format(
op_info_prefix, data_ids, e) typical_logid, op_info_prefix, data_ids, e)
_LOGGER.error(error_info, exc_info=True) _LOGGER.error(error_info, exc_info=True)
else: else:
for i in range(self._retry): for i in range(self._retry):
try: try:
midped_batch = func_timeout.func_timeout( midped_batch = func_timeout.func_timeout(
self._timeout, self.process, args=(feed_batch, )) self._timeout,
self.process,
args=(feed_batch, typical_logid))
except func_timeout.FunctionTimedOut as e: except func_timeout.FunctionTimedOut as e:
if i + 1 >= self._retry: if i + 1 >= self._retry:
ecode = ChannelDataEcode.TIMEOUT.value ecode = ChannelDataEcode.TIMEOUT.value
error_info = "{} Failed to process(batch: {}): " \ error_info = "(logid={}) {} Failed to process(batch: {}): " \
"exceeded retry count.".format( "exceeded retry count.".format(
op_info_prefix, data_ids) typical_logid, op_info_prefix, data_ids)
_LOGGER.error(error_info) _LOGGER.error(error_info)
else: else:
_LOGGER.warning( _LOGGER.warning(
"{} Failed to process(batch: {}): timeout, and retrying({}/{})" "(logid={}) {} Failed to process(batch: {}): timeout,"
.format(op_info_prefix, data_ids, i + 1, " and retrying({}/{})...".format(
self._retry)) typical_logid, op_info_prefix, data_ids, i +
1, self._retry))
except Exception as e: except Exception as e:
ecode = ChannelDataEcode.UNKNOW.value ecode = ChannelDataEcode.UNKNOW.value
error_info = "{} Failed to process(batch: {}): {}".format( error_info = "(logid={}) {} Failed to process(batch: {}): {}".format(
op_info_prefix, data_ids, e) typical_logid, op_info_prefix, data_ids, e)
_LOGGER.error(error_info, exc_info=True) _LOGGER.error(error_info, exc_info=True)
break break
else: else:
break break
if ecode != ChannelDataEcode.OK.value: if ecode != ChannelDataEcode.OK.value:
for data_id in data_ids: for data_id in data_ids:
_LOGGER.error("(logid={}) {}".format(data_id, error_info))
err_channeldata_dict[data_id] = ChannelData( err_channeldata_dict[data_id] = ChannelData(
ecode=ecode, error_info=error_info, data_id=data_id) ecode=ecode, error_info=error_info, data_id=data_id)
elif midped_batch is None: elif midped_batch is None:
# op client return None # op client return None
error_info = "{} Failed to predict, please check if PaddleServingService" \ error_info = "(logid={}) {} Failed to predict, please check if " \
" is working properly.".format(op_info_prefix) "PaddleServingService is working properly.".format(
typical_logid, op_info_prefix)
_LOGGER.error(error_info)
for data_id in data_ids: for data_id in data_ids:
_LOGGER.error("(logid={}) {}".format(data_id, error_info))
err_channeldata_dict[data_id] = ChannelData( err_channeldata_dict[data_id] = ChannelData(
ecode=ChannelDataEcode.CLIENT_ERROR.value, ecode=ChannelDataEcode.CLIENT_ERROR.value,
error_info=error_info, error_info=error_info,
data_id=data_id) data_id=data_id)
else: else:
# transform np format to dict format # transform np format to dict format
var_names = midped_batch.keys()
lod_var_names = set()
lod_offset_names = set()
for name in var_names:
lod_offset_name = "{}.lod".format(name)
if lod_offset_name in var_names:
_LOGGER.debug("(logid={}) {} {} is LodTensor".format(
typical_logid, op_info_prefix, name))
lod_var_names.add(name)
lod_offset_names.add(lod_offset_name)
for idx, data_id in enumerate(data_ids): for idx, data_id in enumerate(data_ids):
midped_data_dict[data_id] = {
k: v[idx]
for k, v in midped_batch.items()
}
midped_data_dict[data_id] = {}

for name, value in midped_batch.items():
if name in lod_offset_names:
continue
if name in lod_var_names:
# lodtensor
lod_offset_name = "{}.lod".format(name)
lod_offset = midped_batch[lod_offset_name]
for idx, data_id in enumerate(data_ids):
data_offset_left = input_offset[idx]
data_offset_right = input_offset[idx + 1]
lod_offset_left = lod_offset[data_offset_left]
lod_offset_right = lod_offset[data_offset_right]
midped_data_dict[data_id][name] = value[lod_offset_left:lod_offset_right]
midped_data_dict[data_id][lod_offset_name] = \
lod_offset[data_offset_left:data_offset_right + 1] - lod_offset[data_offset_left]
else:
# normal tensor
for idx, data_id in enumerate(data_ids):
left = input_offset[idx]
right = input_offset[idx + 1]
midped_data_dict[data_id][name] = value[left:right]
else: else:
midped_data_dict = preped_data_dict midped_data_dict = preped_data_dict
_LOGGER.debug("{} Succ process".format(op_info_prefix)) _LOGGER.debug("{} Succ process".format(op_info_prefix))
...@@ -396,8 +467,8 @@ class Op(object): ...@@ -396,8 +467,8 @@ class Op(object):
def _run_postprocess(self, parsed_data_dict, midped_data_dict, def _run_postprocess(self, parsed_data_dict, midped_data_dict,
op_info_prefix): op_info_prefix):
_LOGGER.debug("{} Running postprocess".format(op_info_prefix)) _LOGGER.debug("{} Running postprocess".format(op_info_prefix))
postped_data_dict = {} postped_data_dict = collections.OrderedDict()
err_channeldata_dict = {} err_channeldata_dict = collections.OrderedDict()
for data_id, midped_data in midped_data_dict.items(): for data_id, midped_data in midped_data_dict.items():
postped_data, err_channeldata = None, None postped_data, err_channeldata = None, None
try: try:
...@@ -476,7 +547,7 @@ class Op(object): ...@@ -476,7 +547,7 @@ class Op(object):
yield batch yield batch
def _parse_channeldata_batch(self, batch, output_channels): def _parse_channeldata_batch(self, batch, output_channels):
parsed_data_dict = {} parsed_data_dict = collections.OrderedDict()
need_profile_dict = {} need_profile_dict = {}
profile_dict = {} profile_dict = {}
for channeldata_dict in batch: for channeldata_dict in batch:
...@@ -520,6 +591,7 @@ class Op(object): ...@@ -520,6 +591,7 @@ class Op(object):
op_info_prefix=op_info_prefix) op_info_prefix=op_info_prefix)
start, end = None, None start, end = None, None
trace_que = collections.deque()
while True: while True:
start = int(round(_time() * 1000000)) start = int(round(_time() * 1000000))
try: try:
...@@ -529,8 +601,7 @@ class Op(object): ...@@ -529,8 +601,7 @@ class Op(object):
self._finalize(is_thread_op) self._finalize(is_thread_op)
break break
end = int(round(_time() * 1000000)) end = int(round(_time() * 1000000))
if trace_buffer is not None: in_time = end - start
trace_buffer.put((self.name, "in", True, end - start))
# parse channeldata batch # parse channeldata batch
try: try:
...@@ -550,8 +621,7 @@ class Op(object): ...@@ -550,8 +621,7 @@ class Op(object):
preped_data_dict, err_channeldata_dict \ preped_data_dict, err_channeldata_dict \
= self._run_preprocess(parsed_data_dict, op_info_prefix) = self._run_preprocess(parsed_data_dict, op_info_prefix)
end = profiler.record("prep#{}_1".format(op_info_prefix)) end = profiler.record("prep#{}_1".format(op_info_prefix))
if trace_buffer is not None: prep_time = end - start
trace_buffer.put((self.name, "prep", True, end - start))
try: try:
for data_id, err_channeldata in err_channeldata_dict.items(): for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels( self._push_to_output_channels(
...@@ -563,7 +633,7 @@ class Op(object): ...@@ -563,7 +633,7 @@ class Op(object):
_LOGGER.debug("{} Stop.".format(op_info_prefix)) _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op) self._finalize(is_thread_op)
break break
if len(parsed_data_dict) == 0: if len(preped_data_dict) == 0:
continue continue
# process # process
...@@ -571,8 +641,7 @@ class Op(object): ...@@ -571,8 +641,7 @@ class Op(object):
midped_data_dict, err_channeldata_dict \ midped_data_dict, err_channeldata_dict \
= self._run_process(preped_data_dict, op_info_prefix) = self._run_process(preped_data_dict, op_info_prefix)
end = profiler.record("midp#{}_1".format(op_info_prefix)) end = profiler.record("midp#{}_1".format(op_info_prefix))
if trace_buffer is not None: midp_time = end - start
trace_buffer.put((self.name, "midp", True, end - start))
try: try:
for data_id, err_channeldata in err_channeldata_dict.items(): for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels( self._push_to_output_channels(
...@@ -593,12 +662,11 @@ class Op(object): ...@@ -593,12 +662,11 @@ class Op(object):
= self._run_postprocess( = self._run_postprocess(
parsed_data_dict, midped_data_dict, op_info_prefix) parsed_data_dict, midped_data_dict, op_info_prefix)
end = profiler.record("postp#{}_1".format(op_info_prefix)) end = profiler.record("postp#{}_1".format(op_info_prefix))
if trace_buffer is not None: postp_time = end - start
trace_buffer.put((self.name, "postp", True, end - start))
try: try:
for data_id, err_channeldata in err_channeldata_dict.items(): for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels( self._push_to_output_channels(
data=error_channeldata, data=err_channeldata,
channels=output_channels, channels=output_channels,
client_need_profile=need_profile_dict[data_id], client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id]) profile_set=profile_dict[data_id])
...@@ -627,8 +695,25 @@ class Op(object): ...@@ -627,8 +695,25 @@ class Op(object):
self._finalize(is_thread_op) self._finalize(is_thread_op)
break break
end = int(round(_time() * 1000000)) end = int(round(_time() * 1000000))
out_time = end - start
if trace_buffer is not None: if trace_buffer is not None:
trace_buffer.put((self.name, "out", True, end - start)) trace_que.append({
"name": self.name,
"actions": {
"in": in_time,
"prep": prep_time,
"midp": midp_time,
"postp": postp_time,
"out": out_time,
}
})
while trace_que:
info = trace_que[0]
try:
trace_buffer.put_nowait(info)
trace_que.popleft()
except Queue.Full:
break
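The drain loop above is a non-blocking flush: entries that do not fit in the tracer buffer stay in the local deque and are retried on the next loop iteration instead of blocking the Op loop. A stand-alone sketch of the same pattern (queue size chosen arbitrarily):

import collections
import queue as Queue  # Python 3 spelling; the module above aliases Queue for Py2/Py3

trace_buffer = Queue.Queue(maxsize=2)
trace_que = collections.deque([{"i": i} for i in range(5)])

while trace_que:
    info = trace_que[0]
    try:
        trace_buffer.put_nowait(info)
        trace_que.popleft()
    except Queue.Full:
        break  # keep the rest for the next iteration

print(trace_buffer.qsize(), len(trace_que))  # 2 3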
def _initialize(self, is_thread_op, client_type, concurrency_idx): def _initialize(self, is_thread_op, client_type, concurrency_idx):
if is_thread_op: if is_thread_op:
...@@ -718,7 +803,7 @@ class ResponseOp(Op): ...@@ -718,7 +803,7 @@ class ResponseOp(Op):
feed = channeldata.parse() feed = channeldata.parse()
# ndarray to string: # ndarray to string:
# https://stackoverflow.com/questions/30167538/convert-a-numpy-ndarray-to-stringor-bytes-and-convert-it-back-to-numpy-ndarray # https://stackoverflow.com/questions/30167538/convert-a-numpy-ndarray-to-stringor-bytes-and-convert-it-back-to-numpy-ndarray
np.set_printoptions(threshold=np.nan) np.set_printoptions(threshold=sys.maxsize)
for name, var in feed.items(): for name, var in feed.items():
resp.value.append(var.__repr__()) resp.value.append(var.__repr__())
resp.key.append(name) resp.key.append(name)
......
...@@ -22,7 +22,7 @@ from .channel import ChannelDataEcode ...@@ -22,7 +22,7 @@ from .channel import ChannelDataEcode
from .proto import pipeline_service_pb2 from .proto import pipeline_service_pb2
from .proto import pipeline_service_pb2_grpc from .proto import pipeline_service_pb2_grpc
_LOGGER = logging.getLogger("pipeline.pipeline_client") _LOGGER = logging.getLogger(__name__)
class PipelineClient(object): class PipelineClient(object):
......
...@@ -26,14 +26,14 @@ from .proto import pipeline_service_pb2_grpc ...@@ -26,14 +26,14 @@ from .proto import pipeline_service_pb2_grpc
from .operator import ResponseOp from .operator import ResponseOp
from .dag import DAGExecutor from .dag import DAGExecutor
_LOGGER = logging.getLogger("pipeline.pipeline_server") _LOGGER = logging.getLogger(__name__)
class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer): class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
def __init__(self, response_op, dag_conf): def __init__(self, response_op, dag_conf, worker_idx=-1):
super(PipelineServicer, self).__init__() super(PipelineServicer, self).__init__()
# init dag executor # init dag executor
self._dag_executor = DAGExecutor(response_op, dag_conf) self._dag_executor = DAGExecutor(response_op, dag_conf, worker_idx)
self._dag_executor.start() self._dag_executor.start()
_LOGGER.info("[PipelineServicer] succ init") _LOGGER.info("[PipelineServicer] succ init")
...@@ -92,8 +92,9 @@ class PipelineServer(object): ...@@ -92,8 +92,9 @@ class PipelineServer(object):
json.dumps( json.dumps(
conf, indent=4, separators=(',', ':')))) conf, indent=4, separators=(',', ':'))))
if self._build_dag_each_worker is True: if self._build_dag_each_worker is True:
_LOGGER.info(
"(Make sure that install grpcio whl with --no-binary flag)")
_LOGGER.warning(
"(Make sure that install grpcio whl with --no-binary flag: "
"pip install grpcio --no-binary grpcio)")
_LOGGER.info("-------------------------------------------") _LOGGER.info("-------------------------------------------")
self._conf = conf self._conf = conf
...@@ -107,27 +108,31 @@ class PipelineServer(object): ...@@ -107,27 +108,31 @@ class PipelineServer(object):
show_info = (i == 0) show_info = (i == 0)
worker = multiprocessing.Process( worker = multiprocessing.Process(
target=self._run_server_func, target=self._run_server_func,
args=(bind_address, self._response_op, self._conf)) args=(bind_address, self._response_op, self._conf, i))
worker.start() worker.start()
workers.append(worker) workers.append(worker)
for worker in workers: for worker in workers:
worker.join() worker.join()
else: else:
server = grpc.server( server = grpc.server(
futures.ThreadPoolExecutor(max_workers=self._worker_num)) futures.ThreadPoolExecutor(max_workers=self._worker_num),
options=[('grpc.max_send_message_length', 256 * 1024 * 1024),
('grpc.max_receive_message_length', 256 * 1024 * 1024)])
pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server( pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
PipelineServicer(self._response_op, self._conf), server) PipelineServicer(self._response_op, self._conf), server)
server.add_insecure_port('[::]:{}'.format(self._port)) server.add_insecure_port('[::]:{}'.format(self._port))
server.start() server.start()
server.wait_for_termination() server.wait_for_termination()
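Raising the server-side limits to 256 MB covers only the server end; a client that expects equally large responses typically has to raise its own channel limits as well. A hedged client-side sketch (the endpoint address is a placeholder, not part of this patch):

import grpc

# Hypothetical client channel with limits matching the server options above.
channel = grpc.insecure_channel(
    "127.0.0.1:18080",
    options=[("grpc.max_send_message_length", 256 * 1024 * 1024),
             ("grpc.max_receive_message_length", 256 * 1024 * 1024)])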
def _run_server_func(self, bind_address, response_op, dag_conf): def _run_server_func(self, bind_address, response_op, dag_conf, worker_idx):
options = (('grpc.so_reuseport', 1), ) options = [('grpc.so_reuseport', 1),
('grpc.max_send_message_length', 256 * 1024 * 1024),
('grpc.max_receive_message_length', 256 * 1024 * 1024)]
server = grpc.server( server = grpc.server(
futures.ThreadPoolExecutor( futures.ThreadPoolExecutor(
max_workers=1, ), options=options) max_workers=1, ), options=options)
pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server( pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
PipelineServicer(response_op, dag_conf), server) PipelineServicer(response_op, dag_conf, worker_idx), server)
server.add_insecure_port(bind_address) server.add_insecure_port(bind_address)
server.start() server.start()
server.wait_for_termination() server.wait_for_termination()
...@@ -177,7 +182,7 @@ class ServerYamlConfChecker(object): ...@@ -177,7 +182,7 @@ class ServerYamlConfChecker(object):
@staticmethod @staticmethod
def check_tracer_conf(conf): def check_tracer_conf(conf):
default_conf = {"interval_s": 600, } default_conf = {"interval_s": -1, }
conf_type = {"interval_s": int, } conf_type = {"interval_s": int, }
......
...@@ -27,7 +27,8 @@ import time ...@@ -27,7 +27,8 @@ import time
import threading import threading
import multiprocessing import multiprocessing
_TRACER = logging.getLogger("pipeline.profiler") _LOGGER = logging.getLogger(__name__)
_LOGGER.propagate = False
class PerformanceTracer(object): class PerformanceTracer(object):
...@@ -67,26 +68,35 @@ class PerformanceTracer(object): ...@@ -67,26 +68,35 @@ class PerformanceTracer(object):
self._channels = channels self._channels = channels
def _trace_func(self, channels): def _trace_func(self, channels):
actions = ["in", "prep", "midp", "postp", "out"] all_actions = ["in", "prep", "midp", "postp", "out"]
calcu_actions = ["prep", "midp", "postp"] calcu_actions = ["prep", "midp", "postp"]
while True: while True:
op_cost = {} op_cost = {}
err_request = []
err_count = 0 err_count = 0
_TRACER.info("==================== TRACER ======================") _LOGGER.info("==================== TRACER ======================")
# op # op
while True: while True:
try: try:
name, action, stage, cost = self._data_buffer.get_nowait()
if stage == False:
# only for name == DAG
assert name == "DAG"
err_count += 1
if name not in op_cost:
op_cost[name] = {}
if action not in op_cost[name]:
op_cost[name][action] = []
op_cost[name][action].append(cost)
item = self._data_buffer.get_nowait()
name = item["name"]
actions = item["actions"]

if name == "DAG":
succ = item["succ"]
req_id = item["id"]
if not succ:
err_count += 1
err_request.append(req_id)

if name not in op_cost:
op_cost[name] = {}

for action, cost in actions.items():
if action not in op_cost[name]:
op_cost[name][action] = []
op_cost[name][action].append(cost)
except Queue.Empty: except Queue.Empty:
break break
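To make the aggregation concrete, the same bookkeeping in compressed form on two invented trace items (the op name "uci" and all timings are illustrative):

# Two illustrative trace items in the dict format produced by Op/DAGExecutor above.
items = [
    {"name": "DAG", "id": 42, "succ": False,
     "actions": {"call_42": 1800}},
    {"name": "uci",
     "actions": {"in": 5, "prep": 30, "midp": 1500, "postp": 20, "out": 4}},
]

op_cost, err_request, err_count = {}, [], 0
for item in items:
    name, actions = item["name"], item["actions"]
    if name == "DAG" and not item["succ"]:
        err_count += 1
        err_request.append(item["id"])
    cost_map = op_cost.setdefault(name, {})
    for action, cost in actions.items():
        cost_map.setdefault(action, []).append(cost)

print(err_request)             # [42]
print(op_cost["uci"]["midp"])  # [1500]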
...@@ -98,15 +108,15 @@ class PerformanceTracer(object): ...@@ -98,15 +108,15 @@ class PerformanceTracer(object):
tot_cost += op_cost[name][action] tot_cost += op_cost[name][action]
if name != "DAG": if name != "DAG":
_TRACER.info("Op({}):".format(name)) _LOGGER.info("Op({}):".format(name))
for action in actions: for action in all_actions:
if action in op_cost[name]: if action in op_cost[name]:
_TRACER.info("\t{}[{} ms]".format( _LOGGER.info("\t{}[{} ms]".format(
action, op_cost[name][action])) action, op_cost[name][action]))
for action in calcu_actions: for action in calcu_actions:
if action in op_cost[name]: if action in op_cost[name]:
calcu_cost += op_cost[name][action] calcu_cost += op_cost[name][action]
_TRACER.info("\tidle[{}]".format(1 - 1.0 * calcu_cost / _LOGGER.info("\tidle[{}]".format(1 - 1.0 * calcu_cost /
tot_cost)) tot_cost))
if "DAG" in op_cost: if "DAG" in op_cost:
...@@ -116,21 +126,22 @@ class PerformanceTracer(object): ...@@ -116,21 +126,22 @@ class PerformanceTracer(object):
qps = 1.0 * tot / self._interval_s qps = 1.0 * tot / self._interval_s
ave_cost = sum(calls) / tot ave_cost = sum(calls) / tot
latencys = [50, 60, 70, 80, 90, 95, 99] latencys = [50, 60, 70, 80, 90, 95, 99]
_TRACER.info("DAGExecutor:") _LOGGER.info("DAGExecutor:")
_TRACER.info("\tquery count[{}]".format(tot)) _LOGGER.info("\tQuery count[{}]".format(tot))
_TRACER.info("\tqps[{} q/s]".format(qps)) _LOGGER.info("\tQPS[{} q/s]".format(qps))
_TRACER.info("\tsucc[{}]".format(1 - 1.0 * err_count / tot)) _LOGGER.info("\tSucc[{}]".format(1 - 1.0 * err_count / tot))
_TRACER.info("\tlatency:") _LOGGER.info("\tError req[{}]".format(", ".join([str(x) for x in err_request)]))
_TRACER.info("\t\tave[{} ms]".format(ave_cost)) _LOGGER.info("\tLatency:")
_LOGGER.info("\t\tave[{} ms]".format(ave_cost))
for latency in latencys: for latency in latencys:
_TRACER.info("\t\t.{}[{} ms]".format(latency, calls[int( _LOGGER.info("\t\t.{}[{} ms]".format(latency, calls[int(
tot * latency / 100.0)])) tot * latency / 100.0)]))
# channel # channel
_TRACER.info("Channel (server worker num[{}]):".format( _LOGGER.info("Channel (server worker num[{}]):".format(
self._server_worker_num)) self._server_worker_num))
for channel in channels: for channel in channels:
_TRACER.info("\t{}(In: {}, Out: {}) size[{}/{}]".format( _LOGGER.info("\t{}(In: {}, Out: {}) size[{}/{}]".format(
channel.name, channel.name,
channel.get_producers(), channel.get_producers(),
channel.get_consumers(), channel.get_consumers(),
......
...@@ -13,13 +13,101 @@ ...@@ -13,13 +13,101 @@
# limitations under the License. # limitations under the License.
import sys import sys
import logging
import threading
import multiprocessing
import multiprocessing.managers
if sys.version_info.major == 2:
import Queue
from Queue import PriorityQueue
elif sys.version_info.major == 3:
import queue as Queue
from queue import PriorityQueue
else:
raise Exception("Error Python version")
_LOGGER = logging.getLogger(__name__)
class NameGenerator(object): class NameGenerator(object):
# use unsafe-id-generator
def __init__(self, prefix): def __init__(self, prefix):
self._idx = -1 self._idx = -1
self._prefix = prefix self._prefix = prefix
self._id_generator = UnsafeIdGenerator(1000000000000000000)
def next(self):
next_id = self._id_generator.next()
return "{}{}".format(self._prefix, next_id)
class UnsafeIdGenerator(object):
def __init__(self, max_id, base_counter=0, step=1):
self._base_counter = base_counter
self._counter = self._base_counter
self._step = step
self._max_id = max_id # for reset
def next(self):
if self._counter >= self._max_id:
self._counter = self._base_counter
_LOGGER.info("Reset Id: {}".format(self._counter))
next_id = self._counter
self._counter += self._step
return next_id
class ThreadIdGenerator(UnsafeIdGenerator):
def __init__(self, max_id, base_counter=0, step=1, lock=None):
# if you want to use your lock, you may need to use Reentrant-Lock
self._lock = lock
if self._lock is None:
self._lock = threading.Lock()
super(ThreadIdGenerator, self).__init__(max_id, base_counter, step)
def next(self): def next(self):
self._idx += 1
return "{}{}".format(self._prefix, self._idx)
next_id = None
with self._lock:
if self._counter >= self._max_id:
self._counter = self._base_counter
_LOGGER.info("Reset Id: {}".format(self._counter))
next_id = self._counter
self._counter += self._step
return next_id
class ProcessIdGenerator(UnsafeIdGenerator):
def __init__(self, max_id, base_counter=0, step=1, lock=None):
# if you want to use your lock, you may need to use Reentrant-Lock
self._lock = lock
if self._lock is None:
self._lock = multiprocessing.Lock()
self._base_counter = base_counter
self._counter = multiprocessing.Manager().Value('i', 0)
self._step = step
self._max_id = max_id
def next(self):
next_id = None
with self._lock:
if self._counter.value >= self._max_id:
self._counter.value = self._base_counter
_LOGGER.info("Reset Id: {}".format(self._counter.value))
next_id = self._counter.value
self._counter.value += self._step
return next_id
def PipelineProcSyncManager():
"""
add PriorityQueue into SyncManager, see more:
https://stackoverflow.com/questions/25324560/strange-queue-priorityqueue-behaviour-with-multiprocessing-in-python-2-7-6?answertab=active#tab-top
"""
class PipelineManager(multiprocessing.managers.SyncManager):
pass
PipelineManager.register("PriorityQueue", PriorityQueue)
m = PipelineManager()
m.start()
return m
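Usage sketch for the helper above (purely illustrative): the manager proxies a PriorityQueue so multiple processes can share ordered items, which a plain multiprocessing.Queue does not provide.

# Sketch: sharing a PriorityQueue between processes via the custom manager.
if __name__ == "__main__":
    m = PipelineProcSyncManager()
    pq = m.PriorityQueue()
    pq.put((2, "low priority"))
    pq.put((1, "high priority"))
    print(pq.get())  # (1, 'high priority')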