提交 a00acd0e 编写于 作者: B barriery

Add logid to more log messages

上级 51b7ee70
......@@ -37,9 +37,9 @@ int conf_check(const Request *req,
const std::shared_ptr<PaddleGeneralModelConfig> &model_config) {
int var_num = req->insts(0).tensor_array_size();
if (var_num != model_config->_feed_type.size()) {
VLOG(2) << "var num: " << var_num;
VLOG(2) << "model config var num: " << model_config->_feed_type.size();
LOG(ERROR) << "feed var number not match.";
LOG(ERROR) << "feed var number not match: model config["
<< model_config->_feed_type.size() << "] vs. actual[" << var_num
<< "]";
return -1;
}
......
......@@ -42,6 +42,9 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralResponseOp::inference() {
const std::vector<std::string> pre_node_names = pre_names();
VLOG(2) << "pre node names size: " << pre_node_names.size();
const GeneralBlob *input_blob;
uint64_t log_id =
get_depend_argument<GeneralBlob>(pre_node_names[0])->GetLogId();
const Request *req = dynamic_cast<const Request *>(get_request_message());
// response inst with only fetch_var_names
......@@ -52,15 +55,17 @@ int GeneralResponseOp::inference() {
// timeline.Start();
int64_t start = timeline.TimeStampUS();
VLOG(2) << "start to call load general model_conf op";
VLOG(2) << "(logid=" << log_id
<< ") start to call load general model_conf op";
baidu::paddle_serving::predictor::Resource &resource =
baidu::paddle_serving::predictor::Resource::instance();
VLOG(2) << "get resource pointer done.";
VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config();
VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
VLOG(2) << "(logid=" << log_id
<< ") max body size : " << brpc::fLU64::FLAGS_max_body_size;
std::vector<int> fetch_index;
fetch_index.resize(req->fetch_var_names_size());
......@@ -69,17 +74,15 @@ int GeneralResponseOp::inference() {
model_config->_fetch_alias_name_to_index[req->fetch_var_names(i)];
}
const GeneralBlob *input_blob;
for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
const std::string &pre_name = pre_node_names[pi];
VLOG(2) << "pre names[" << pi << "]: " << pre_name << " ("
<< pre_node_names.size() << ")";
VLOG(2) << "(logid=" << log_id << ") pre names[" << pi << "]: " << pre_name
<< " (" << pre_node_names.size() << ")";
input_blob = get_depend_argument<GeneralBlob>(pre_name);
uint64_t curr_logid = input_blob->GetLogId();
// fprintf(stderr, "input(%s) blob address %x\n", pre_names.c_str(),
// input_blob);
if (!input_blob) {
LOG(ERROR) << "(logid=" << curr_logid
LOG(ERROR) << "(logid=" << log_id
<< ") Failed mutable depended argument, op: " << pre_name;
return -1;
}
......@@ -94,18 +97,18 @@ int GeneralResponseOp::inference() {
for (auto &idx : fetch_index) {
Tensor *tensor = fetch_inst->add_tensor_array();
if (model_config->_is_lod_fetch[idx]) {
VLOG(2) << "(logid=" << curr_logid << ") out[" << idx << "] "
VLOG(2) << "(logid=" << log_id << ") out[" << idx << "] "
<< model_config->_fetch_name[idx] << " is lod_tensor";
for (int k = 0; k < in->at(idx).shape.size(); ++k) {
VLOG(2) << "(logid=" << curr_logid << ") shape[" << k
VLOG(2) << "(logid=" << log_id << ") shape[" << k
<< "]: " << in->at(idx).shape[k];
tensor->add_shape(in->at(idx).shape[k]);
}
} else {
VLOG(2) << "(logid=" << curr_logid << ") out[" << idx << "] "
VLOG(2) << "(logid=" << log_id << ") out[" << idx << "] "
<< model_config->_fetch_name[idx] << " is tensor";
for (int k = 0; k < in->at(idx).shape.size(); ++k) {
VLOG(2) << "(logid=" << curr_logid << ") shape[" << k
VLOG(2) << "(logid=" << log_id << ") shape[" << k
<< "]: " << in->at(idx).shape[k];
tensor->add_shape(in->at(idx).shape[k]);
}
......@@ -123,7 +126,7 @@ int GeneralResponseOp::inference() {
auto dtype = in->at(idx).dtype;
if (dtype == paddle::PaddleDType::INT64) {
VLOG(2) << "(logid=" << curr_logid << ") Prepare int64 var ["
VLOG(2) << "(logid=" << log_id << ") Prepare int64 var ["
<< model_config->_fetch_name[idx] << "].";
int64_t *data_ptr = static_cast<int64_t *>(in->at(idx).data.data());
// from
......@@ -134,7 +137,7 @@ int GeneralResponseOp::inference() {
fetch_p->mutable_tensor_array(var_idx)->mutable_int64_data()->Swap(
&tmp_data);
} else if (dtype == paddle::PaddleDType::FLOAT32) {
VLOG(2) << "(logid=" << curr_logid << ") Prepare float var ["
VLOG(2) << "(logid=" << log_id << ") Prepare float var ["
<< model_config->_fetch_name[idx] << "].";
float *data_ptr = static_cast<float *>(in->at(idx).data.data());
google::protobuf::RepeatedField<float> tmp_data(data_ptr,
......@@ -142,7 +145,7 @@ int GeneralResponseOp::inference() {
fetch_p->mutable_tensor_array(var_idx)->mutable_float_data()->Swap(
&tmp_data);
} else if (dtype == paddle::PaddleDType::INT32) {
VLOG(2) << "(logid=" << curr_logid << ")Prepare int32 var ["
VLOG(2) << "(logid=" << log_id << ")Prepare int32 var ["
<< model_config->_fetch_name[idx] << "].";
int32_t *data_ptr = static_cast<int32_t *>(in->at(idx).data.data());
google::protobuf::RepeatedField<int32_t> tmp_data(data_ptr,
......@@ -158,7 +161,7 @@ int GeneralResponseOp::inference() {
}
}
VLOG(2) << "(logid=" << curr_logid << ") fetch var ["
VLOG(2) << "(logid=" << log_id << ") fetch var ["
<< model_config->_fetch_name[idx] << "] ready";
var_idx++;
}
......@@ -172,8 +175,7 @@ int GeneralResponseOp::inference() {
// a more elegant way.
for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
input_blob = get_depend_argument<GeneralBlob>(pre_node_names[pi]);
uint64_t curr_logid = input_blob->GetLogId();
VLOG(2) << "(logid=" << curr_logid
VLOG(2) << "(logid=" << log_id
<< ") p size for input blob: " << input_blob->p_size;
int profile_time_idx = -1;
if (pi == 0) {
......
......@@ -40,6 +40,9 @@ int GeneralTextResponseOp::inference() {
VLOG(2) << "Going to run inference";
const std::vector<std::string> pre_node_names = pre_names();
VLOG(2) << "pre node names size: " << pre_node_names.size();
const GeneralBlob *input_blob;
uint64_t log_id =
get_depend_argument<GeneralBlob>(pre_node_names[0])->GetLogId();
const Request *req = dynamic_cast<const Request *>(get_request_message());
// response inst with only fetch_var_names
......@@ -48,11 +51,12 @@ int GeneralTextResponseOp::inference() {
Timer timeline;
int64_t start = timeline.TimeStampUS();
VLOG(2) << "start to call load general model_conf op";
VLOG(2) << "(logid=" << log_id
<< ") start to call load general model_conf op";
baidu::paddle_serving::predictor::Resource &resource =
baidu::paddle_serving::predictor::Resource::instance();
VLOG(2) << "get resource pointer done.";
VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config();
......@@ -63,18 +67,17 @@ int GeneralTextResponseOp::inference() {
model_config->_fetch_alias_name_to_index[req->fetch_var_names(i)];
}
const GeneralBlob *input_blob;
for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
const std::string &pre_name = pre_node_names[pi];
VLOG(2) << "pre names[" << pi << "]: " << pre_name << " ("
<< pre_node_names.size() << ")";
VLOG(2) << "(logid=" << log_id << ") pre names[" << pi << "]: " << pre_name
<< " (" << pre_node_names.size() << ")";
input_blob = get_depend_argument<GeneralBlob>(pre_name);
if (!input_blob) {
LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name;
LOG(ERROR) << "(logid=" << log_id
<< ") Failed mutable depended argument, op: " << pre_name;
return -1;
}
uint64_t log_id = input_blob->GetLogId();
const TensorVector *in = &input_blob->tensor_vector;
int batch_size = input_blob->GetBatchSize();
VLOG(2) << "(logid=" << log_id << ") input batch size: " << batch_size;
......@@ -139,7 +142,6 @@ int GeneralTextResponseOp::inference() {
// a more elegant way.
for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
input_blob = get_depend_argument<GeneralBlob>(pre_node_names[pi]);
uint64_t log_id = input_blob->GetLogId();
VLOG(2) << "(logid=" << log_id
<< ") p size for input blob: " << input_blob->p_size;
int profile_time_idx = -1;
......
......@@ -72,9 +72,10 @@ class Channel {
const std::string& op() { return _op; }
int share_to_bus(Bus* bus) {
int share_to_bus(Bus* bus, const uint64_t log_id) {
if (bus->regist(_op, this) != 0) {
LOG(ERROR) << "Failed regist channel[" << _op << "] to bus!";
LOG(ERROR) << "(logid=" << log_id << ") Failed regist channel[" << _op
<< "] to bus!";
return -1;
}
......
......@@ -155,13 +155,11 @@ int Dag::init(const configure::Workflow& conf, const std::string& name) {
}
if (FLAGS_el_log_level == 16) {
LOG(INFO) << "DAG: " << _dag_name;
LOG(INFO) << ", Op Num: " << _index_nodes.size();
LOG(INFO) << "DAG: " << _dag_name << ", Op Num: " << _index_nodes.size();
for (uint32_t nid = 0; nid < _index_nodes.size(); nid++) {
DagNode* node = _index_nodes[nid];
LOG(INFO) << ", OP-" << node->id << "-" << node->name << "-"
<< node->type;
LOG(INFO) << " depends: " << node->depends.size();
LOG(INFO) << "OP-" << node->id << "-" << node->name << "-" << node->type
<< " depends: " << node->depends.size();
boost::unordered_map<std::string, EdgeMode>::iterator it;
for (it = node->depends.begin(); it != node->depends.end(); it++) {
......@@ -214,8 +212,8 @@ int Dag::topo_sort() {
}
}
for (int i = 0; i < in_degree.size(); ++i) {
LOG(INFO) << "(" << _index_nodes[i]->name << ") in_degree[" << i
<< "]: " << in_degree[i];
VLOG(2) << "(" << _index_nodes[i]->name << ") in_degree[" << i
<< "]: " << in_degree[i];
}
int sorted_num = 0;
DagStage* stage = new (std::nothrow) DagStage();
......
......@@ -26,7 +26,9 @@ namespace baidu {
namespace paddle_serving {
namespace predictor {
int DagView::init(Dag* dag, const std::string& service_name) {
int DagView::init(Dag* dag,
const std::string& service_name,
const uint64_t log_id) {
_name = dag->name();
_full_name = service_name + NAME_DELIMITER + dag->name();
_bus = butil::get_object<Bus>();
......@@ -36,17 +38,20 @@ int DagView::init(Dag* dag, const std::string& service_name) {
for (uint32_t si = 0; si < stage_size; si++) {
const DagStage* stage = dag->stage_by_index(si);
if (stage == NULL) {
LOG(ERROR) << "Failed get stage by index:" << si;
LOG(ERROR) << "(logid=" << log_id << ") Failed get stage by index:" << si;
return ERR_INTERNAL_FAILURE;
}
ViewStage* vstage = butil::get_object<ViewStage>();
if (vstage == NULL) {
LOG(ERROR) << "Failed get vstage from object pool"
LOG(ERROR) << "(logid=" << log_id
<< ") Failed get vstage from object pool"
<< "at:" << si;
return ERR_MEM_ALLOC_FAILURE;
}
VLOG(2) << "stage[" << si << "] name: " << stage->full_name;
VLOG(2) << "stage[" << si << "] node size: " << stage->nodes.size();
VLOG(2) << "(logid=" << log_id << ") stage[" << si
<< "] name: " << stage->full_name;
VLOG(2) << "(logid=" << log_id << ") stage[" << si
<< "] node size: " << stage->nodes.size();
vstage->full_name = service_name + NAME_DELIMITER + stage->full_name;
uint32_t node_size = stage->nodes.size();
// create tls view node
......@@ -54,31 +59,39 @@ int DagView::init(Dag* dag, const std::string& service_name) {
DagNode* node = stage->nodes[ni];
ViewNode* vnode = butil::get_object<ViewNode>();
if (vnode == NULL) {
LOG(ERROR) << "Failed get vnode at:" << ni;
LOG(ERROR) << "(logid=" << log_id << ") Failed get vnode at:" << ni;
return ERR_MEM_ALLOC_FAILURE;
}
// factory type
Op* op = OpRepository::instance().get_op(node->type);
if (op == NULL) {
LOG(ERROR) << "Failed get op with type:" << node->type;
LOG(ERROR) << "(logid=" << log_id
<< ") Failed get op with type:" << node->type;
return ERR_INTERNAL_FAILURE;
}
// initialize a TLS op object
VLOG(2) << "dag view initialized: \n"
VLOG(2) << "(logid=" << log_id << ") dag view initialized: \n"
<< "node id: " << node->id << "\n"
<< "node name: " << node->name << "\n"
<< "node type: " << node->type;
if (op->init(_bus, dag, node->id, node->name, node->type, node->conf) !=
0) {
LOG(WARNING) << "Failed init op, type:" << node->type;
if (op->init(_bus,
dag,
node->id,
node->name,
node->type,
node->conf,
log_id) != 0) {
LOG(WARNING) << "(logid=" << log_id
<< ") Failed init op, type:" << node->type;
return ERR_INTERNAL_FAILURE;
}
op->set_full_name(service_name + NAME_DELIMITER + node->full_name);
// Set the name of the Op as the key of the matching engine.
VLOG(2) << "op->set_engine_name(" << node->name.c_str() << ")";
VLOG(2) << "(logid=" << log_id << ") op->set_engine_name("
<< node->name.c_str() << ")";
op->set_engine_name(node->name);
vnode->conf = node;
......@@ -88,7 +101,7 @@ int DagView::init(Dag* dag, const std::string& service_name) {
it != vnode->conf->depends.end();
++it) {
std::string pre_node_name = it->first;
VLOG(2) << "add op pre name: \n"
VLOG(2) << "(logid=" << log_id << ") add op pre name: \n"
<< "current op name: " << vnode->op->op_name()
<< ", previous op name: " << pre_node_name;
vnode->op->add_pre_node_name(pre_node_name);
......@@ -102,7 +115,7 @@ int DagView::init(Dag* dag, const std::string& service_name) {
//<< " previous op name: "
//<< _view[si - 1]->nodes.back()->op->op_name();
// vstage->nodes.back()->op->set_pre_node_name(
//_view[si - 1]->nodes.back()->op->op_name());
// _view[si - 1]->nodes.back()->op->op_name());
/*}*/
_view.push_back(vstage);
}
......@@ -133,14 +146,15 @@ int DagView::deinit() {
return ERR_OK;
}
int DagView::execute(butil::IOBufBuilder* debug_os) {
int DagView::execute(const uint64_t log_id, butil::IOBufBuilder* debug_os) {
uint32_t stage_size = _view.size();
for (uint32_t si = 0; si < stage_size; si++) {
TRACEPRINTF("start to execute stage[%u]", si);
int errcode = execute_one_stage(_view[si], debug_os);
TRACEPRINTF("finish to execute stage[%u]", si);
TRACEPRINTF("(logid=%" PRIu64 ") start to execute stage[%u]", log_id, si);
int errcode = execute_one_stage(_view[si], log_id, debug_os);
TRACEPRINTF("(logid=%" PRIu64 ") finish to execute stage[%u]", log_id, si);
if (errcode < 0) {
LOG(ERROR) << "failed execute stage[" << _view[si]->debug();
LOG(ERROR) << "(logid=" << log_id << ") Failed execute stage["
<< _view[si]->debug();
return errcode;
}
}
......@@ -151,29 +165,34 @@ int DagView::execute(butil::IOBufBuilder* debug_os) {
// You can derive a subclass to implement this func.
// ParallelDagView may be the one you want.
int DagView::execute_one_stage(ViewStage* vstage,
const uint64_t log_id,
butil::IOBufBuilder* debug_os) {
butil::Timer stage_time(butil::Timer::STARTED);
uint32_t node_size = vstage->nodes.size();
VLOG(2) << "vstage->nodes.size(): " << node_size;
VLOG(2) << "(logid=" << log_id << ") vstage->nodes.size(): " << node_size;
for (uint32_t ni = 0; ni < node_size; ni++) {
ViewNode* vnode = vstage->nodes[ni];
DagNode* conf = vnode->conf;
Op* op = vnode->op;
TRACEPRINTF("start to execute op[%s]", op->name());
int errcode = op->process(debug_os != NULL);
TRACEPRINTF("finish to execute op[%s]", op->name());
TRACEPRINTF(
"(logid=%" PRIu64 ") start to execute op[%s]", log_id, op->name());
int errcode = op->process(log_id, debug_os != NULL);
TRACEPRINTF(
"(logid=%" PRIu64 ") finish to execute op[%s]", log_id, op->name());
if (errcode < 0) {
LOG(ERROR) << "Execute failed, Op:" << op->debug_string();
LOG(ERROR) << "(logid=" << log_id
<< ") Execute failed, Op:" << op->debug_string();
return errcode;
}
if (errcode > 0) {
LOG(INFO) << "Execute ignore, Op:" << op->debug_string();
LOG(INFO) << "(logid=" << log_id
<< ") Execute ignore, Op:" << op->debug_string();
continue;
}
if (debug_os) {
(*debug_os) << "{\"op_name\": \"" << op->name()
(*debug_os) << "(logid=" << log_id << ") {\"op_name\": \"" << op->name()
<< "\", \"debug_str:\": \"" << op->debug_string()
<< "\", \"time_info\": \"" << op->time_info() << "\"}";
}
......@@ -186,34 +205,34 @@ int DagView::execute_one_stage(ViewStage* vstage,
return ERR_OK;
}
int DagView::set_request_channel(Channel& request) {
int DagView::set_request_channel(Channel& request, const uint64_t log_id) {
// Each workflow should get the very beginning
// request (channel), and commit it to bus, for
// the first stage ops consuming.
request.share_to_bus(_bus);
request.share_to_bus(_bus, log_id);
return ERR_OK;
}
const Channel* DagView::get_response_channel() const {
const Channel* DagView::get_response_channel(const uint64_t log_id) const {
// Caller obtains response channel from bus, and
// writes it to rpc response (protobuf/json)
if (_view.size() < 1) {
LOG(ERROR) << "invalid empty view stage!";
LOG(ERROR) << "(logid=" << log_id << ") invalid empty view stage!";
return NULL;
}
ViewStage* last_stage = _view[_view.size() - 1];
if (last_stage->nodes.size() != 1 || last_stage->nodes[0] == NULL) {
LOG(ERROR) << "Invalid last stage, size[" << last_stage->nodes.size()
<< "] != 1";
LOG(ERROR) << "(logid=" << log_id << ") Invalid last stage, size["
<< last_stage->nodes.size() << "] != 1";
return NULL;
}
Op* last_op = last_stage->nodes[0]->op;
if (last_op == NULL) {
LOG(ERROR) << "Last op is NULL";
LOG(ERROR) << "(logid=" << log_id << ") Last op is NULL";
return NULL;
}
return last_op->mutable_channel();
......
......@@ -47,21 +47,22 @@ class DagView {
~DagView() {}
int init(Dag* dag, const std::string& service_name);
int init(Dag* dag, const std::string& service_name, const uint64_t log_id);
int deinit();
int execute(butil::IOBufBuilder* debug_os);
int execute(const uint64_t log_id, butil::IOBufBuilder* debug_os);
// The default execution strategy is in sequencing
// You can derive a subclass to implement this func.
// ParallelDagView may be the one you want.
virtual int execute_one_stage(ViewStage* vstage,
const uint64_t log_id,
butil::IOBufBuilder* debug_os);
int set_request_channel(Channel& request); // NOLINT
int set_request_channel(Channel& request, const uint64_t log_id); // NOLINT
const Channel* get_response_channel() const;
const Channel* get_response_channel(const uint64_t log_id) const;
const std::string& name() const { return _name; }
......
......@@ -149,8 +149,9 @@ int InferService::inference(const google::protobuf::Message* request,
TRACEPRINTF("(logid=%" PRIu64 ") finish to thread clear", log_id);
if (_enable_map_request_to_workflow) {
LOG(INFO) << "(logid=" << log_id << ") enable map request == True";
std::vector<Workflow*>* workflows = _map_request_to_workflow(request);
VLOG(2) << "(logid=" << log_id << ") enable map request == True";
std::vector<Workflow*>* workflows =
_map_request_to_workflow(request, log_id);
if (!workflows || workflows->size() == 0) {
LOG(ERROR) << "(logid=" << log_id
<< ") Failed to map request to workflow";
......@@ -167,7 +168,8 @@ int InferService::inference(const google::protobuf::Message* request,
TRACEPRINTF("(logid=%" PRIu64 ") start to execute workflow[%s]",
log_id,
workflow->name().c_str());
int errcode = _execute_workflow(workflow, request, response, debug_os);
int errcode =
_execute_workflow(workflow, request, response, log_id, debug_os);
TRACEPRINTF("(logid=%" PRIu64 ") finish to execute workflow[%s]",
log_id,
workflow->name().c_str());
......@@ -178,13 +180,14 @@ int InferService::inference(const google::protobuf::Message* request,
}
}
} else {
LOG(INFO) << "(logid=" << log_id << ") enable map request == False";
VLOG(2) << "(logid=" << log_id << ") enable map request == False";
TRACEPRINTF("(logid=%" PRIu64 ") start to execute one workflow", log_id);
size_t fsize = _flows.size();
for (size_t fi = 0; fi < fsize; ++fi) {
TRACEPRINTF(
"(logid=%" PRIu64 ") start to execute one workflow-%lu", log_id, fi);
int errcode = execute_one_workflow(fi, request, response, debug_os);
int errcode =
execute_one_workflow(fi, request, response, log_id, debug_os);
TRACEPRINTF(
"(logid=%" PRIu64 ") finish to execute one workflow-%lu", log_id, fi);
if (errcode < 0) {
......@@ -207,19 +210,22 @@ int InferService::debug(const google::protobuf::Message* request,
int InferService::execute_one_workflow(uint32_t index,
const google::protobuf::Message* request,
google::protobuf::Message* response,
const uint64_t log_id,
butil::IOBufBuilder* debug_os) {
if (index >= _flows.size()) {
LOG(ERROR) << "Faield execute workflow, index: " << index
LOG(ERROR) << "(logid=" << log_id
<< ") Faield execute workflow, index: " << index
<< " >= max:" << _flows.size();
return ERR_OVERFLOW_FAILURE;
}
Workflow* workflow = _flows[index];
return _execute_workflow(workflow, request, response, debug_os);
return _execute_workflow(workflow, request, response, log_id, debug_os);
}
int InferService::_execute_workflow(Workflow* workflow,
const google::protobuf::Message* request,
google::protobuf::Message* response,
const uint64_t log_id,
butil::IOBufBuilder* debug_os) {
butil::Timer workflow_time(butil::Timer::STARTED);
// create and submit beginer channel
......@@ -227,54 +233,62 @@ int InferService::_execute_workflow(Workflow* workflow,
req_channel.init(0, START_OP_NAME);
req_channel = request;
DagView* dv = workflow->fetch_dag_view(full_name());
dv->set_request_channel(req_channel);
DagView* dv = workflow->fetch_dag_view(full_name(), log_id);
dv->set_request_channel(req_channel, log_id);
// call actual inference interface
int errcode = dv->execute(debug_os);
int errcode = dv->execute(log_id, debug_os);
if (errcode < 0) {
LOG(ERROR) << "Failed execute dag for workflow:" << workflow->name();
LOG(ERROR) << "(logid=" << log_id
<< ") Failed execute dag for workflow:" << workflow->name();
return errcode;
}
TRACEPRINTF("finish to dv execute");
TRACEPRINTF("(logid=%" PRIu64 ") finish to dv execute", log_id);
// create ender channel and copy
const Channel* res_channel = dv->get_response_channel();
const Channel* res_channel = dv->get_response_channel(log_id);
if (res_channel == NULL) {
LOG(ERROR) << "(logid=" << log_id << ") Failed get response channel";
return ERR_INTERNAL_FAILURE;
}
if (!_merger || !_merger->merge(res_channel->message(), response)) {
LOG(ERROR) << "Failed merge channel res to response";
LOG(ERROR) << "(logid=" << log_id
<< ") Failed merge channel res to response";
return ERR_INTERNAL_FAILURE;
}
TRACEPRINTF("finish to copy from");
TRACEPRINTF("(logid=%" PRIu64 ") finish to copy from", log_id);
workflow_time.stop();
LOG(INFO) << "workflow total time: " << workflow_time.u_elapsed();
LOG(INFO) << "(logid=" << log_id
<< ") workflow total time: " << workflow_time.u_elapsed();
PredictorMetric::GetInstance()->update_latency_metric(
WORKFLOW_METRIC_PREFIX + dv->full_name(), workflow_time.u_elapsed());
// return tls data to object pool
workflow->return_dag_view(dv);
TRACEPRINTF("finish to return dag view");
TRACEPRINTF("(logid=%" PRIu64 ") finish to return dag view", log_id);
return ERR_OK;
}
std::vector<Workflow*>* InferService::_map_request_to_workflow(
const google::protobuf::Message* request) {
const google::protobuf::Message* request, const uint64_t log_id) {
const google::protobuf::Descriptor* desc = request->GetDescriptor();
const google::protobuf::FieldDescriptor* field =
desc->FindFieldByName(_request_field_key);
if (field == NULL) {
LOG(ERROR) << "No field[" << _request_field_key << "] in ["
<< desc->full_name() << "].";
LOG(ERROR) << "(logid=" << log_id << ") No field[" << _request_field_key
<< "] in [" << desc->full_name() << "].";
return NULL;
}
if (field->is_repeated()) {
LOG(ERROR) << "field[" << desc->full_name() << "." << _request_field_key
<< "] is repeated.";
LOG(ERROR) << "(logid=" << log_id << ") field[" << desc->full_name() << "."
<< _request_field_key << "] is repeated.";
return NULL;
}
if (field->cpp_type() != google::protobuf::FieldDescriptor::CPPTYPE_STRING) {
LOG(ERROR) << "field[" << desc->full_name() << "." << _request_field_key
<< "] should be string";
LOG(ERROR) << "(logid=" << log_id << ") field[" << desc->full_name() << "."
<< _request_field_key << "] should be string";
return NULL;
}
const std::string& field_value =
......@@ -282,7 +296,7 @@ std::vector<Workflow*>* InferService::_map_request_to_workflow(
std::vector<Workflow*>* p_workflow =
_request_to_workflow_map.seek(field_value);
if (p_workflow == NULL) {
LOG(ERROR) << "cannot find key[" << field_value
LOG(ERROR) << "(logid=" << log_id << ") cannot find key[" << field_value
<< "] in _request_to_workflow_map";
return NULL;
}
......
......@@ -63,16 +63,18 @@ class InferService {
int execute_one_workflow(uint32_t index,
const google::protobuf::Message* request,
google::protobuf::Message* response,
const uint64_t log_id,
butil::IOBufBuilder* debug_os);
private:
int _execute_workflow(Workflow* workflow,
const google::protobuf::Message* request,
google::protobuf::Message* response,
const uint64_t log_id,
butil::IOBufBuilder* debug_os);
std::vector<Workflow*>* _map_request_to_workflow(
const google::protobuf::Message* request);
const google::protobuf::Message* request, const uint64_t log_id);
private:
std::vector<Workflow*> _flows;
......
......@@ -32,21 +32,22 @@ int Workflow::init(const configure::Workflow& conf) {
return 0;
}
DagView* Workflow::fetch_dag_view(const std::string& service_name) {
DagView* Workflow::fetch_dag_view(const std::string& service_name,
const uint64_t log_id) {
DagView* view = NULL;
if (_type == "Sequence") {
view = butil::get_object<DagView>();
} else if (_type == "Parallel") {
view = butil::get_object<ParallelDagView>();
} else {
LOG(ERROR) << "Unknown dag type:" << _type << "!";
LOG(ERROR) << "(logid=" << log_id << ") Unknown dag type:" << _type << "!";
return NULL;
}
if (view == NULL) {
LOG(ERROR) << "create dag view from pool failed!";
LOG(ERROR) << "(logid=" << log_id << ") create dag view from pool failed!";
return NULL;
}
view->init(&_dag, service_name);
view->init(&_dag, service_name, log_id);
return view;
}
......
......@@ -36,7 +36,8 @@ class Workflow {
// different apps.
int init(const configure::Workflow& conf);
DagView* fetch_dag_view(const std::string& service_name);
DagView* fetch_dag_view(const std::string& service_name,
const uint64_t log_id);
int deinit() { return 0; }
......
......@@ -35,7 +35,8 @@ int Op::init(Bus* bus,
uint32_t id,
const std::string& name,
const std::string& type,
void* conf) {
void* conf,
const uint64_t log_id) {
_bus = bus;
_dag = dag;
_id = id;
......@@ -45,7 +46,8 @@ int Op::init(Bus* bus,
_timer = butil::get_object<TimerFlow>();
if (!_timer) {
LOG(ERROR) << "Invalid timerflow in op:" << this->name();
LOG(ERROR) << "(logid=" << log_id
<< ") Invalid timerflow in op:" << this->name();
return -1;
}
......@@ -55,7 +57,8 @@ int Op::init(Bus* bus,
Channel* channel = mutable_channel();
if (channel == NULL) {
LOG(ERROR) << "Failed mutable channel in op: " << this->id() << ", "
LOG(ERROR) << "(logid=" << log_id
<< ") Failed mutable channel in op: " << this->id() << ", "
<< this->name() << "!";
return -1;
}
......@@ -96,18 +99,20 @@ int Op::check_time(const char* tag) {
return 0;
}
int Op::process(bool debug) {
int Op::process(const uint64_t log_id, bool debug) {
butil::Timer op_time(butil::Timer::STARTED);
if (debug && _timer) {
_timer->start();
}
if (!_has_init) {
LOG(ERROR) << "Make sure op has been init before inference";
LOG(ERROR) << "(logid=" << log_id
<< ") Make sure op has been init before inference";
return ERR_INTERNAL_FAILURE;
}
if (_has_calc) {
LOG(INFO) << "Op: " << _name << " already processed before";
LOG(INFO) << "(logid=" << log_id << ") Op: " << _name
<< " already processed before";
return ERR_OK;
}
......@@ -143,7 +148,7 @@ int Op::process(bool debug) {
// 3. share output to bus
Channel* channel = mutable_channel();
channel->share_to_bus(_bus);
channel->share_to_bus(_bus, log_id);
// 4. mark has calculated
_has_calc = true;
......@@ -156,7 +161,8 @@ int Op::process(bool debug) {
op_time.stop();
PredictorMetric::GetInstance()->update_latency_metric(
OP_METRIC_PREFIX + full_name(), op_time.u_elapsed());
LOG(INFO) << " " << name() << "_time=[" << op_time.u_elapsed() << "]";
LOG(INFO) << "(logid=" << log_id << ") " << name() << "_time=["
<< op_time.u_elapsed() << "]";
return ERR_OK;
}
......
......@@ -113,13 +113,14 @@ class Op {
uint32_t id,
const std::string& name,
const std::string& type,
void* conf);
void* conf,
const uint64_t log_id);
int deinit();
int check_time(const char* tag);
int process(bool debug);
int process(const uint64_t log_id, bool debug);
std::string time_info();
......
......@@ -347,6 +347,11 @@ class Client(object):
result_map[name] = result_batch_handle.get_int64_by_name(
mi, name)
shape = result_batch_handle.get_shape(mi, name)
if result_map[name].size == 0:
raise ValueError(
"Failed to fetch, maybe the type of [{}]"
" is wrong, please check the model file".format(
name))
result_map[name].shape = shape
if name in self.lod_tensor_set:
result_map["{}.lod".format(
......@@ -354,6 +359,11 @@ class Client(object):
elif self.fetch_names_to_type_[name] == float32_type:
result_map[name] = result_batch_handle.get_float_by_name(
mi, name)
if result_map[name].size == 0:
raise ValueError(
"Failed to fetch, maybe the type of [{}]"
" is wrong, please check the model file".format(
name))
shape = result_batch_handle.get_shape(mi, name)
result_map[name].shape = shape
if name in self.lod_tensor_set:
......@@ -364,6 +374,11 @@ class Client(object):
# result_map[name] will be py::array(numpy array)
result_map[name] = result_batch_handle.get_int32_by_name(
mi, name)
if result_map[name].size == 0:
raise ValueError(
"Failed to fetch, maybe the type of [{}]"
" is wrong, please check the model file".format(
name))
shape = result_batch_handle.get_shape(mi, name)
result_map[name].shape = shape
if name in self.lod_tensor_set:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册