op.cpp 8.0 KB
Newer Older
W
wangguibao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

G
guru4elephant 已提交
15
#include "core/predictor/op/op.h"
W
wangguibao 已提交
16 17 18

#ifdef BCLOUD
#include <base/time.h>  // base::Timer
W
wangguibao 已提交
19 20
#else
#include <butil/time.h>
W
wangguibao 已提交
21 22
#endif

W
wangguibao 已提交
23
#include <string>
G
guru4elephant 已提交
24 25 26 27
#include "core/predictor/common/constant.h"
#include "core/predictor/common/utils.h"
#include "core/predictor/framework/channel.h"
#include "core/predictor/framework/dag.h"
W
wangguibao 已提交
28 29 30 31 32

namespace baidu {
namespace paddle_serving {
namespace predictor {

W
wangguibao 已提交
33 34 35 36 37
int Op::init(Bus* bus,
             Dag* dag,
             uint32_t id,
             const std::string& name,
             const std::string& type,
B
barriery 已提交
38 39
             void* conf,
             const uint64_t log_id) {
W
wangguibao 已提交
40 41 42 43 44 45 46 47 48
  _bus = bus;
  _dag = dag;
  _id = id;
  _name = name;
  _type = type;
  set_config(conf);

  _timer = butil::get_object<TimerFlow>();
  if (!_timer) {
B
barriery 已提交
49 50
    LOG(ERROR) << "(logid=" << log_id
               << ") Invalid timerflow in op:" << this->name();
W
wangguibao 已提交
51 52 53 54 55 56 57 58 59
    return -1;
  }

  _timer->init();
  _has_calc = false;
  _has_init = true;

  Channel* channel = mutable_channel();
  if (channel == NULL) {
B
barriery 已提交
60 61
    LOG(ERROR) << "(logid=" << log_id
               << ") Failed mutable channel in op: " << this->id() << ", "
W
wangguibao 已提交
62 63 64 65
               << this->name() << "!";
    return -1;
  }

B
barrierye 已提交
66
  _pre_node_names.clear();
W
wangguibao 已提交
67
  return custom_init();
W
wangguibao 已提交
68 69 70
}

int Op::deinit() {
W
wangguibao 已提交
71 72 73
  if (_timer) {
    butil::return_object(_timer);
  }
W
wangguibao 已提交
74

W
wangguibao 已提交
75 76 77
  _bus = NULL;
  _dag = NULL;
  _timer = NULL;
W
wangguibao 已提交
78

W
wangguibao 已提交
79 80 81 82 83
  if (release_channel() != 0) {
    LOG(ERROR) << "Failed release channel in op:" << this->id() << ", "
               << this->name() << "!";
    return -1;
  }
W
wangguibao 已提交
84

W
wangguibao 已提交
85
  return custom_deinit();
W
wangguibao 已提交
86 87 88
}

int Op::check_time(const char* tag) {
W
wangguibao 已提交
89 90 91 92
  if (!_timer) {
    LOG(ERROR) << "Invalid timer in op";
    return -1;
  }
W
wangguibao 已提交
93

W
wangguibao 已提交
94 95 96 97
  if (!_timer->check(tag)) {
    LOG(ERROR) << "Failed check timer:" << tag;
    return -1;
  }
W
wangguibao 已提交
98

W
wangguibao 已提交
99
  return 0;
W
wangguibao 已提交
100 101
}

B
barriery 已提交
102
int Op::process(const uint64_t log_id, bool debug) {
W
wangguibao 已提交
103 104 105 106 107
  butil::Timer op_time(butil::Timer::STARTED);
  if (debug && _timer) {
    _timer->start();
  }
  if (!_has_init) {
B
barriery 已提交
108 109
    LOG(ERROR) << "(logid=" << log_id
               << ") Make sure op has been init before inference";
W
wangguibao 已提交
110 111 112 113
    return ERR_INTERNAL_FAILURE;
  }

  if (_has_calc) {
B
barriery 已提交
114 115
    LOG(INFO) << "(logid=" << log_id << ") Op: " << _name
              << " already processed before";
W
wangguibao 已提交
116
    return ERR_OK;
W
wangguibao 已提交
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
  }

  // 1. dependency inference
  /*
  DagNode* node = _dag->node_by_name(this->name());
  if (node == NULL) {
      LOG(ERROR) << "Failed get node of op:" << this->name();
      return -1;
  }
  boost::unordered_map<std::string, EdgeMode>& depends =
      node->depends;
  boost::unordered_map<std::string, EdgeMode>::iterator it;
  for (it = depends.begin(); it != depends.end(); it++) {
      Op* depend_op = view->find(it->first);
      if (depend_op->process() != 0) {
          LOG(WARNING) << "Op: " << _name << " processed failed!";
          return -1;
      }
  }*/
136

W
wangguibao 已提交
137 138 139 140 141 142 143 144 145 146 147 148 149 150
  if (debug && _timer) {
    _timer->check("depend");
  }

  // 2. current inference
  if (inference() != 0) {
    return ERR_OP_INFER_FAILURE;
  }
  if (debug && _timer) {
    _timer->check("infer");
  }

  // 3. share output to bus
  Channel* channel = mutable_channel();
B
barriery 已提交
151
  channel->share_to_bus(_bus, log_id);
W
wangguibao 已提交
152 153 154 155 156 157 158 159 160 161 162 163

  // 4. mark has calculated
  _has_calc = true;

  if (debug && _timer) {
    _timer->check("share");
    _timer->end();
  }

  op_time.stop();
  PredictorMetric::GetInstance()->update_latency_metric(
      OP_METRIC_PREFIX + full_name(), op_time.u_elapsed());
B
barriery 已提交
164 165
  LOG(INFO) << "(logid=" << log_id << ") " << name() << "_time=["
            << op_time.u_elapsed() << "]";
W
wangguibao 已提交
166
  return ERR_OK;
W
wangguibao 已提交
167 168 169
}

std::string Op::time_info() {
W
wangguibao 已提交
170 171 172 173 174
  if (_timer) {
    return _timer->info();
  } else {
    return "Invalid Timer!";
  }
W
wangguibao 已提交
175 176 177
}

bool Op::is_mutable(const std::string& op) {
W
wangguibao 已提交
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
  if (op == START_OP_NAME) {
    return false;
  }
  DagNode* node = const_cast<DagNode*>(_dag->node_by_name(_name));
  if (node->depends.find(op) == node->depends.end()) {
    LOG(WARNING) << "op: " << _name << " doesnot depend on"
                 << "op: " << op << "!";
    return false;
  }

  if (node->depends[op] != RW) {
    LOG(WARNING) << "op: " << _name << " has no RW access"
                 << "ot op: " << op << ", mode: " << node->depends[op]
                 << ", please use get_argment() instead.";
    return false;
  }

  return true;
W
wangguibao 已提交
196 197 198
}

bool Op::is_mutable(const std::string& op) const {
W
wangguibao 已提交
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
  if (op == START_OP_NAME) {
    return false;
  }
  DagNode* node = const_cast<DagNode*>(_dag->node_by_name(_name));
  if (node->depends.find(op) == node->depends.end()) {
    LOG(WARNING) << "op: " << _name << " doesnot depend on"
                 << "op: " << op << "!";
    return false;
  }

  if (node->depends[op] != RW) {
    LOG(WARNING) << "op: " << _name << " has no RW access"
                 << "ot op: " << op << ", mode: " << node->depends[op]
                 << ", please use get_argment() instead.";
    return false;
  }

  return true;
W
wangguibao 已提交
217 218 219
}

bool Op::is_readable(const std::string& op) {
W
wangguibao 已提交
220
  if (op == START_OP_NAME) {
W
wangguibao 已提交
221
    return true;
W
wangguibao 已提交
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237
  }
  DagNode* node = const_cast<DagNode*>(_dag->node_by_name(_name));
  if (node->depends.find(op) == node->depends.end()) {
    LOG(WARNING) << "op: " << _name << " doesnot depend on"
                 << "op: " << op << "!";
    return false;
  }

  if (node->depends[op] != RW && node->depends[op] != RO) {
    LOG(WARNING) << "op: " << _name << " has no RO access"
                 << "ot op: " << op << ", mode: " << node->depends[op]
                 << ", please check your configuration.";
    return false;
  }

  return true;
W
wangguibao 已提交
238 239 240
}

bool Op::is_readable(const std::string& op) const {
W
wangguibao 已提交
241
  if (op == START_OP_NAME) {
W
wangguibao 已提交
242
    return true;
W
wangguibao 已提交
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
  }
  DagNode* node = const_cast<DagNode*>(_dag->node_by_name(_name));
  if (node->depends.find(op) == node->depends.end()) {
    LOG(WARNING) << "op: " << _name << " doesnot depend on "
                 << "op: " << op << "!";
    return false;
  }

  if (node->depends[op] != RW && node->depends[op] != RO) {
    LOG(WARNING) << "op: " << _name << " has no RO access"
                 << "ot op: " << op << ", mode: " << node->depends[op]
                 << ", please check your configuration.";
    return false;
  }

  return true;
W
wangguibao 已提交
259 260
}

W
wangguibao 已提交
261
// Get the Channel object of dependent OP
W
wangguibao 已提交
262
Channel* Op::mutable_depend_channel(const std::string& op) {
W
wangguibao 已提交
263 264 265 266 267 268 269
  if (!is_mutable(op)) {
    LOG(WARNING) << "Op: " << _name << " cannot mutable op: " << op << "!";
    return NULL;
  }

  // Get the Channel object of dependent OP from bus
  return _bus->channel_by_name(op);
W
wangguibao 已提交
270 271
}

W
wangguibao 已提交
272
// Get the Channel object of dependent OP
W
wangguibao 已提交
273
const Channel* Op::get_depend_channel(const std::string& op) const {
W
wangguibao 已提交
274 275 276 277 278 279 280 281
  // Get the `mode` attribute of dependent OP from dag
  if (!is_readable(op)) {
    LOG(WARNING) << "op: " << _name << " doesnot depend on op: " << op << "!";
    return NULL;
  }

  // Get the Channel object of dependent OP from bus
  return _bus->channel_by_name(op);
W
wangguibao 已提交
282 283 284
}

google::protobuf::Message* Op::mutable_message() {
W
wangguibao 已提交
285
  return mutable_channel()->message();
W
wangguibao 已提交
286 287 288
}

const google::protobuf::Message* Op::get_message() const {
W
wangguibao 已提交
289
  return get_channel()->message();
W
wangguibao 已提交
290 291 292 293
}

bool Op::has_calc() { return _has_calc; }

W
wangguibao 已提交
294
const char* Op::name() const { return _name.c_str(); }
W
wangguibao 已提交
295

W
wangguibao 已提交
296
const std::string& Op::type() const { return _type; }
W
wangguibao 已提交
297

W
wangguibao 已提交
298
uint32_t Op::id() const { return _id; }
W
wangguibao 已提交
299 300

const std::string Op::debug_string() {
W
wangguibao 已提交
301 302 303 304 305 306
  const Channel* channel = get_channel();
  if (!channel) {
    LOG(ERROR) << "Invalid channel!";
    return "Invalid channel in OP";
  }
  return channel->debug_string();
W
wangguibao 已提交
307 308 309
}

const google::protobuf::Message* Op::get_request_message() {
W
wangguibao 已提交
310
  return _bus->channel_by_name(START_OP_NAME)->message();
W
wangguibao 已提交
311 312
}

W
wangguibao 已提交
313 314 315
}  // namespace predictor
}  // namespace paddle_serving
}  // namespace baidu