op.cpp 8.2 KB
Newer Older
W
wangguibao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

G
guru4elephant 已提交
15
#include "core/predictor/op/op.h"
W
wangguibao 已提交
16 17 18

#ifdef BCLOUD
#include <base/time.h>  // base::Timer
W
wangguibao 已提交
19 20
#else
#include <butil/time.h>
W
wangguibao 已提交
21 22
#endif

W
wangguibao 已提交
23
#include <string>
G
guru4elephant 已提交
24 25 26 27
#include "core/predictor/common/constant.h"
#include "core/predictor/common/utils.h"
#include "core/predictor/framework/channel.h"
#include "core/predictor/framework/dag.h"
W
wangguibao 已提交
28 29 30 31 32

namespace baidu {
namespace paddle_serving {
namespace predictor {

W
wangguibao 已提交
33 34 35 36 37
int Op::init(Bus* bus,
             Dag* dag,
             uint32_t id,
             const std::string& name,
             const std::string& type,
B
barriery 已提交
38
             void* conf,
39
             const std::vector<std::string>& address,
B
barriery 已提交
40
             const uint64_t log_id) {
W
wangguibao 已提交
41 42 43 44 45
  _bus = bus;
  _dag = dag;
  _id = id;
  _name = name;
  _type = type;
46
  _address = address;
W
wangguibao 已提交
47 48 49 50
  set_config(conf);

  _timer = butil::get_object<TimerFlow>();
  if (!_timer) {
B
barriery 已提交
51 52
    LOG(ERROR) << "(logid=" << log_id
               << ") Invalid timerflow in op:" << this->name();
W
wangguibao 已提交
53 54 55 56 57 58 59 60 61
    return -1;
  }

  _timer->init();
  _has_calc = false;
  _has_init = true;

  Channel* channel = mutable_channel();
  if (channel == NULL) {
B
barriery 已提交
62 63
    LOG(ERROR) << "(logid=" << log_id
               << ") Failed mutable channel in op: " << this->id() << ", "
W
wangguibao 已提交
64 65 66 67
               << this->name() << "!";
    return -1;
  }

B
barrierye 已提交
68
  _pre_node_names.clear();
W
wangguibao 已提交
69
  return custom_init();
W
wangguibao 已提交
70 71 72
}

int Op::deinit() {
W
wangguibao 已提交
73 74 75
  if (_timer) {
    butil::return_object(_timer);
  }
W
wangguibao 已提交
76

W
wangguibao 已提交
77 78 79
  _bus = NULL;
  _dag = NULL;
  _timer = NULL;
W
wangguibao 已提交
80

W
wangguibao 已提交
81 82 83 84 85
  if (release_channel() != 0) {
    LOG(ERROR) << "Failed release channel in op:" << this->id() << ", "
               << this->name() << "!";
    return -1;
  }
W
wangguibao 已提交
86

W
wangguibao 已提交
87
  return custom_deinit();
W
wangguibao 已提交
88 89 90
}

int Op::check_time(const char* tag) {
W
wangguibao 已提交
91 92 93 94
  if (!_timer) {
    LOG(ERROR) << "Invalid timer in op";
    return -1;
  }
W
wangguibao 已提交
95

W
wangguibao 已提交
96 97 98 99
  if (!_timer->check(tag)) {
    LOG(ERROR) << "Failed check timer:" << tag;
    return -1;
  }
W
wangguibao 已提交
100

W
wangguibao 已提交
101
  return 0;
W
wangguibao 已提交
102 103
}

B
barriery 已提交
104
int Op::process(const uint64_t log_id, bool debug) {
W
wangguibao 已提交
105 106 107 108 109
  butil::Timer op_time(butil::Timer::STARTED);
  if (debug && _timer) {
    _timer->start();
  }
  if (!_has_init) {
B
barriery 已提交
110 111
    LOG(ERROR) << "(logid=" << log_id
               << ") Make sure op has been init before inference";
W
wangguibao 已提交
112 113 114
    return ERR_INTERNAL_FAILURE;
  }

115
  /*
W
wangguibao 已提交
116
  if (_has_calc) {
B
barriery 已提交
117 118
    LOG(INFO) << "(logid=" << log_id << ") Op: " << _name
              << " already processed before";
W
wangguibao 已提交
119
    return ERR_OK;
W
wangguibao 已提交
120
  }
121
  */
W
wangguibao 已提交
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139

  // 1. dependency inference
  /*
  DagNode* node = _dag->node_by_name(this->name());
  if (node == NULL) {
      LOG(ERROR) << "Failed get node of op:" << this->name();
      return -1;
  }
  boost::unordered_map<std::string, EdgeMode>& depends =
      node->depends;
  boost::unordered_map<std::string, EdgeMode>::iterator it;
  for (it = depends.begin(); it != depends.end(); it++) {
      Op* depend_op = view->find(it->first);
      if (depend_op->process() != 0) {
          LOG(WARNING) << "Op: " << _name << " processed failed!";
          return -1;
      }
  }*/
140

W
wangguibao 已提交
141 142 143 144 145 146 147 148 149 150 151 152 153
  if (debug && _timer) {
    _timer->check("depend");
  }

  // 2. current inference
  if (inference() != 0) {
    return ERR_OP_INFER_FAILURE;
  }
  if (debug && _timer) {
    _timer->check("infer");
  }

  // 3. share output to bus
154 155 156 157
  if (!_has_calc) {
    Channel* channel = mutable_channel();
    channel->share_to_bus(_bus, log_id);
  }
W
wangguibao 已提交
158 159 160 161 162 163 164 165 166 167 168 169

  // 4. mark has calculated
  _has_calc = true;

  if (debug && _timer) {
    _timer->check("share");
    _timer->end();
  }

  op_time.stop();
  PredictorMetric::GetInstance()->update_latency_metric(
      OP_METRIC_PREFIX + full_name(), op_time.u_elapsed());
B
barriery 已提交
170 171
  LOG(INFO) << "(logid=" << log_id << ") " << name() << "_time=["
            << op_time.u_elapsed() << "]";
W
wangguibao 已提交
172
  return ERR_OK;
W
wangguibao 已提交
173 174 175
}

std::string Op::time_info() {
W
wangguibao 已提交
176 177 178 179 180
  if (_timer) {
    return _timer->info();
  } else {
    return "Invalid Timer!";
  }
W
wangguibao 已提交
181 182 183
}

bool Op::is_mutable(const std::string& op) {
W
wangguibao 已提交
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
  if (op == START_OP_NAME) {
    return false;
  }
  DagNode* node = const_cast<DagNode*>(_dag->node_by_name(_name));
  if (node->depends.find(op) == node->depends.end()) {
    LOG(WARNING) << "op: " << _name << " doesnot depend on"
                 << "op: " << op << "!";
    return false;
  }

  if (node->depends[op] != RW) {
    LOG(WARNING) << "op: " << _name << " has no RW access"
                 << "ot op: " << op << ", mode: " << node->depends[op]
                 << ", please use get_argment() instead.";
    return false;
  }

  return true;
W
wangguibao 已提交
202 203 204
}

bool Op::is_mutable(const std::string& op) const {
W
wangguibao 已提交
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
  if (op == START_OP_NAME) {
    return false;
  }
  DagNode* node = const_cast<DagNode*>(_dag->node_by_name(_name));
  if (node->depends.find(op) == node->depends.end()) {
    LOG(WARNING) << "op: " << _name << " doesnot depend on"
                 << "op: " << op << "!";
    return false;
  }

  if (node->depends[op] != RW) {
    LOG(WARNING) << "op: " << _name << " has no RW access"
                 << "ot op: " << op << ", mode: " << node->depends[op]
                 << ", please use get_argment() instead.";
    return false;
  }

  return true;
W
wangguibao 已提交
223 224 225
}

bool Op::is_readable(const std::string& op) {
W
wangguibao 已提交
226
  if (op == START_OP_NAME) {
W
wangguibao 已提交
227
    return true;
W
wangguibao 已提交
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
  }
  DagNode* node = const_cast<DagNode*>(_dag->node_by_name(_name));
  if (node->depends.find(op) == node->depends.end()) {
    LOG(WARNING) << "op: " << _name << " doesnot depend on"
                 << "op: " << op << "!";
    return false;
  }

  if (node->depends[op] != RW && node->depends[op] != RO) {
    LOG(WARNING) << "op: " << _name << " has no RO access"
                 << "ot op: " << op << ", mode: " << node->depends[op]
                 << ", please check your configuration.";
    return false;
  }

  return true;
W
wangguibao 已提交
244 245 246
}

bool Op::is_readable(const std::string& op) const {
W
wangguibao 已提交
247
  if (op == START_OP_NAME) {
W
wangguibao 已提交
248
    return true;
W
wangguibao 已提交
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
  }
  DagNode* node = const_cast<DagNode*>(_dag->node_by_name(_name));
  if (node->depends.find(op) == node->depends.end()) {
    LOG(WARNING) << "op: " << _name << " doesnot depend on "
                 << "op: " << op << "!";
    return false;
  }

  if (node->depends[op] != RW && node->depends[op] != RO) {
    LOG(WARNING) << "op: " << _name << " has no RO access"
                 << "ot op: " << op << ", mode: " << node->depends[op]
                 << ", please check your configuration.";
    return false;
  }

  return true;
W
wangguibao 已提交
265 266
}

W
wangguibao 已提交
267
// Get the Channel object of dependent OP
W
wangguibao 已提交
268
Channel* Op::mutable_depend_channel(const std::string& op) {
W
wangguibao 已提交
269 270 271 272 273 274 275
  if (!is_mutable(op)) {
    LOG(WARNING) << "Op: " << _name << " cannot mutable op: " << op << "!";
    return NULL;
  }

  // Get the Channel object of dependent OP from bus
  return _bus->channel_by_name(op);
W
wangguibao 已提交
276 277
}

W
wangguibao 已提交
278
// Get the Channel object of dependent OP
W
wangguibao 已提交
279
const Channel* Op::get_depend_channel(const std::string& op) const {
W
wangguibao 已提交
280 281 282 283 284 285 286 287
  // Get the `mode` attribute of dependent OP from dag
  if (!is_readable(op)) {
    LOG(WARNING) << "op: " << _name << " doesnot depend on op: " << op << "!";
    return NULL;
  }

  // Get the Channel object of dependent OP from bus
  return _bus->channel_by_name(op);
W
wangguibao 已提交
288 289 290
}

// Returns the writable protobuf message held by this op's own channel.
//
// Returns NULL (after logging an error) when mutable_channel() yields no
// channel; init() treats that same call as able to fail, so it is not
// dereferenced blindly here.
google::protobuf::Message* Op::mutable_message() {
  Channel* channel = mutable_channel();
  if (channel == NULL) {
    LOG(ERROR) << "Invalid channel in op: " << this->name() << "!";
    return NULL;
  }
  return channel->message();
}

// Returns the read-only protobuf message held by this op's own channel.
//
// Returns NULL (after logging an error) when get_channel() yields no channel;
// debug_string() guards the same call against NULL, so this accessor does
// too.
const google::protobuf::Message* Op::get_message() const {
  const Channel* channel = get_channel();
  if (channel == NULL) {
    LOG(ERROR) << "Invalid channel in op: " << this->name() << "!";
    return NULL;
  }
  return channel->message();
}

// Returns the _has_calc flag: init() clears it and process() sets it once a
// pass has completed.
bool Op::has_calc() { return _has_calc; }

W
wangguibao 已提交
300
// Returns the op name given to init() as a C string. The pointer is obtained
// from _name.c_str(); presumably it is only valid until _name is next
// modified (e.g. by a later init()) -- confirm before storing it.
const char* Op::name() const { return _name.c_str(); }
W
wangguibao 已提交
301

W
wangguibao 已提交
302
// Returns a reference to the op type string stored by init().
const std::string& Op::type() const { return _type; }
W
wangguibao 已提交
303

W
wangguibao 已提交
304
// Returns the numeric op id stored by init().
uint32_t Op::id() const { return _id; }
W
wangguibao 已提交
305 306

const std::string Op::debug_string() {
W
wangguibao 已提交
307 308 309 310 311 312
  const Channel* channel = get_channel();
  if (!channel) {
    LOG(ERROR) << "Invalid channel!";
    return "Invalid channel in OP";
  }
  return channel->debug_string();
W
wangguibao 已提交
313 314 315
}

// Returns the message held by the channel registered on the bus under
// START_OP_NAME (presumably the incoming request -- confirm against the code
// that fills that channel).
//
// Returns NULL (after logging an error) when this op has no bus, which is the
// case after deinit(), or when the bus has no channel under that name.
const google::protobuf::Message* Op::get_request_message() {
  if (_bus == NULL) {
    LOG(ERROR) << "Invalid bus in op: " << this->name() << "!";
    return NULL;
  }

  Channel* start_channel = _bus->channel_by_name(START_OP_NAME);
  // NOTE(review): whether channel_by_name() can return NULL is not visible
  // here; guarded defensively.
  if (start_channel == NULL) {
    LOG(ERROR) << "Invalid start channel in op: " << this->name() << "!";
    return NULL;
  }
  return start_channel->message();
}

W
wangguibao 已提交
319 320 321
}  // namespace predictor
}  // namespace paddle_serving
}  // namespace baidu