提交 05e74bdd 编写于 作者: B barrierye

Merge branch 'pipeline-auto-batch' of https://github.com/barrierye/Serving into pipeline-auto-batch

......@@ -37,6 +37,7 @@ message InferenceRequest {
repeated string feed_var_names = 2;
repeated string fetch_var_names = 3;
required bool is_python = 4 [ default = false ];
required uint64 log_id = 5 [ default = 0 ];
};
message InferenceResponse {
......
......@@ -227,7 +227,8 @@ class PredictorClient {
const std::vector<std::vector<int>>& int_shape,
const std::vector<std::string>& fetch_name,
PredictorRes& predict_res_batch, // NOLINT
const int& pid);
const int& pid,
const uint64_t log_id);
int numpy_predict(
const std::vector<std::vector<py::array_t<float>>>& float_feed_batch,
......@@ -238,7 +239,8 @@ class PredictorClient {
const std::vector<std::vector<int>>& int_shape,
const std::vector<std::string>& fetch_name,
PredictorRes& predict_res_batch, // NOLINT
const int& pid);
const int& pid,
const uint64_t log_id);
private:
PredictorApi _api;
......
......@@ -144,7 +144,8 @@ int PredictorClient::batch_predict(
const std::vector<std::vector<int>> &int_shape,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res_batch,
const int &pid) {
const int &pid,
const uint64_t log_id) {
int batch_size = std::max(float_feed_batch.size(), int_feed_batch.size());
predict_res_batch.clear();
......@@ -162,6 +163,7 @@ int PredictorClient::batch_predict(
VLOG(2) << "int feed name size: " << int_feed_name.size();
VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
Request req;
req.set_log_id(log_id);
for (auto &name : fetch_name) {
req.add_fetch_var_names(name);
}
......@@ -356,7 +358,8 @@ int PredictorClient::numpy_predict(
const std::vector<std::vector<int>> &int_shape,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res_batch,
const int &pid) {
const int &pid,
const uint64_t log_id) {
int batch_size = std::max(float_feed_batch.size(), int_feed_batch.size());
VLOG(2) << "batch size: " << batch_size;
predict_res_batch.clear();
......@@ -374,6 +377,7 @@ int PredictorClient::numpy_predict(
VLOG(2) << "int feed name size: " << int_feed_name.size();
VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
Request req;
req.set_log_id(log_id);
for (auto &name : fetch_name) {
req.add_fetch_var_names(name);
}
......
......@@ -107,7 +107,8 @@ PYBIND11_MODULE(serving_client, m) {
const std::vector<std::vector<int>> &int_shape,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res_batch,
const int &pid) {
const int &pid,
const uint64_t log_id) {
return self.batch_predict(float_feed_batch,
float_feed_name,
float_shape,
......@@ -116,7 +117,8 @@ PYBIND11_MODULE(serving_client, m) {
int_shape,
fetch_name,
predict_res_batch,
pid);
pid,
log_id);
},
py::call_guard<py::gil_scoped_release>())
.def("numpy_predict",
......@@ -131,7 +133,8 @@ PYBIND11_MODULE(serving_client, m) {
const std::vector<std::vector<int>> &int_shape,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res_batch,
const int &pid) {
const int &pid,
const uint64_t log_id) {
return self.numpy_predict(float_feed_batch,
float_feed_name,
float_shape,
......@@ -140,7 +143,8 @@ PYBIND11_MODULE(serving_client, m) {
int_shape,
fetch_name,
predict_res_batch,
pid);
pid,
log_id);
},
py::call_guard<py::gil_scoped_release>());
}
......
......@@ -35,6 +35,7 @@ struct GeneralBlob {
std::vector<paddle::PaddleTensor> tensor_vector;
int64_t time_stamp[20];
int p_size = 0;
uint64_t _log_id = -1; // for logging
int _batch_size;
......@@ -46,9 +47,11 @@ struct GeneralBlob {
tensor_vector.clear();
}
int SetBatchSize(int batch_size) { _batch_size = batch_size; }
void SetBatchSize(int batch_size) { _batch_size = batch_size; }
void SetLogId(uint64_t log_id) { _log_id = log_id; }
int GetBatchSize() const { return _batch_size; }
uint64_t GetLogId() const { return _log_id; }
std::string ShortDebugString() const { return "Not implemented!"; }
};
......
......@@ -47,22 +47,25 @@ int GeneralInferOp::inference() {
const std::string pre_name = pre_node_names[0];
const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name);
VLOG(2) << "Get precedent op name: " << pre_name;
uint64_t log_id = input_blob->GetLogId();
VLOG(2) << "(logid=" << log_id << ") Get precedent op name: " << pre_name;
GeneralBlob *output_blob = mutable_data<GeneralBlob>();
output_blob->SetLogId(log_id);
if (!input_blob) {
LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name;
LOG(ERROR) << "(logid=" << log_id
<< ") Failed mutable depended argument, op:" << pre_name;
return -1;
}
const TensorVector *in = &input_blob->tensor_vector;
TensorVector *out = &output_blob->tensor_vector;
int batch_size = input_blob->GetBatchSize();
VLOG(2) << "input batch size: " << batch_size;
VLOG(2) << "(logid=" << log_id << ") input batch size: " << batch_size;
output_blob->SetBatchSize(batch_size);
VLOG(2) << "infer batch size: " << batch_size;
VLOG(2) << "(logid=" << log_id << ") infer batch size: " << batch_size;
Timer timeline;
int64_t start = timeline.TimeStampUS();
......@@ -70,7 +73,8 @@ int GeneralInferOp::inference() {
if (InferManager::instance().infer(
engine_name().c_str(), in, out, batch_size)) {
LOG(ERROR) << "Failed do infer in fluid model: " << engine_name().c_str();
LOG(ERROR) << "(logid=" << log_id
<< ") Failed do infer in fluid model: " << engine_name().c_str();
return -1;
}
......
......@@ -72,6 +72,7 @@ int conf_check(const Request *req,
int GeneralReaderOp::inference() {
// reade request from client
const Request *req = dynamic_cast<const Request *>(get_request_message());
uint64_t log_id = req->log_id();
int batch_size = req->insts_size();
int input_var_num = 0;
......@@ -83,25 +84,28 @@ int GeneralReaderOp::inference() {
TensorVector *out = &res->tensor_vector;
res->SetBatchSize(batch_size);
res->SetLogId(log_id);
if (!res) {
LOG(ERROR) << "Failed get op tls reader object output";
LOG(ERROR) << "(logid=" << log_id
<< ") Failed get op tls reader object output";
}
Timer timeline;
int64_t start = timeline.TimeStampUS();
int var_num = req->insts(0).tensor_array_size();
VLOG(2) << "var num: " << var_num;
VLOG(2) << "(logid=" << log_id << ") var num: " << var_num;
VLOG(2) << "start to call load general model_conf op";
VLOG(2) << "(logid=" << log_id
<< ") start to call load general model_conf op";
baidu::paddle_serving::predictor::Resource &resource =
baidu::paddle_serving::predictor::Resource::instance();
VLOG(2) << "get resource pointer done.";
VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config();
VLOG(2) << "print general model config done.";
VLOG(2) << "(logid=" << log_id << ") print general model config done.";
// TODO(guru4elephant): how to do conditional check?
/*
......@@ -122,7 +126,8 @@ int GeneralReaderOp::inference() {
for (int i = 0; i < var_num; ++i) {
paddle::PaddleTensor lod_tensor;
elem_type[i] = req->insts(0).tensor_array(i).elem_type();
VLOG(2) << "var[" << i << "] has elem type: " << elem_type[i];
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] has elem type: " << elem_type[i];
if (elem_type[i] == 0) { // int64
elem_size[i] = sizeof(int64_t);
lod_tensor.dtype = paddle::PaddleDType::INT64;
......@@ -137,17 +142,19 @@ int GeneralReaderOp::inference() {
if (model_config->_is_lod_feed[i]) {
lod_tensor.lod.resize(1);
lod_tensor.lod[0].push_back(0);
VLOG(2) << "var[" << i << "] is lod_tensor";
VLOG(2) << "(logid=" << log_id << ") var[" << i << "] is lod_tensor";
} else {
lod_tensor.shape.push_back(batch_size);
capacity[i] = 1;
for (int k = 0; k < req->insts(0).tensor_array(i).shape_size(); ++k) {
int dim = req->insts(0).tensor_array(i).shape(k);
VLOG(2) << "shape for var[" << i << "]: " << dim;
VLOG(2) << "(logid=" << log_id << ") shape for var[" << i
<< "]: " << dim;
capacity[i] *= dim;
lod_tensor.shape.push_back(dim);
}
VLOG(2) << "var[" << i << "] is tensor, capacity: " << capacity[i];
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is tensor, capacity: " << capacity[i];
}
lod_tensor.name = model_config->_feed_name[i];
out->push_back(lod_tensor);
......@@ -167,11 +174,12 @@ int GeneralReaderOp::inference() {
} else if (tensor.int_data_size() > 0) {
data_len = tensor.int_data_size();
}
VLOG(2) << "tensor size for var[" << i << "]: " << data_len;
VLOG(2) << "(logid=" << log_id << ") tensor size for var[" << i
<< "]: " << data_len;
tensor_size += data_len;
int cur_len = out->at(i).lod[0].back();
VLOG(2) << "current len: " << cur_len;
VLOG(2) << "(logid=" << log_id << ") current len: " << cur_len;
int sample_len = 0;
if (tensor.shape_size() == 1) {
......@@ -180,7 +188,7 @@ int GeneralReaderOp::inference() {
sample_len = tensor.shape(0);
}
out->at(i).lod[0].push_back(cur_len + sample_len);
VLOG(2) << "new len: " << cur_len + sample_len;
VLOG(2) << "(logid=" << log_id << ") new len: " << cur_len + sample_len;
}
out->at(i).data.Resize(tensor_size * elem_size[i]);
out->at(i).shape = {out->at(i).lod[0].back()};
......@@ -190,11 +198,11 @@ int GeneralReaderOp::inference() {
if (out->at(i).shape.size() == 1) {
out->at(i).shape.push_back(1);
}
VLOG(2) << "var[" << i
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is lod_tensor and len=" << out->at(i).lod[0].back();
} else {
out->at(i).data.Resize(batch_size * capacity[i] * elem_size[i]);
VLOG(2) << "var[" << i
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is tensor and capacity=" << batch_size * capacity[i];
}
}
......@@ -203,8 +211,8 @@ int GeneralReaderOp::inference() {
for (int i = 0; i < var_num; ++i) {
if (elem_type[i] == 0) {
int64_t *dst_ptr = static_cast<int64_t *>(out->at(i).data.data());
VLOG(2) << "first element data in var[" << i << "] is "
<< req->insts(0).tensor_array(i).int64_data(0);
VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
<< "] is " << req->insts(0).tensor_array(i).int64_data(0);
int offset = 0;
for (int j = 0; j < batch_size; ++j) {
int elem_num = req->insts(j).tensor_array(i).int64_data_size();
......@@ -219,8 +227,8 @@ int GeneralReaderOp::inference() {
}
} else if (elem_type[i] == 1) {
float *dst_ptr = static_cast<float *>(out->at(i).data.data());
VLOG(2) << "first element data in var[" << i << "] is "
<< req->insts(0).tensor_array(i).float_data(0);
VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
<< "] is " << req->insts(0).tensor_array(i).float_data(0);
int offset = 0;
for (int j = 0; j < batch_size; ++j) {
int elem_num = req->insts(j).tensor_array(i).float_data_size();
......@@ -235,8 +243,8 @@ int GeneralReaderOp::inference() {
}
} else if (elem_type[i] == 2) {
int32_t *dst_ptr = static_cast<int32_t *>(out->at(i).data.data());
VLOG(2) << "first element data in var[" << i << "] is "
<< req->insts(0).tensor_array(i).int_data(0);
VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
<< "] is " << req->insts(0).tensor_array(i).int_data(0);
int offset = 0;
for (int j = 0; j < batch_size; ++j) {
int elem_num = req->insts(j).tensor_array(i).int_data_size();
......@@ -252,7 +260,7 @@ int GeneralReaderOp::inference() {
}
}
VLOG(2) << "output size: " << out->size();
VLOG(2) << "(logid=" << log_id << ") output size: " << out->size();
timeline.Pause();
int64_t end = timeline.TimeStampUS();
......@@ -260,7 +268,7 @@ int GeneralReaderOp::inference() {
AddBlobInfo(res, start);
AddBlobInfo(res, end);
VLOG(2) << "read data from client success";
VLOG(2) << "(logid=" << log_id << ") read data from client success";
return 0;
}
DEFINE_OP(GeneralReaderOp);
......
......@@ -75,10 +75,12 @@ int GeneralResponseOp::inference() {
VLOG(2) << "pre names[" << pi << "]: " << pre_name << " ("
<< pre_node_names.size() << ")";
input_blob = get_depend_argument<GeneralBlob>(pre_name);
uint64_t curr_logid = input_blob->GetLogId();
// fprintf(stderr, "input(%s) blob address %x\n", pre_names.c_str(),
// input_blob);
if (!input_blob) {
LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name;
LOG(ERROR) << "(logid=" << curr_logid
<< ") Failed mutable depended argument, op: " << pre_name;
return -1;
}
......@@ -92,17 +94,19 @@ int GeneralResponseOp::inference() {
for (auto &idx : fetch_index) {
Tensor *tensor = fetch_inst->add_tensor_array();
if (model_config->_is_lod_fetch[idx]) {
VLOG(2) << "out[" << idx << "] " << model_config->_fetch_name[idx]
<< " is lod_tensor";
VLOG(2) << "(logid=" << curr_logid << ") out[" << idx << "] "
<< model_config->_fetch_name[idx] << " is lod_tensor";
for (int k = 0; k < in->at(idx).shape.size(); ++k) {
VLOG(2) << "shape[" << k << "]: " << in->at(idx).shape[k];
VLOG(2) << "(logid=" << curr_logid << ") shape[" << k
<< "]: " << in->at(idx).shape[k];
tensor->add_shape(in->at(idx).shape[k]);
}
} else {
VLOG(2) << "out[" << idx << "] " << model_config->_fetch_name[idx]
<< " is tensor";
VLOG(2) << "(logid=" << curr_logid << ") out[" << idx << "] "
<< model_config->_fetch_name[idx] << " is tensor";
for (int k = 0; k < in->at(idx).shape.size(); ++k) {
VLOG(2) << "shape[" << k << "]: " << in->at(idx).shape[k];
VLOG(2) << "(logid=" << curr_logid << ") shape[" << k
<< "]: " << in->at(idx).shape[k];
tensor->add_shape(in->at(idx).shape[k]);
}
}
......@@ -119,8 +123,8 @@ int GeneralResponseOp::inference() {
auto dtype = in->at(idx).dtype;
if (dtype == paddle::PaddleDType::INT64) {
VLOG(2) << "Prepare int64 var [" << model_config->_fetch_name[idx]
<< "].";
VLOG(2) << "(logid=" << curr_logid << ") Prepare int64 var ["
<< model_config->_fetch_name[idx] << "].";
int64_t *data_ptr = static_cast<int64_t *>(in->at(idx).data.data());
// from
// https://stackoverflow.com/questions/15499641/copy-a-stdvector-to-a-repeated-field-from-protobuf-with-memcpy
......@@ -130,16 +134,16 @@ int GeneralResponseOp::inference() {
fetch_p->mutable_tensor_array(var_idx)->mutable_int64_data()->Swap(
&tmp_data);
} else if (dtype == paddle::PaddleDType::FLOAT32) {
VLOG(2) << "Prepare float var [" << model_config->_fetch_name[idx]
<< "].";
VLOG(2) << "(logid=" << curr_logid << ") Prepare float var ["
<< model_config->_fetch_name[idx] << "].";
float *data_ptr = static_cast<float *>(in->at(idx).data.data());
google::protobuf::RepeatedField<float> tmp_data(data_ptr,
data_ptr + cap);
fetch_p->mutable_tensor_array(var_idx)->mutable_float_data()->Swap(
&tmp_data);
} else if (dtype == paddle::PaddleDType::INT32) {
VLOG(2) << "Prepare int32 var [" << model_config->_fetch_name[idx]
<< "].";
VLOG(2) << "(logid=" << curr_logid << ")Prepare int32 var ["
<< model_config->_fetch_name[idx] << "].";
int32_t *data_ptr = static_cast<int32_t *>(in->at(idx).data.data());
google::protobuf::RepeatedField<int32_t> tmp_data(data_ptr,
data_ptr + cap);
......@@ -154,7 +158,8 @@ int GeneralResponseOp::inference() {
}
}
VLOG(2) << "fetch var [" << model_config->_fetch_name[idx] << "] ready";
VLOG(2) << "(logid=" << curr_logid << ") fetch var ["
<< model_config->_fetch_name[idx] << "] ready";
var_idx++;
}
}
......@@ -167,7 +172,9 @@ int GeneralResponseOp::inference() {
// a more elegant way.
for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
input_blob = get_depend_argument<GeneralBlob>(pre_node_names[pi]);
VLOG(2) << "p size for input blob: " << input_blob->p_size;
uint64_t curr_logid = input_blob->GetLogId();
VLOG(2) << "(logid=" << curr_logid
<< ") p size for input blob: " << input_blob->p_size;
int profile_time_idx = -1;
if (pi == 0) {
profile_time_idx = 0;
......
......@@ -37,6 +37,7 @@ message Request {
repeated FeedInst insts = 1;
repeated string fetch_var_names = 2;
optional bool profile_server = 3 [ default = false ];
required uint64 log_id = 4 [ default = 0 ];
};
message Response {
......
......@@ -21,6 +21,7 @@ option cc_generic_services = true;
message RequestAndResponse {
required int32 a = 1;
required float b = 2;
required uint64 log_id = 3 [ default = 0 ];
};
service LoadGeneralModelService {
......
......@@ -280,6 +280,7 @@ class PdsCodeGenerator : public CodeGenerator {
" baidu::rpc::ClosureGuard done_guard(done);\n"
" baidu::rpc::Controller* cntl = \n"
" static_cast<baidu::rpc::Controller*>(cntl_base);\n"
" cntl->set_log_id(request->log_id());\n"
" ::baidu::paddle_serving::predictor::InferService* svr = \n"
" "
"::baidu::paddle_serving::predictor::InferServiceManager::instance("
......@@ -317,6 +318,7 @@ class PdsCodeGenerator : public CodeGenerator {
" baidu::rpc::ClosureGuard done_guard(done);\n"
" baidu::rpc::Controller* cntl = \n"
" static_cast<baidu::rpc::Controller*>(cntl_base);\n"
" cntl->set_log_id(request->log_id());\n"
" ::baidu::paddle_serving::predictor::InferService* svr = \n"
" "
"::baidu::paddle_serving::predictor::InferServiceManager::instance("
......@@ -1011,6 +1013,7 @@ class PdsCodeGenerator : public CodeGenerator {
" brpc::ClosureGuard done_guard(done);\n"
" brpc::Controller* cntl = \n"
" static_cast<brpc::Controller*>(cntl_base);\n"
" cntl->set_log_id(request->log_id());\n"
" ::baidu::paddle_serving::predictor::InferService* svr = \n"
" "
"::baidu::paddle_serving::predictor::InferServiceManager::instance("
......@@ -1048,6 +1051,7 @@ class PdsCodeGenerator : public CodeGenerator {
" brpc::ClosureGuard done_guard(done);\n"
" brpc::Controller* cntl = \n"
" static_cast<brpc::Controller*>(cntl_base);\n"
" cntl->set_log_id(request->log_id());\n"
" ::baidu::paddle_serving::predictor::InferService* svr = \n"
" "
"::baidu::paddle_serving::predictor::InferServiceManager::instance("
......
......@@ -37,6 +37,7 @@ message Request {
repeated FeedInst insts = 1;
repeated string fetch_var_names = 2;
optional bool profile_server = 3 [ default = false ];
required uint64 log_id = 4 [ default = 0 ];
};
message Response {
......
......@@ -192,14 +192,16 @@ public class Client {
private InferenceRequest _packInferenceRequest(
List<HashMap<String, INDArray>> feed_batch,
Iterable<String> fetch) throws IllegalArgumentException {
Iterable<String> fetch,
long log_id) throws IllegalArgumentException {
List<String> feed_var_names = new ArrayList<String>();
feed_var_names.addAll(feed_batch.get(0).keySet());
InferenceRequest.Builder req_builder = InferenceRequest.newBuilder()
.addAllFeedVarNames(feed_var_names)
.addAllFetchVarNames(fetch)
.setIsPython(false);
.setIsPython(false)
.setLogId(log_id);
for (HashMap<String, INDArray> feed_data: feed_batch) {
FeedInst.Builder inst_builder = FeedInst.newBuilder();
for (String name: feed_var_names) {
......@@ -332,76 +334,151 @@ public class Client {
public Map<String, INDArray> predict(
HashMap<String, INDArray> feed,
Iterable<String> fetch) {
return predict(feed, fetch, false);
return predict(feed, fetch, false, 0);
}
public Map<String, INDArray> predict(
HashMap<String, INDArray> feed,
Iterable<String> fetch,
long log_id) {
return predict(feed, fetch, false, log_id);
}
public Map<String, HashMap<String, INDArray>> ensemble_predict(
HashMap<String, INDArray> feed,
Iterable<String> fetch) {
return ensemble_predict(feed, fetch, false);
return ensemble_predict(feed, fetch, false, 0);
}
public Map<String, HashMap<String, INDArray>> ensemble_predict(
HashMap<String, INDArray> feed,
Iterable<String> fetch,
long log_id) {
return ensemble_predict(feed, fetch, false, log_id);
}
public PredictFuture asyn_predict(
HashMap<String, INDArray> feed,
Iterable<String> fetch) {
return asyn_predict(feed, fetch, false);
return asyn_predict(feed, fetch, false, 0);
}
public PredictFuture asyn_predict(
HashMap<String, INDArray> feed,
Iterable<String> fetch,
long log_id) {
return asyn_predict(feed, fetch, false, log_id);
}
public Map<String, INDArray> predict(
HashMap<String, INDArray> feed,
Iterable<String> fetch,
Boolean need_variant_tag) {
return predict(feed, fetch, need_variant_tag, 0);
}
public Map<String, INDArray> predict(
HashMap<String, INDArray> feed,
Iterable<String> fetch,
Boolean need_variant_tag,
long log_id) {
List<HashMap<String, INDArray>> feed_batch
= new ArrayList<HashMap<String, INDArray>>();
feed_batch.add(feed);
return predict(feed_batch, fetch, need_variant_tag);
return predict(feed_batch, fetch, need_variant_tag, log_id);
}
public Map<String, HashMap<String, INDArray>> ensemble_predict(
HashMap<String, INDArray> feed,
Iterable<String> fetch,
Boolean need_variant_tag) {
return ensemble_predict(feed, fetch, need_variant_tag, 0);
}
public Map<String, HashMap<String, INDArray>> ensemble_predict(
HashMap<String, INDArray> feed,
Iterable<String> fetch,
Boolean need_variant_tag,
long log_id) {
List<HashMap<String, INDArray>> feed_batch
= new ArrayList<HashMap<String, INDArray>>();
feed_batch.add(feed);
return ensemble_predict(feed_batch, fetch, need_variant_tag);
return ensemble_predict(feed_batch, fetch, need_variant_tag, log_id);
}
public PredictFuture asyn_predict(
HashMap<String, INDArray> feed,
Iterable<String> fetch,
Boolean need_variant_tag) {
return asyn_predict(feed, fetch, need_variant_tag, 0);
}
public PredictFuture asyn_predict(
HashMap<String, INDArray> feed,
Iterable<String> fetch,
Boolean need_variant_tag,
long log_id) {
List<HashMap<String, INDArray>> feed_batch
= new ArrayList<HashMap<String, INDArray>>();
feed_batch.add(feed);
return asyn_predict(feed_batch, fetch, need_variant_tag);
return asyn_predict(feed_batch, fetch, need_variant_tag, log_id);
}
public Map<String, INDArray> predict(
List<HashMap<String, INDArray>> feed_batch,
Iterable<String> fetch) {
return predict(feed_batch, fetch, false);
return predict(feed_batch, fetch, false, 0);
}
public Map<String, INDArray> predict(
List<HashMap<String, INDArray>> feed_batch,
Iterable<String> fetch,
long log_id) {
return predict(feed_batch, fetch, false, log_id);
}
public Map<String, HashMap<String, INDArray>> ensemble_predict(
List<HashMap<String, INDArray>> feed_batch,
Iterable<String> fetch) {
return ensemble_predict(feed_batch, fetch, false);
return ensemble_predict(feed_batch, fetch, false, 0);
}
public Map<String, HashMap<String, INDArray>> ensemble_predict(
List<HashMap<String, INDArray>> feed_batch,
Iterable<String> fetch,
long log_id) {
return ensemble_predict(feed_batch, fetch, false, log_id);
}
public PredictFuture asyn_predict(
List<HashMap<String, INDArray>> feed_batch,
Iterable<String> fetch) {
return asyn_predict(feed_batch, fetch, false);
return asyn_predict(feed_batch, fetch, false, 0);
}
public PredictFuture asyn_predict(
List<HashMap<String, INDArray>> feed_batch,
Iterable<String> fetch,
long log_id) {
return asyn_predict(feed_batch, fetch, false, log_id);
}
public Map<String, INDArray> predict(
List<HashMap<String, INDArray>> feed_batch,
Iterable<String> fetch,
Boolean need_variant_tag) {
return predict(feed_batch, fetch, need_variant_tag, 0);
}
public Map<String, INDArray> predict(
List<HashMap<String, INDArray>> feed_batch,
Iterable<String> fetch,
Boolean need_variant_tag,
long log_id) {
try {
profiler_.record("java_prepro_0");
InferenceRequest req = _packInferenceRequest(feed_batch, fetch);
InferenceRequest req = _packInferenceRequest(
feed_batch, fetch, log_id);
profiler_.record("java_prepro_1");
profiler_.record("java_client_infer_0");
......@@ -415,7 +492,7 @@ public class Client {
= new ArrayList<Map.Entry<String, HashMap<String, INDArray>>>(
ensemble_result.entrySet());
if (list.size() != 1) {
System.out.format("predict failed: please use ensemble_predict impl.\n");
System.out.format("Failed to predict: please use ensemble_predict impl.\n");
return null;
}
profiler_.record("java_postpro_1");
......@@ -423,7 +500,7 @@ public class Client {
return list.get(0).getValue();
} catch (StatusRuntimeException e) {
System.out.format("predict failed: %s\n", e.toString());
System.out.format("Failed to predict: %s\n", e.toString());
return null;
}
}
......@@ -432,9 +509,18 @@ public class Client {
List<HashMap<String, INDArray>> feed_batch,
Iterable<String> fetch,
Boolean need_variant_tag) {
return ensemble_predict(feed_batch, fetch, need_variant_tag, 0);
}
public Map<String, HashMap<String, INDArray>> ensemble_predict(
List<HashMap<String, INDArray>> feed_batch,
Iterable<String> fetch,
Boolean need_variant_tag,
long log_id) {
try {
profiler_.record("java_prepro_0");
InferenceRequest req = _packInferenceRequest(feed_batch, fetch);
InferenceRequest req = _packInferenceRequest(
feed_batch, fetch, log_id);
profiler_.record("java_prepro_1");
profiler_.record("java_client_infer_0");
......@@ -449,7 +535,7 @@ public class Client {
return ensemble_result;
} catch (StatusRuntimeException e) {
System.out.format("predict failed: %s\n", e.toString());
System.out.format("Failed to predict: %s\n", e.toString());
return null;
}
}
......@@ -458,7 +544,16 @@ public class Client {
List<HashMap<String, INDArray>> feed_batch,
Iterable<String> fetch,
Boolean need_variant_tag) {
InferenceRequest req = _packInferenceRequest(feed_batch, fetch);
return asyn_predict(feed_batch, fetch, need_variant_tag, 0);
}
public PredictFuture asyn_predict(
List<HashMap<String, INDArray>> feed_batch,
Iterable<String> fetch,
Boolean need_variant_tag,
long log_id) {
InferenceRequest req = _packInferenceRequest(
feed_batch, fetch, log_id);
ListenableFuture<InferenceResponse> future = futureStub_.inference(req);
PredictFuture predict_future = new PredictFuture(future,
(InferenceResponse resp) -> {
......
......@@ -37,6 +37,7 @@ message InferenceRequest {
repeated string feed_var_names = 2;
repeated string fetch_var_names = 3;
required bool is_python = 4 [ default = false ];
required uint64 log_id = 5 [ default = 0 ];
};
message InferenceResponse {
......
......@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
import paddle_serving_server.pipeline as pipeline
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer
from paddle_serving_server.pipeline.proto import pipeline_service_pb2
......@@ -22,12 +21,12 @@ from paddle_serving_app.reader import IMDBDataset
import logging
_LOGGER = logging.getLogger()
console_handler = pipeline.logger.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(
user_handler = logging.StreamHandler()
user_handler.setLevel(logging.INFO)
user_handler.setFormatter(
logging.Formatter(
"%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s"))
_LOGGER.addHandler(console_handler)
_LOGGER.addHandler(user_handler)
class ImdbRequestOp(RequestOp):
......
......@@ -233,7 +233,7 @@ class Client(object):
# key))
pass
def predict(self, feed=None, fetch=None, need_variant_tag=False):
def predict(self, feed=None, fetch=None, need_variant_tag=False, log_id=0):
self.profile_.record('py_prepro_0')
if feed is None or fetch is None:
......@@ -319,12 +319,12 @@ class Client(object):
res = self.client_handle_.numpy_predict(
float_slot_batch, float_feed_names, float_shape, int_slot_batch,
int_feed_names, int_shape, fetch_names, result_batch_handle,
self.pid)
self.pid, log_id)
elif self.has_numpy_input == False:
res = self.client_handle_.batch_predict(
float_slot_batch, float_feed_names, float_shape, int_slot_batch,
int_feed_names, int_shape, fetch_names, result_batch_handle,
self.pid)
self.pid, log_id)
else:
raise ValueError(
"Please make sure the inputs are all in list type or all in numpy.array type"
......@@ -466,10 +466,11 @@ class MultiLangClient(object):
if var.is_lod_tensor:
self.lod_tensor_set_.add(var.alias_name)
def _pack_inference_request(self, feed, fetch, is_python):
def _pack_inference_request(self, feed, fetch, is_python, log_id):
req = multi_lang_general_model_service_pb2.InferenceRequest()
req.fetch_var_names.extend(fetch)
req.is_python = is_python
req.log_id = log_id
feed_batch = None
if isinstance(feed, dict):
feed_batch = [feed]
......@@ -602,12 +603,13 @@ class MultiLangClient(object):
fetch,
need_variant_tag=False,
asyn=False,
is_python=True):
is_python=True,
log_id=0):
if not asyn:
try:
self.profile_.record('py_prepro_0')
req = self._pack_inference_request(
feed, fetch, is_python=is_python)
feed, fetch, is_python=is_python, log_id=log_id)
self.profile_.record('py_prepro_1')
self.profile_.record('py_client_infer_0')
......@@ -626,7 +628,8 @@ class MultiLangClient(object):
except grpc.RpcError as e:
return {"serving_status_code": e.code()}
else:
req = self._pack_inference_request(feed, fetch, is_python=is_python)
req = self._pack_inference_request(
feed, fetch, is_python=is_python, log_id=log_id)
call_future = self.stub_.Inference.future(
req, timeout=self.rpc_timeout_s_)
return MultiLangPredictFuture(
......
......@@ -502,6 +502,7 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
feed_names = list(request.feed_var_names)
fetch_names = list(request.fetch_var_names)
is_python = request.is_python
log_id = request.log_id
feed_batch = []
for feed_inst in request.insts:
feed_dict = {}
......@@ -530,7 +531,7 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
data.shape = list(feed_inst.tensor_array[idx].shape)
feed_dict[name] = data
feed_batch.append(feed_dict)
return feed_batch, fetch_names, is_python
return feed_batch, fetch_names, is_python, log_id
def _pack_inference_response(self, ret, fetch_names, is_python):
resp = multi_lang_general_model_service_pb2.InferenceResponse()
......@@ -583,10 +584,13 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
return resp
def Inference(self, request, context):
feed_dict, fetch_names, is_python = self._unpack_inference_request(
request)
feed_dict, fetch_names, is_python, log_id = \
self._unpack_inference_request(request)
ret = self.bclient_.predict(
feed=feed_dict, fetch=fetch_names, need_variant_tag=True)
feed=feed_dict,
fetch=fetch_names,
need_variant_tag=True,
log_id=log_id)
return self._pack_inference_response(ret, fetch_names, is_python)
def GetClientConfig(self, request, context):
......
......@@ -552,6 +552,7 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
feed_names = list(request.feed_var_names)
fetch_names = list(request.fetch_var_names)
is_python = request.is_python
log_id = request.log_id
feed_batch = []
for feed_inst in request.insts:
feed_dict = {}
......@@ -580,7 +581,7 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
data.shape = list(feed_inst.tensor_array[idx].shape)
feed_dict[name] = data
feed_batch.append(feed_dict)
return feed_batch, fetch_names, is_python
return feed_batch, fetch_names, is_python, log_id
def _pack_inference_response(self, ret, fetch_names, is_python):
resp = multi_lang_general_model_service_pb2.InferenceResponse()
......@@ -633,10 +634,13 @@ class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc.
return resp
def Inference(self, request, context):
feed_dict, fetch_names, is_python = self._unpack_inference_request(
request)
feed_dict, fetch_names, is_python, log_id \
= self._unpack_inference_request(request)
ret = self.bclient_.predict(
feed=feed_dict, fetch=fetch_names, need_variant_tag=True)
feed=feed_dict,
fetch=fetch_names,
need_variant_tag=True,
log_id=log_id)
return self._pack_inference_response(ret, fetch_names, is_python)
def GetClientConfig(self, request, context):
......
......@@ -16,7 +16,3 @@ from operator import Op, RequestOp, ResponseOp
from pipeline_server import PipelineServer
from pipeline_client import PipelineClient
from analyse import Analyst
from operator import Op, RequestOp, ResponseOp
from pipeline_server import PipelineServer
from pipeline_client import PipelineClient
from analyse import Analyst
......@@ -17,7 +17,7 @@ import copy
import re
import logging
_LOGGER = logging.getLogger("pipeline.analyse")
_LOGGER = logging.getLogger(__name__)
class Analyst(object):
......
......@@ -29,7 +29,7 @@ import enum
import os
import copy
_LOGGER = logging.getLogger("pipeline.channel")
_LOGGER = logging.getLogger(__name__)
class ChannelDataEcode(enum.Enum):
......@@ -181,6 +181,14 @@ class ChannelData(object):
os._exit(-1)
return feed
def __cmp__(self, other):
if self.id < other.id:
return -1
elif self.id == other.id:
return 0
else:
return 1
def __str__(self):
return "type[{}], ecode[{}], id[{}]".format(
ChannelDataType(self.datatype).name, self.ecode, self.id)
......@@ -222,7 +230,7 @@ class ProcessChannel(object):
# see more:
# - https://bugs.python.org/issue18277
# - https://hg.python.org/cpython/rev/860fc6a2bd21
self._que = manager.Queue(maxsize=maxsize)
self._que = manager.PriorityQueue(maxsize=maxsize)
self._maxsize = maxsize
self.name = name
self._stop = manager.Value('i', 0)
......@@ -489,7 +497,7 @@ class ProcessChannel(object):
self._cv.notify_all()
class ThreadChannel(Queue.Queue):
class ThreadChannel(Queue.PriorityQueue):
"""
(Thread version)The channel used for communication between Ops.
......
......@@ -24,19 +24,20 @@ else:
raise Exception("Error Python version")
import os
import logging
import collections
from .operator import Op, RequestOp, ResponseOp, VirtualOp
from .channel import (ThreadChannel, ProcessChannel, ChannelData,
ChannelDataEcode, ChannelDataType, ChannelStopError)
from .profiler import TimeProfiler, PerformanceTracer
from .util import NameGenerator
from .util import NameGenerator, ThreadIdGenerator, PipelineProcSyncManager
from .proto import pipeline_service_pb2
_LOGGER = logging.getLogger("pipeline.dag")
_LOGGER = logging.getLogger(__name__)
class DAGExecutor(object):
def __init__(self, response_op, server_conf):
def __init__(self, response_op, server_conf, worker_idx):
build_dag_each_worker = server_conf["build_dag_each_worker"]
server_worker_num = server_conf["worker_num"]
dag_conf = server_conf["dag"]
......@@ -74,9 +75,17 @@ class DAGExecutor(object):
if self._tracer is not None:
self._tracer.start()
self._id_lock = threading.Lock()
self._id_counter = 0
self._reset_max_id = 1000000000000000000
# generate id: data_id == request_id == log_id
base_counter = 0
gen_id_step = 1
if build_dag_each_worker:
base_counter = worker_idx
gen_id_step = server_worker_num
self._id_generator = ThreadIdGenerator(
max_id=1000000000000000000,
base_counter=base_counter,
step=gen_id_step)
self._cv_pool = {}
self._cv_for_cv_pool = threading.Condition()
self._fetch_buffer = {}
......@@ -98,13 +107,7 @@ class DAGExecutor(object):
_LOGGER.info("[DAG Executor] Stop")
def _get_next_data_id(self):
data_id = None
with self._id_lock:
if self._id_counter >= self._reset_max_id:
_LOGGER.info("[DAG Executor] Reset request id")
self._id_counter -= self._reset_max_id
data_id = self._id_counter
self._id_counter += 1
data_id = self._id_generator.next()
cond_v = threading.Condition()
with self._cv_for_cv_pool:
self._cv_pool[data_id] = cond_v
......@@ -282,12 +285,14 @@ class DAGExecutor(object):
end_call = self._profiler.record("call_{}#DAG_1".format(data_id))
if self._tracer is not None:
if resp_channeldata.ecode == ChannelDataEcode.OK.value:
trace_buffer.put(("DAG", "call_{}".format(data_id), True,
end_call - start_call))
else:
trace_buffer.put(("DAG", "call_{}".format(data_id), False,
end_call - start_call))
trace_buffer.put({
"name": "DAG",
"id": data_id,
"succ": resp_channeldata.ecode == ChannelDataEcode.OK.value,
"actions": {
"call_{}".format(data_id): end_call - start_call,
},
})
profile_str = self._profiler.gen_profile_str()
if self._server_use_profile:
......@@ -330,7 +335,7 @@ class DAG(object):
self._build_dag_each_worker = build_dag_each_worker
self._tracer = tracer
if not self._is_thread_op:
self._manager = multiprocessing.Manager()
self._manager = PipelineProcSyncManager()
_LOGGER.info("[DAG] Succ init")
def get_use_ops(self, response_op):
......@@ -554,6 +559,7 @@ class DAG(object):
self._pack_func = pack_func
self._unpack_func = unpack_func
if self._tracer is not None:
self._tracer.set_channels(self._channels)
return self._input_channel, self._output_channel, self._pack_func, self._unpack_func
......
......@@ -13,7 +13,7 @@
# limitations under the License.
import logging
import logging.handlers
import logging.config
import os
......@@ -25,66 +25,52 @@ class SectionLevelFilter(object):
return logRecord.levelno in self._levels
class OutOfMouduleFilter(object):
def __init__(self, out_names):
self._out_names = out_names
def filter(self, logRecord):
return logRecord.name not in self._out_names
class OutOfMouduleAndSectionLevelFilter(object):
def __init__(self, out_names, levels):
self._out_names = out_names
self._levels = levels
def filter(self, logRecord):
if logRecord.name in self._out_names:
return False
return logRecord.levelno in self._levels
class StreamHandler(logging.StreamHandler):
def __init__(self, *args, **kwargs):
super(StreamHandler, self).__init__(*args, **kwargs)
self.addFilter(OutOfMouduleFilter(["pipeline.profiler"]))
log_dir = "PipelineServingLogs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# root logger
_LOGGER = logging.getLogger()
_LOGGER.setLevel(logging.DEBUG)
formatter = logging.Formatter(
"%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s")
# info and warn
file_info = logging.handlers.RotatingFileHandler(
os.path.join(log_dir, "INFO.log"))
file_info.addFilter(OutOfMouduleFilter(["pipeline.profiler"]))
file_info.addFilter(SectionLevelFilter([logging.INFO, logging.WARNING]))
file_info.setFormatter(formatter)
# err and critical
file_err = logging.handlers.RotatingFileHandler(
os.path.join(log_dir, "ERROR.log"))
file_err.addFilter(OutOfMouduleFilter(["pipeline.profiler"]))
file_err.setLevel(logging.ERROR)
file_err.setFormatter(formatter)
_LOGGER.addHandler(file_info)
_LOGGER.addHandler(file_err)
# tracer logger
_TRACER = logging.getLogger("pipeline.profiler")
_TRACER.setLevel(logging.INFO)
_TRACER.addFilter(logging.Filter("pipeline.profiler"))
# tracer
tracer_formatter = logging.Formatter("%(asctime)s %(message)s")
file_trace = logging.handlers.RotatingFileHandler(
os.path.join(log_dir, "TRACE.log"))
file_trace.setFormatter(tracer_formatter)
_TRACER.addHandler(file_trace)
logger_config = {
"version": 1,
"formatters": {
"normal_fmt": {
"format":
"%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s",
},
"tracer_fmt": {
"format": "%(asctime)s %(message)s",
},
},
"handlers": {
"f_pipeline.log": {
"class": "logging.FileHandler",
"level": "INFO",
"formatter": "normal_fmt",
"filename": os.path.join(log_dir, "pipeline.log"),
},
"f_pipeline.log.wf": {
"class": "logging.FileHandler",
"level": "WARNING",
"formatter": "normal_fmt",
"filename": os.path.join(log_dir, "pipeline.log.wf"),
},
"f_tracer.log": {
"class": "logging.FileHandler",
"level": "INFO",
"formatter": "tracer_fmt",
"filename": os.path.join(log_dir, "pipeline.tracer"),
},
},
"loggers": {
# propagate = True
".".join(__name__.split(".")[:-1] + ["profiler"]): {
"level": "INFO",
"handlers": ["f_tracer.log"],
},
},
"root": {
"level": "DEBUG",
"handlers": ["f_pipeline.log", "f_pipeline.log.wf"],
},
}
logging.config.dictConfig(logger_config)
......@@ -22,8 +22,15 @@ import logging
import func_timeout
import os
import sys
import collections
import numpy as np
from numpy import *
if sys.version_info.major == 2:
import Queue
elif sys.version_info.major == 3:
import queue as Queue
else:
raise Exception("Error Python version")
from .proto import pipeline_service_pb2
from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode,
......@@ -32,7 +39,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode,
from .util import NameGenerator
from .profiler import UnsafeTimeProfiler as TimeProfiler
_LOGGER = logging.getLogger("pipeline.operator")
_LOGGER = logging.getLogger(__name__)
_op_name_gen = NameGenerator("Op")
......@@ -127,7 +134,7 @@ class Op(object):
fetch_names):
if self.with_serving == False:
_LOGGER.info("Op({}) has no client (and it also do not "
"run the process function".format(self.name))
"run the process function)".format(self.name))
return None
if client_type == 'brpc':
client = Client()
......@@ -199,7 +206,7 @@ class Op(object):
(_, input_dict), = input_dicts.items()
return input_dict
def process(self, feed_batch):
def process(self, feed_batch, typical_logid):
err, err_info = ChannelData.check_batch_npdata(feed_batch)
if err != 0:
_LOGGER.critical(
......@@ -207,7 +214,7 @@ class Op(object):
"preprocess func.".format(err_info)))
os._exit(-1)
call_result = self.client.predict(
feed=feed_batch, fetch=self._fetch_names)
feed=feed_batch, fetch=self._fetch_names, log_id=typical_logid)
if isinstance(self.client, MultiLangClient):
if call_result is None or call_result["serving_status_code"] != 0:
return None
......@@ -294,8 +301,8 @@ class Op(object):
def _run_preprocess(self, parsed_data_dict, op_info_prefix):
_LOGGER.debug("{} Running preprocess".format(op_info_prefix))
preped_data_dict = {}
err_channeldata_dict = {}
preped_data_dict = collections.OrderedDict()
err_channeldata_dict = collections.OrderedDict()
for data_id, parsed_data in parsed_data_dict.items():
preped_data, error_channeldata = None, None
try:
......@@ -326,68 +333,132 @@ class Op(object):
def _run_process(self, preped_data_dict, op_info_prefix):
_LOGGER.debug("{} Running process".format(op_info_prefix))
midped_data_dict = {}
err_channeldata_dict = {}
midped_data_dict = collections.OrderedDict()
err_channeldata_dict = collections.OrderedDict()
if self.with_serving:
data_ids = preped_data_dict.keys()
typical_logid = data_ids[0]
if len(data_ids) != 1:
for data_id in data_ids:
_LOGGER.info(
"(logid={}) {} During access to PaddleServingService,"
" we selected logid={} (from batch: {}) as a "
"representative for logging.".format(
data_id, op_info_prefix, typical_logid, data_ids))
# combine samples to batch
one_input = preped_data_dict[data_ids[0]]
feed_batch = []
input_offset = None
if isinstance(one_input, dict):
# sample input
feed_batch = [preped_data_dict[data_id] for data_id in data_ids]
input_offset = list(range(len(data_ids) + 1))
elif isinstance(one_input, list):
# batch input
input_offset = [0]
for data_id in data_ids:
batch_input = preped_data_dict[data_id]
offset = input_offset[-1] + len(batch_input)
feed_batch += batch_input
input_offset.append(offset)
else:
_LOGGER.critical(
"{} Failed to process: expect input type is dict(sample"
" input) or list(batch input), but get {}".format(
op_info_prefix, type(one_input)))
os._exit(-1)
midped_batch = None
ecode = ChannelDataEcode.OK.value
if self._timeout <= 0:
try:
midped_batch = self.process(feed_batch)
midped_batch = self.process(feed_batch, typical_logid)
except Exception as e:
ecode = ChannelDataEcode.UNKNOW.value
error_info = "{} Failed to process(batch: {}): {}".format(
op_info_prefix, data_ids, e)
error_info = "(logid={}) {} Failed to process(batch: {}): {}".format(
typical_logid, op_info_prefix, data_ids, e)
_LOGGER.error(error_info, exc_info=True)
else:
for i in range(self._retry):
try:
midped_batch = func_timeout.func_timeout(
self._timeout, self.process, args=(feed_batch, ))
self._timeout,
self.process,
args=(feed_batch, typical_logid))
except func_timeout.FunctionTimedOut as e:
if i + 1 >= self._retry:
ecode = ChannelDataEcode.TIMEOUT.value
error_info = "{} Failed to process(batch: {}): " \
error_info = "(logid={}) {} Failed to process(batch: {}): " \
"exceeded retry count.".format(
op_info_prefix, data_ids)
typical_logid, op_info_prefix, data_ids)
_LOGGER.error(error_info)
else:
_LOGGER.warning(
"{} Failed to process(batch: {}): timeout, and retrying({}/{})"
.format(op_info_prefix, data_ids, i + 1,
self._retry))
"(logid={}) {} Failed to process(batch: {}): timeout,"
" and retrying({}/{})...".format(
typical_logid, op_info_prefix, data_ids, i +
1, self._retry))
except Exception as e:
ecode = ChannelDataEcode.UNKNOW.value
error_info = "{} Failed to process(batch: {}): {}".format(
op_info_prefix, data_ids, e)
error_info = "(logid={}) {} Failed to process(batch: {}): {}".format(
typical_logid, op_info_prefix, data_ids, e)
_LOGGER.error(error_info, exc_info=True)
break
else:
break
if ecode != ChannelDataEcode.OK.value:
for data_id in data_ids:
_LOGGER.error("(logid={}) {}".format(data_id, error_info))
err_channeldata_dict[data_id] = ChannelData(
ecode=ecode, error_info=error_info, data_id=data_id)
elif midped_batch is None:
# op client return None
error_info = "{} Failed to predict, please check if PaddleServingService" \
" is working properly.".format(op_info_prefix)
error_info = "(logid={}) {} Failed to predict, please check if " \
"PaddleServingService is working properly.".format(
typical_logid, op_info_prefix)
_LOGGER.error(error_info)
for data_id in data_ids:
_LOGGER.error("(logid={}) {}".format(data_id, error_info))
err_channeldata_dict[data_id] = ChannelData(
ecode=ChannelDataEcode.CLIENT_ERROR.value,
error_info=error_info,
data_id=data_id)
else:
# transform np format to dict format
var_names = midped_batch.keys()
lod_var_names = set()
lod_offset_names = set()
for name in var_names:
lod_offset_name = "{}.lod".format(name)
if lod_offset_name in var_names:
_LOGGER.debug("(logid={}) {} {} is LodTensor".format(
typical_logid, op_info_prefix, name))
lod_var_names.add(name)
lod_offset_names.add(lod_offset_name)
for idx, data_id in enumerate(data_ids):
midped_data_dict[data_id] = {
k: v[idx]
for k, v in midped_batch.items()
}
midped_data_dict[data_id] = {}
for name, value in midped_batch.items():
if name in lod_offset_names:
continue
if name in lod_var_names:
# lodtensor
lod_offset_name = "{}.lod".format(name)
lod_offset = midped_batch[lod_offset_name]
for idx, data_id in enumerate(data_ids):
data_offset_left = input_offset[idx]
data_offset_right = input_offset[idx + 1]
lod_offset_left = lod_offset[data_offset_left]
lod_offset_right = lod_offset[data_offset_right]
midped_data_dict[data_id][name] = value[lod_offset_left:lod_offset_right]
midped_data_dict[data_id][lod_offset_name] = \
lod_offset[data_offset_left:data_offset_right + 1] - lod_offset[data_offset_left]
else:
# normal tensor
for idx, data_id in enumerate(data_ids):
left = input_offset[idx]
right = input_offset[idx + 1]
midped_data_dict[data_id][name] = value[left:right]
else:
midped_data_dict = preped_data_dict
_LOGGER.debug("{} Succ process".format(op_info_prefix))
......@@ -396,8 +467,8 @@ class Op(object):
def _run_postprocess(self, parsed_data_dict, midped_data_dict,
op_info_prefix):
_LOGGER.debug("{} Running postprocess".format(op_info_prefix))
postped_data_dict = {}
err_channeldata_dict = {}
postped_data_dict = collections.OrderedDict()
err_channeldata_dict = collections.OrderedDict()
for data_id, midped_data in midped_data_dict.items():
postped_data, err_channeldata = None, None
try:
......@@ -476,7 +547,7 @@ class Op(object):
yield batch
def _parse_channeldata_batch(self, batch, output_channels):
parsed_data_dict = {}
parsed_data_dict = collections.OrderedDict()
need_profile_dict = {}
profile_dict = {}
for channeldata_dict in batch:
......@@ -520,6 +591,7 @@ class Op(object):
op_info_prefix=op_info_prefix)
start, end = None, None
trace_que = collections.deque()
while True:
start = int(round(_time() * 1000000))
try:
......@@ -529,8 +601,7 @@ class Op(object):
self._finalize(is_thread_op)
break
end = int(round(_time() * 1000000))
if trace_buffer is not None:
trace_buffer.put((self.name, "in", True, end - start))
in_time = end - start
# parse channeldata batch
try:
......@@ -550,8 +621,7 @@ class Op(object):
preped_data_dict, err_channeldata_dict \
= self._run_preprocess(parsed_data_dict, op_info_prefix)
end = profiler.record("prep#{}_1".format(op_info_prefix))
if trace_buffer is not None:
trace_buffer.put((self.name, "prep", True, end - start))
prep_time = end - start
try:
for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels(
......@@ -563,7 +633,7 @@ class Op(object):
_LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op)
break
if len(parsed_data_dict) == 0:
if len(preped_data_dict) == 0:
continue
# process
......@@ -571,8 +641,7 @@ class Op(object):
midped_data_dict, err_channeldata_dict \
= self._run_process(preped_data_dict, op_info_prefix)
end = profiler.record("midp#{}_1".format(op_info_prefix))
if trace_buffer is not None:
trace_buffer.put((self.name, "midp", True, end - start))
midp_time = end - start
try:
for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels(
......@@ -593,12 +662,11 @@ class Op(object):
= self._run_postprocess(
parsed_data_dict, midped_data_dict, op_info_prefix)
end = profiler.record("postp#{}_1".format(op_info_prefix))
if trace_buffer is not None:
trace_buffer.put((self.name, "postp", True, end - start))
postp_time = end - start
try:
for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels(
data=error_channeldata,
data=err_channeldata,
channels=output_channels,
client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id])
......@@ -627,8 +695,25 @@ class Op(object):
self._finalize(is_thread_op)
break
end = int(round(_time() * 1000000))
out_time = end - start
if trace_buffer is not None:
trace_buffer.put((self.name, "out", True, end - start))
trace_que.append({
"name": self.name,
"actions": {
"in": in_time,
"prep": prep_time,
"midp": midp_time,
"postp": postp_time,
"out": out_time,
}
})
while trace_que:
info = trace_que[0]
try:
trace_buffer.put_nowait(info)
trace_que.popleft()
except Queue.Full:
break
def _initialize(self, is_thread_op, client_type, concurrency_idx):
if is_thread_op:
......@@ -718,7 +803,7 @@ class ResponseOp(Op):
feed = channeldata.parse()
# ndarray to string:
# https://stackoverflow.com/questions/30167538/convert-a-numpy-ndarray-to-stringor-bytes-and-convert-it-back-to-numpy-ndarray
np.set_printoptions(threshold=np.nan)
np.set_printoptions(threshold=sys.maxsize)
for name, var in feed.items():
resp.value.append(var.__repr__())
resp.key.append(name)
......
......@@ -22,7 +22,7 @@ from .channel import ChannelDataEcode
from .proto import pipeline_service_pb2
from .proto import pipeline_service_pb2_grpc
_LOGGER = logging.getLogger("pipeline.pipeline_client")
_LOGGER = logging.getLogger(__name__)
class PipelineClient(object):
......
......@@ -26,14 +26,14 @@ from .proto import pipeline_service_pb2_grpc
from .operator import ResponseOp
from .dag import DAGExecutor
_LOGGER = logging.getLogger("pipeline.pipeline_server")
_LOGGER = logging.getLogger(__name__)
class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
def __init__(self, response_op, dag_conf):
def __init__(self, response_op, dag_conf, worker_idx=-1):
super(PipelineServicer, self).__init__()
# init dag executor
self._dag_executor = DAGExecutor(response_op, dag_conf)
self._dag_executor = DAGExecutor(response_op, dag_conf, worker_idx)
self._dag_executor.start()
_LOGGER.info("[PipelineServicer] succ init")
......@@ -92,8 +92,9 @@ class PipelineServer(object):
json.dumps(
conf, indent=4, separators=(',', ':'))))
if self._build_dag_each_worker is True:
_LOGGER.info(
"(Make sure that install grpcio whl with --no-binary flag)")
_LOGGER.warning(
"(Make sure that install grpcio whl with --no-binary flag: "
"pip install grpcio --no-binary grpcio)")
_LOGGER.info("-------------------------------------------")
self._conf = conf
......@@ -107,27 +108,31 @@ class PipelineServer(object):
show_info = (i == 0)
worker = multiprocessing.Process(
target=self._run_server_func,
args=(bind_address, self._response_op, self._conf))
args=(bind_address, self._response_op, self._conf, i))
worker.start()
workers.append(worker)
for worker in workers:
worker.join()
else:
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=self._worker_num))
futures.ThreadPoolExecutor(max_workers=self._worker_num),
options=[('grpc.max_send_message_length', 256 * 1024 * 1024),
('grpc.max_receive_message_length', 256 * 1024 * 1024)])
pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
PipelineServicer(self._response_op, self._conf), server)
server.add_insecure_port('[::]:{}'.format(self._port))
server.start()
server.wait_for_termination()
def _run_server_func(self, bind_address, response_op, dag_conf):
options = (('grpc.so_reuseport', 1), )
def _run_server_func(self, bind_address, response_op, dag_conf, worker_idx):
options = [('grpc.so_reuseport', 1),
('grpc.max_send_message_length', 256 * 1024 * 1024),
('grpc.max_send_message_length', 256 * 1024 * 1024)]
server = grpc.server(
futures.ThreadPoolExecutor(
max_workers=1, ), options=options)
pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
PipelineServicer(response_op, dag_conf), server)
PipelineServicer(response_op, dag_conf, worker_idx), server)
server.add_insecure_port(bind_address)
server.start()
server.wait_for_termination()
......@@ -177,7 +182,7 @@ class ServerYamlConfChecker(object):
@staticmethod
def check_tracer_conf(conf):
default_conf = {"interval_s": 600, }
default_conf = {"interval_s": -1, }
conf_type = {"interval_s": int, }
......
......@@ -27,7 +27,8 @@ import time
import threading
import multiprocessing
_TRACER = logging.getLogger("pipeline.profiler")
_LOGGER = logging.getLogger(__name__)
_LOGGER.propagate = False
class PerformanceTracer(object):
......@@ -67,23 +68,32 @@ class PerformanceTracer(object):
self._channels = channels
def _trace_func(self, channels):
actions = ["in", "prep", "midp", "postp", "out"]
all_actions = ["in", "prep", "midp", "postp", "out"]
calcu_actions = ["prep", "midp", "postp"]
while True:
op_cost = {}
err_request = []
err_count = 0
_TRACER.info("==================== TRACER ======================")
_LOGGER.info("==================== TRACER ======================")
# op
while True:
try:
name, action, stage, cost = self._data_buffer.get_nowait()
if stage == False:
# only for name == DAG
assert name == "DAG"
item = self._data_buffer.get_nowait()
name = item["name"]
actions = item["actions"]
if name == "DAG":
succ = item["succ"]
req_id = item["id"]
if not succ:
err_count += 1
err_request.append(req_id)
if name not in op_cost:
op_cost[name] = {}
for action, cost in actions.items():
if action not in op_cost[name]:
op_cost[name][action] = []
op_cost[name][action].append(cost)
......@@ -98,15 +108,15 @@ class PerformanceTracer(object):
tot_cost += op_cost[name][action]
if name != "DAG":
_TRACER.info("Op({}):".format(name))
for action in actions:
_LOGGER.info("Op({}):".format(name))
for action in all_actions:
if action in op_cost[name]:
_TRACER.info("\t{}[{} ms]".format(
_LOGGER.info("\t{}[{} ms]".format(
action, op_cost[name][action]))
for action in calcu_actions:
if action in op_cost[name]:
calcu_cost += op_cost[name][action]
_TRACER.info("\tidle[{}]".format(1 - 1.0 * calcu_cost /
_LOGGER.info("\tidle[{}]".format(1 - 1.0 * calcu_cost /
tot_cost))
if "DAG" in op_cost:
......@@ -116,21 +126,22 @@ class PerformanceTracer(object):
qps = 1.0 * tot / self._interval_s
ave_cost = sum(calls) / tot
latencys = [50, 60, 70, 80, 90, 95, 99]
_TRACER.info("DAGExecutor:")
_TRACER.info("\tquery count[{}]".format(tot))
_TRACER.info("\tqps[{} q/s]".format(qps))
_TRACER.info("\tsucc[{}]".format(1 - 1.0 * err_count / tot))
_TRACER.info("\tlatency:")
_TRACER.info("\t\tave[{} ms]".format(ave_cost))
_LOGGER.info("DAGExecutor:")
_LOGGER.info("\tQuery count[{}]".format(tot))
_LOGGER.info("\tQPS[{} q/s]".format(qps))
_LOGGER.info("\tSucc[{}]".format(1 - 1.0 * err_count / tot))
_LOGGER.info("\tError req[{}]".format(", ".join([str(x) for x in err_request)]))
_LOGGER.info("\tLatency:")
_LOGGER.info("\t\tave[{} ms]".format(ave_cost))
for latency in latencys:
_TRACER.info("\t\t.{}[{} ms]".format(latency, calls[int(
_LOGGER.info("\t\t.{}[{} ms]".format(latency, calls[int(
tot * latency / 100.0)]))
# channel
_TRACER.info("Channel (server worker num[{}]):".format(
_LOGGER.info("Channel (server worker num[{}]):".format(
self._server_worker_num))
for channel in channels:
_TRACER.info("\t{}(In: {}, Out: {}) size[{}/{}]".format(
_LOGGER.info("\t{}(In: {}, Out: {}) size[{}/{}]".format(
channel.name,
channel.get_producers(),
channel.get_consumers(),
......
......@@ -13,13 +13,101 @@
# limitations under the License.
import sys
import logging
import threading
import multiprocessing
import multiprocessing.managers
if sys.version_info.major == 2:
import Queue
from Queue import PriorityQueue
elif sys.version_info.major == 3:
import queue as Queue
from queue import PriorityQueue
else:
raise Exception("Error Python version")
_LOGGER = logging.getLogger(__name__)
class NameGenerator(object):
# use unsafe-id-generator
def __init__(self, prefix):
self._idx = -1
self._prefix = prefix
self._id_generator = UnsafeIdGenerator(1000000000000000000)
def next(self):
next_id = self._id_generator.next()
return "{}{}".format(self._prefix, next_id)
class UnsafeIdGenerator(object):
def __init__(self, max_id, base_counter=0, step=1):
self._base_counter = base_counter
self._counter = self._base_counter
self._step = step
self._max_id = max_id # for reset
def next(self):
if self._counter >= self._max_id:
self._counter = self._base_counter
_LOGGER.info("Reset Id: {}".format(self._counter))
next_id = self._counter
self._counter += self._step
return next_id
class ThreadIdGenerator(UnsafeIdGenerator):
def __init__(self, max_id, base_counter=0, step=1, lock=None):
# if you want to use your lock, you may need to use Reentrant-Lock
self._lock = lock
if self._lock is None:
self._lock = threading.Lock()
super(ThreadIdGenerator, self).__init__(max_id, base_counter, step)
def next(self):
self._idx += 1
return "{}{}".format(self._prefix, self._idx)
next_id = None
with self._lock:
if self._counter >= self._max_id:
self._counter = self._base_counter
_LOGGER.info("Reset Id: {}".format(self._counter))
next_id = self._counter
self._counter += self._step
return next_id
class ProcessIdGenerator(UnsafeIdGenerator):
def __init__(self, max_id, base_counter=0, step=1, lock=None):
# if you want to use your lock, you may need to use Reentrant-Lock
self._lock = lock
if self._lock is None:
self._lock = multiprocessing.Lock()
self._base_counter = base_counter
self._counter = multiprocessing.Manager().Value('i', 0)
self._step = step
self._max_id = max_id
def next(self):
next_id = None
with self._lock:
if self._counter.value >= self._max_id:
self._counter.value = self._base_counter
_LOGGER.info("Reset Id: {}".format(self._counter.value))
next_id = self._counter.value
self._counter.value += self._step
return next_id
def PipelineProcSyncManager():
"""
add PriorityQueue into SyncManager, see more:
https://stackoverflow.com/questions/25324560/strange-queue-priorityqueue-behaviour-with-multiprocessing-in-python-2-7-6?answertab=active#tab-top
"""
class PipelineManager(multiprocessing.managers.SyncManager):
pass
PipelineManager.register("PriorityQueue", PriorityQueue)
m = PipelineManager()
m.start()
return m
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册