// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "core/predictor/op/op.h" #ifdef BCLOUD #include // base::Timer #else #include #endif #include #include "core/predictor/common/constant.h" #include "core/predictor/common/utils.h" #include "core/predictor/framework/channel.h" #include "core/predictor/framework/dag.h" namespace baidu { namespace paddle_serving { namespace predictor { int Op::init(Bus* bus, Dag* dag, uint32_t id, const std::string& name, const std::string& type, void* conf, const uint64_t log_id) { _bus = bus; _dag = dag; _id = id; _name = name; _type = type; set_config(conf); _timer = butil::get_object(); if (!_timer) { LOG(ERROR) << "(logid=" << log_id << ") Invalid timerflow in op:" << this->name(); return -1; } _timer->init(); _has_calc = false; _has_init = true; Channel* channel = mutable_channel(); if (channel == NULL) { LOG(ERROR) << "(logid=" << log_id << ") Failed mutable channel in op: " << this->id() << ", " << this->name() << "!"; return -1; } _pre_node_names.clear(); return custom_init(); } int Op::deinit() { if (_timer) { butil::return_object(_timer); } _bus = NULL; _dag = NULL; _timer = NULL; if (release_channel() != 0) { LOG(ERROR) << "Failed release channel in op:" << this->id() << ", " << this->name() << "!"; return -1; } return custom_deinit(); } int Op::check_time(const char* tag) { if (!_timer) { LOG(ERROR) << "Invalid timer in op"; return -1; } if (!_timer->check(tag)) { LOG(ERROR) << "Failed check timer:" << tag; return -1; } return 0; } int Op::process(const uint64_t log_id, bool debug) { butil::Timer op_time(butil::Timer::STARTED); if (debug && _timer) { _timer->start(); } if (!_has_init) { LOG(ERROR) << "(logid=" << log_id << ") Make sure op has been init before inference"; return ERR_INTERNAL_FAILURE; } if (_has_calc) { LOG(INFO) << "(logid=" << log_id << ") Op: " << _name << " already processed before"; return ERR_OK; } // 1. dependency inference /* DagNode* node = _dag->node_by_name(this->name()); if (node == NULL) { LOG(ERROR) << "Failed get node of op:" << this->name(); return -1; } boost::unordered_map& depends = node->depends; boost::unordered_map::iterator it; for (it = depends.begin(); it != depends.end(); it++) { Op* depend_op = view->find(it->first); if (depend_op->process() != 0) { LOG(WARNING) << "Op: " << _name << " processed failed!"; return -1; } }*/ if (debug && _timer) { _timer->check("depend"); } // 2. current inference if (inference() != 0) { return ERR_OP_INFER_FAILURE; } if (debug && _timer) { _timer->check("infer"); } // 3. share output to bus Channel* channel = mutable_channel(); channel->share_to_bus(_bus, log_id); // 4. mark has calculated _has_calc = true; if (debug && _timer) { _timer->check("share"); _timer->end(); } op_time.stop(); PredictorMetric::GetInstance()->update_latency_metric( OP_METRIC_PREFIX + full_name(), op_time.u_elapsed()); LOG(INFO) << "(logid=" << log_id << ") " << name() << "_time=[" << op_time.u_elapsed() << "]"; return ERR_OK; } std::string Op::time_info() { if (_timer) { return _timer->info(); } else { return "Invalid Timer!"; } } bool Op::is_mutable(const std::string& op) { if (op == START_OP_NAME) { return false; } DagNode* node = const_cast(_dag->node_by_name(_name)); if (node->depends.find(op) == node->depends.end()) { LOG(WARNING) << "op: " << _name << " doesnot depend on" << "op: " << op << "!"; return false; } if (node->depends[op] != RW) { LOG(WARNING) << "op: " << _name << " has no RW access" << "ot op: " << op << ", mode: " << node->depends[op] << ", please use get_argment() instead."; return false; } return true; } bool Op::is_mutable(const std::string& op) const { if (op == START_OP_NAME) { return false; } DagNode* node = const_cast(_dag->node_by_name(_name)); if (node->depends.find(op) == node->depends.end()) { LOG(WARNING) << "op: " << _name << " doesnot depend on" << "op: " << op << "!"; return false; } if (node->depends[op] != RW) { LOG(WARNING) << "op: " << _name << " has no RW access" << "ot op: " << op << ", mode: " << node->depends[op] << ", please use get_argment() instead."; return false; } return true; } bool Op::is_readable(const std::string& op) { if (op == START_OP_NAME) { return true; } DagNode* node = const_cast(_dag->node_by_name(_name)); if (node->depends.find(op) == node->depends.end()) { LOG(WARNING) << "op: " << _name << " doesnot depend on" << "op: " << op << "!"; return false; } if (node->depends[op] != RW && node->depends[op] != RO) { LOG(WARNING) << "op: " << _name << " has no RO access" << "ot op: " << op << ", mode: " << node->depends[op] << ", please check your configuration."; return false; } return true; } bool Op::is_readable(const std::string& op) const { if (op == START_OP_NAME) { return true; } DagNode* node = const_cast(_dag->node_by_name(_name)); if (node->depends.find(op) == node->depends.end()) { LOG(WARNING) << "op: " << _name << " doesnot depend on " << "op: " << op << "!"; return false; } if (node->depends[op] != RW && node->depends[op] != RO) { LOG(WARNING) << "op: " << _name << " has no RO access" << "ot op: " << op << ", mode: " << node->depends[op] << ", please check your configuration."; return false; } return true; } // Get the Channel object of dependent OP Channel* Op::mutable_depend_channel(const std::string& op) { if (!is_mutable(op)) { LOG(WARNING) << "Op: " << _name << " cannot mutable op: " << op << "!"; return NULL; } // Get the Channel object of dependent OP from bus return _bus->channel_by_name(op); } // Get the Channel object of dependent OP const Channel* Op::get_depend_channel(const std::string& op) const { // Get the `mode` attribute of dependent OP from dag if (!is_readable(op)) { LOG(WARNING) << "op: " << _name << " doesnot depend on op: " << op << "!"; return NULL; } // Get the Channel object of dependent OP from bus return _bus->channel_by_name(op); } google::protobuf::Message* Op::mutable_message() { return mutable_channel()->message(); } const google::protobuf::Message* Op::get_message() const { return get_channel()->message(); } bool Op::has_calc() { return _has_calc; } const char* Op::name() const { return _name.c_str(); } const std::string& Op::type() const { return _type; } uint32_t Op::id() const { return _id; } const std::string Op::debug_string() { const Channel* channel = get_channel(); if (!channel) { LOG(ERROR) << "Invalid channel!"; return "Invalid channel in OP"; } return channel->debug_string(); } const google::protobuf::Message* Op::get_request_message() { return _bus->channel_by_name(START_OP_NAME)->message(); } } // namespace predictor } // namespace paddle_serving } // namespace baidu