// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "core/predictor/op/op.h"
#ifdef BCLOUD
#include   // base::Timer
#else
#include 
#endif
#include 
#include "core/predictor/common/constant.h"
#include "core/predictor/common/utils.h"
#include "core/predictor/framework/channel.h"
#include "core/predictor/framework/dag.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
// Binds this op to its execution context and prepares it for process().
//
// Params:
//   bus    - bus that process() later shares this op's channel to and that
//            the depend-channel getters look channels up in
//   dag    - dag queried by is_mutable() / is_readable()
//   id     - numeric id of this op
//   name   - name of this op
//   type   - type of this op
//   conf   - opaque configuration, forwarded to set_config()
//   log_id - request log id, only used in log output
//
// Returns -1 when the timer or the channel cannot be obtained, otherwise the
// return value of custom_init().
int Op::init(Bus* bus,
             Dag* dag,
             uint32_t id,
             const std::string& name,
             const std::string& type,
             void* conf,
             const uint64_t log_id) {
  // Clear the flag up front so that a failed (re-)initialisation can never
  // leave the op looking usable to process().
  _has_init = false;
  _bus = bus;
  _dag = dag;
  _id = id;
  _name = name;
  _type = type;
  set_config(conf);
  // NOTE(review): the template argument is assumed to be TimerFlow, i.e. the
  // pointee type of _timer -- confirm against the declaration in op.h.
  _timer = butil::get_object<TimerFlow>();
  if (!_timer) {
    LOG(ERROR) << "(logid=" << log_id
               << ") Invalid timerflow in op:" << this->name();
    return -1;
  }
  _timer->init();
  _has_calc = false;
  Channel* channel = mutable_channel();
  if (channel == NULL) {
    LOG(ERROR) << "(logid=" << log_id
               << ") Failed mutable channel in op: " << this->id() << ", "
               << this->name() << "!";
    return -1;
  }
  // Only mark the op initialised once both the timer and the channel exist;
  // process() dereferences the channel after checking this flag.
  _has_init = true;
  _pre_node_names.clear();
  return custom_init();
}
// Hands the timer back via butil::return_object(), drops the bus / dag
// pointers and releases the channel.
//
// Returns -1 when release_channel() reports a non-zero status (in which case
// custom_deinit() is not called), otherwise the return value of
// custom_deinit().
int Op::deinit() {
  if (_timer != NULL) {
    butil::return_object(_timer);
    _timer = NULL;
  }
  _bus = NULL;
  _dag = NULL;

  if (release_channel() == 0) {
    return custom_deinit();
  }

  LOG(ERROR) << "Failed release channel in op:" << this->id() << ", "
             << this->name() << "!";
  return -1;
}
int Op::check_time(const char* tag) {
  if (!_timer) {
    LOG(ERROR) << "Invalid timer in op";
    return -1;
  }
  if (!_timer->check(tag)) {
    LOG(ERROR) << "Failed check timer:" << tag;
    return -1;
  }
  return 0;
}
int Op::process(const uint64_t log_id, bool debug) {
  butil::Timer op_time(butil::Timer::STARTED);
  if (debug && _timer) {
    _timer->start();
  }
  if (!_has_init) {
    LOG(ERROR) << "(logid=" << log_id
               << ") Make sure op has been init before inference";
    return ERR_INTERNAL_FAILURE;
  }
  if (_has_calc) {
    LOG(INFO) << "(logid=" << log_id << ") Op: " << _name
              << " already processed before";
    return ERR_OK;
  }
  // 1. dependency inference
  /*
  DagNode* node = _dag->node_by_name(this->name());
  if (node == NULL) {
      LOG(ERROR) << "Failed get node of op:" << this->name();
      return -1;
  }
  boost::unordered_map& depends =
      node->depends;
  boost::unordered_map::iterator it;
  for (it = depends.begin(); it != depends.end(); it++) {
      Op* depend_op = view->find(it->first);
      if (depend_op->process() != 0) {
          LOG(WARNING) << "Op: " << _name << " processed failed!";
          return -1;
      }
  }*/
  if (debug && _timer) {
    _timer->check("depend");
  }
  // 2. current inference
  if (inference() != 0) {
    return ERR_OP_INFER_FAILURE;
  }
  if (debug && _timer) {
    _timer->check("infer");
  }
  // 3. share output to bus
  Channel* channel = mutable_channel();
  channel->share_to_bus(_bus, log_id);
  // 4. mark has calculated
  _has_calc = true;
  if (debug && _timer) {
    _timer->check("share");
    _timer->end();
  }
  op_time.stop();
  PredictorMetric::GetInstance()->update_latency_metric(
      OP_METRIC_PREFIX + full_name(), op_time.u_elapsed());
  LOG(INFO) << "(logid=" << log_id << ") " << name() << "_time=["
            << op_time.u_elapsed() << "]";
  return ERR_OK;
}
// Returns the timer's info() string, or a fixed placeholder when the op has
// no timer.
std::string Op::time_info() {
  if (!_timer) {
    return "Invalid Timer!";
  }
  return _timer->info();
}
bool Op::is_mutable(const std::string& op) {
  if (op == START_OP_NAME) {
    return false;
  }
  DagNode* node = const_cast(_dag->node_by_name(_name));
  if (node->depends.find(op) == node->depends.end()) {
    LOG(WARNING) << "op: " << _name << " doesnot depend on"
                 << "op: " << op << "!";
    return false;
  }
  if (node->depends[op] != RW) {
    LOG(WARNING) << "op: " << _name << " has no RW access"
                 << "ot op: " << op << ", mode: " << node->depends[op]
                 << ", please use get_argment() instead.";
    return false;
  }
  return true;
}
bool Op::is_mutable(const std::string& op) const {
  if (op == START_OP_NAME) {
    return false;
  }
  DagNode* node = const_cast(_dag->node_by_name(_name));
  if (node->depends.find(op) == node->depends.end()) {
    LOG(WARNING) << "op: " << _name << " doesnot depend on"
                 << "op: " << op << "!";
    return false;
  }
  if (node->depends[op] != RW) {
    LOG(WARNING) << "op: " << _name << " has no RW access"
                 << "ot op: " << op << ", mode: " << node->depends[op]
                 << ", please use get_argment() instead.";
    return false;
  }
  return true;
}
bool Op::is_readable(const std::string& op) {
  if (op == START_OP_NAME) {
    return true;
  }
  DagNode* node = const_cast(_dag->node_by_name(_name));
  if (node->depends.find(op) == node->depends.end()) {
    LOG(WARNING) << "op: " << _name << " doesnot depend on"
                 << "op: " << op << "!";
    return false;
  }
  if (node->depends[op] != RW && node->depends[op] != RO) {
    LOG(WARNING) << "op: " << _name << " has no RO access"
                 << "ot op: " << op << ", mode: " << node->depends[op]
                 << ", please check your configuration.";
    return false;
  }
  return true;
}
bool Op::is_readable(const std::string& op) const {
  if (op == START_OP_NAME) {
    return true;
  }
  DagNode* node = const_cast(_dag->node_by_name(_name));
  if (node->depends.find(op) == node->depends.end()) {
    LOG(WARNING) << "op: " << _name << " doesnot depend on "
                 << "op: " << op << "!";
    return false;
  }
  if (node->depends[op] != RW && node->depends[op] != RO) {
    LOG(WARNING) << "op: " << _name << " has no RO access"
                 << "ot op: " << op << ", mode: " << node->depends[op]
                 << ", please check your configuration.";
    return false;
  }
  return true;
}
// Looks up, on the bus, the channel of the op named `op` for modification.
// Returns NULL (after logging a warning) when is_mutable(op) is false.
Channel* Op::mutable_depend_channel(const std::string& op) {
  if (is_mutable(op)) {
    // Access granted: fetch the dependent op's channel from the bus.
    return _bus->channel_by_name(op);
  }

  LOG(WARNING) << "Op: " << _name << " cannot mutable op: " << op << "!";
  return NULL;
}
// Looks up, on the bus, the channel of the op named `op` for reading.
// Returns NULL (after logging a warning) when is_readable(op) is false.
const Channel* Op::get_depend_channel(const std::string& op) const {
  // is_readable() consults the dependency mode recorded in the dag.
  if (is_readable(op)) {
    // Access granted: fetch the dependent op's channel from the bus.
    return _bus->channel_by_name(op);
  }

  LOG(WARNING) << "op: " << _name << " doesnot depend on op: " << op << "!";
  return NULL;
}
// Returns the message held by this op's mutable channel, or NULL (after
// logging an error) when no channel is available.
google::protobuf::Message* Op::mutable_message() {
  Channel* channel = mutable_channel();
  if (channel == NULL) {
    // mutable_channel() can fail (init() checks for it as well); do not
    // dereference a null pointer.
    LOG(ERROR) << "Failed mutable channel in op: " << this->id() << ", "
               << this->name() << "!";
    return NULL;
  }
  return channel->message();
}
// Returns the message held by this op's channel for reading, or NULL (after
// logging an error) when no channel is available.
const google::protobuf::Message* Op::get_message() const {
  const Channel* channel = get_channel();
  if (channel == NULL) {
    // get_channel() may yield NULL (debug_string() guards against it too).
    LOG(ERROR) << "Invalid channel in op: " << this->id() << ", "
               << this->name() << "!";
    return NULL;
  }
  return channel->message();
}
bool Op::has_calc() { return _has_calc; }
const char* Op::name() const { return _name.c_str(); }
const std::string& Op::type() const { return _type; }
uint32_t Op::id() const { return _id; }
// Returns the debug string of this op's channel, or a fixed placeholder
// (after logging an error) when the op has no channel.
const std::string Op::debug_string() {
  const Channel* channel = get_channel();
  if (channel != NULL) {
    return channel->debug_string();
  }

  LOG(ERROR) << "Invalid channel!";
  return "Invalid channel in OP";
}
// Returns the message held by the START_OP_NAME channel on the bus, or NULL
// (after logging an error) when the bus or that channel is unavailable.
const google::protobuf::Message* Op::get_request_message() {
  if (_bus == NULL) {
    // deinit() resets _bus, so the op may be queried without a bus.
    LOG(ERROR) << "Invalid bus in op: " << _name;
    return NULL;
  }
  Channel* start_channel = _bus->channel_by_name(START_OP_NAME);
  if (start_channel == NULL) {
    LOG(ERROR) << "Failed get channel of op: " << START_OP_NAME;
    return NULL;
  }
  return start_channel->message();
}
}  // namespace predictor
}  // namespace paddle_serving
}  // namespace baidu