service.cpp 11.7 KB
Newer Older
W
wangguibao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

G
guru4elephant 已提交
15
#include "core/predictor/framework/service.h"
W
wangguibao 已提交
16 17 18
#ifdef BCLOUD
#include <base/time.h>  // butil::Timer
#else
W
wangguibao 已提交
19
#include <butil/time.h>  // butil::Timer
W
wangguibao 已提交
20 21
#endif

22
#include <inttypes.h>
W
wangguibao 已提交
23 24 25
#include <list>
#include <string>
#include <vector>
G
guru4elephant 已提交
26 27 28 29 30 31 32 33
#include "core/predictor/common/constant.h"
#include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/channel.h"
#include "core/predictor/framework/dag_view.h"
#include "core/predictor/framework/manager.h"
#include "core/predictor/framework/predictor_metric.h"  // PredictorMetric
#include "core/predictor/framework/resource.h"
#include "core/predictor/framework/server.h"
W
wangguibao 已提交
34 35 36 37 38

namespace baidu {
namespace paddle_serving {
namespace predictor {

W
wangguibao 已提交
39
// Initializes the service from its configuration.
//
// Steps:
//   1. Records conf.name() as the service format/name.
//   2. Resolves the result merger from MergerManager, using "default" when
//      conf.merger() is empty.
//   3. Registers the service format with ServerManager.
//   4. Loads the workflows, in one of two modes:
//      * enable_map_request_to_workflow == true: builds
//        _request_to_workflow_map, mapping each configured
//        request_field_value to the comma-separated workflow list given in
//        conf.value_mapped_workflows. The request field named by
//        conf.request_field_key selects the entry at inference time.
//      * otherwise: appends every workflow in conf.workflows to _flows.
//   Every workflow found is registered for metrics under full_name().
//
// Returns ERR_OK on success, ERR_INTERNAL_FAILURE if any lookup, registration
// or map operation fails.
int InferService::init(const configure::InferService& conf) {
  _infer_service_format = conf.name();

  std::string merger = conf.merger();
  if (merger.empty()) {
    merger = "default";
  }
  if (!MergerManager::instance().get(merger, _merger)) {
    LOG(ERROR) << "Failed get merger: " << merger;
    return ERR_INTERNAL_FAILURE;
  } else {
    LOG(WARNING) << "Succ get merger: " << merger
                 << " for service: " << _infer_service_format;
  }

  ServerManager& svr_mgr = ServerManager::instance();
  if (svr_mgr.add_service_by_format(_infer_service_format) != 0) {
    LOG(ERROR) << "Not found service by format name:" << _infer_service_format
               << "!";
    return ERR_INTERNAL_FAILURE;
  }

  _enable_map_request_to_workflow = conf.enable_map_request_to_workflow();
  LOG(INFO) << "service[" << _infer_service_format
            << "], enable_map_request_to_workflow["
            << _enable_map_request_to_workflow << "].";

  if (_enable_map_request_to_workflow) {
    if (_request_to_workflow_map.init(
            MAX_WORKFLOW_NUM_IN_ONE_SERVICE /*load_factor=80*/) != 0) {
      LOG(ERROR) << "init request to workflow map failed, bucket_count["
                 << MAX_WORKFLOW_NUM_IN_ONE_SERVICE << "].";
      return ERR_INTERNAL_FAILURE;
    }
    // Copy the configured key directly; no need to go through c_str().
    _request_field_key = conf.request_field_key();
    if (_request_field_key.empty()) {
      LOG(ERROR) << "read request_field_key failed, request_field_key["
                 << _request_field_key << "].";
      return ERR_INTERNAL_FAILURE;
    }

    LOG(INFO) << "service[" << _infer_service_format << "], request_field_key["
              << _request_field_key << "].";
    uint32_t value_mapped_workflows_size = conf.value_mapped_workflows_size();
    for (uint32_t fi = 0; fi < value_mapped_workflows_size; fi++) {
      std::vector<std::string> tokens;
      std::vector<Workflow*> workflows;
      // Comma-separated workflow names; surrounding spaces are trimmed below.
      std::string list = conf.value_mapped_workflows(fi).workflow();
      boost::split(tokens, list, boost::is_any_of(","));
      size_t tsize = tokens.size();
      for (size_t ti = 0; ti < tsize; ++ti) {
        boost::trim_if(tokens[ti], boost::is_any_of(" "));
        Workflow* workflow = WorkflowManager::instance().item(tokens[ti]);
        if (workflow == NULL) {
          LOG(ERROR) << "Failed get workflow by name:" << tokens[ti]
                     << ", ti: " << ti;
          return ERR_INTERNAL_FAILURE;
        }
        workflow->regist_metric(full_name());
        workflows.push_back(workflow);
      }

      const std::string& request_field_value =
          conf.value_mapped_workflows(fi).request_field_value();
      if (_request_to_workflow_map.insert(request_field_value, workflows) ==
          NULL) {
        LOG(ERROR) << "insert [" << request_field_value << "," << list
                   << "] to _request_to_workflow_map failed.";
        return ERR_INTERNAL_FAILURE;
      }
      LOG(INFO) << "workflow[" << list << "], request_field_value["
                << request_field_value << "].";
    }
  } else {
    uint32_t flow_size = conf.workflows_size();
    for (uint32_t fi = 0; fi < flow_size; fi++) {
      const std::string& workflow_name = conf.workflows(fi);
      Workflow* workflow = WorkflowManager::instance().item(workflow_name);
      if (workflow == NULL) {
        LOG(ERROR) << "Failed get workflow by name:" << workflow_name;
        return ERR_INTERNAL_FAILURE;
      }
      workflow->regist_metric(full_name());
      _flows.push_back(workflow);
    }
  }

  LOG(INFO) << "Succ load infer_service: " << _infer_service_format << "!";

  return ERR_OK;
}

W
wangguibao 已提交
132
// Reload hook for the service. There is nothing to reload at this level, so
// the call always reports success.
int InferService::reload() {
  return ERR_OK;
}
W
wangguibao 已提交
133

W
wangguibao 已提交
134
// Returns the service name, i.e. the conf.name() value stored by init().
const std::string& InferService::name() const {
  return _infer_service_format;
}
W
wangguibao 已提交
135

W
wangguibao 已提交
136 137 138
// Executes every workflow bound to this service for one request, serially.
//
// Thread-local resources are cleared first. Then, depending on
// _enable_map_request_to_workflow:
//   * true:  the request is routed through _map_request_to_workflow() to a
//            list of workflows, which are executed in order;
//   * false: every workflow in _flows is executed in order.
// Execution stops at the first workflow that returns a negative code, and
// that code is returned to the caller.
//
// Returns ERR_OK when all workflows succeed. Returns ERR_INTERNAL_FAILURE if
// thread clearing fails, routing finds no workflows, or a mapped workflow
// entry is NULL.
int InferService::inference(const google::protobuf::Message* request,
                            google::protobuf::Message* response,
                            const uint64_t log_id,
                            butil::IOBufBuilder* debug_os) {
  TRACEPRINTF("(logid=%" PRIu64 ") start to inference", log_id);
  // when function call begins, framework will reset
  // thread local variables&resources automatically.
  if (Resource::instance().thread_clear() != 0) {
    LOG(ERROR) << "(logid=" << log_id << ") Failed thread clear whole resource";
    return ERR_INTERNAL_FAILURE;
  }

  TRACEPRINTF("(logid=%" PRIu64 ") finish to thread clear", log_id);

  if (_enable_map_request_to_workflow) {
    VLOG(2) << "(logid=" << log_id << ") enable map request == True";
    std::vector<Workflow*>* workflows =
        _map_request_to_workflow(request, log_id);
    if (!workflows || workflows->size() == 0) {
      LOG(ERROR) << "(logid=" << log_id
                 << ") Failed to map request to workflow";
      return ERR_INTERNAL_FAILURE;
    }
    size_t fsize = workflows->size();
    for (size_t fi = 0; fi < fsize; ++fi) {
      Workflow* workflow = (*workflows)[fi];
      if (workflow == NULL) {
        LOG(ERROR) << "(logid=" << log_id
                   << ") Failed to get valid workflow at: " << fi;
        return ERR_INTERNAL_FAILURE;
      }
      TRACEPRINTF("(logid=%" PRIu64 ") start to execute workflow[%s]",
                  log_id,
                  workflow->name().c_str());
      int errcode =
          _execute_workflow(workflow, request, response, log_id, debug_os);
      TRACEPRINTF("(logid=%" PRIu64 ") finish to execute workflow[%s]",
                  log_id,
                  workflow->name().c_str());
      if (errcode < 0) {
        LOG(ERROR) << "(logid=" << log_id << ") Failed execute workflow["
                   << workflow->name() << "] in:" << name();
        return errcode;
      }
    }
  } else {
    VLOG(2) << "(logid=" << log_id << ") enable map request == False";
    TRACEPRINTF("(logid=%" PRIu64 ") start to execute one workflow", log_id);
    size_t fsize = _flows.size();
    for (size_t fi = 0; fi < fsize; ++fi) {
      // TRACEPRINTF is printf-style: cast so the argument always matches
      // "%lu" (size_t is not unsigned long on every platform).
      TRACEPRINTF("(logid=%" PRIu64 ") start to execute one workflow-%lu",
                  log_id,
                  static_cast<unsigned long>(fi));  // NOLINT(runtime/int)
      int errcode =
          execute_one_workflow(fi, request, response, log_id, debug_os);
      TRACEPRINTF("(logid=%" PRIu64 ") finish to execute one workflow-%lu",
                  log_id,
                  static_cast<unsigned long>(fi));  // NOLINT(runtime/int)
      if (errcode < 0) {
        // Report the index that actually failed, not always 0.
        LOG(ERROR) << "(logid=" << log_id << ") Failed execute " << fi
                   << "-th workflow in:" << name();
        return errcode;
      }
    }
  }
  return ERR_OK;
}

W
wangguibao 已提交
203 204
// Debug entry point. It adds no behaviour of its own: the call is forwarded
// unchanged to inference(), which receives the same `debug_os` builder.
int InferService::debug(const google::protobuf::Message* request,
                        google::protobuf::Message* response,
                        const uint64_t log_id,
                        butil::IOBufBuilder* debug_os) {
  const int ret = inference(request, response, log_id, debug_os);
  return ret;
}

W
wangguibao 已提交
210 211 212
// Executes the workflow stored at position `index` in _flows.
//
// Returns ERR_OVERFLOW_FAILURE if `index` is out of range. Otherwise returns
// whatever _execute_workflow() returns for that workflow.
int InferService::execute_one_workflow(uint32_t index,
                                       const google::protobuf::Message* request,
                                       google::protobuf::Message* response,
                                       const uint64_t log_id,
                                       butil::IOBufBuilder* debug_os) {
  if (index >= _flows.size()) {
    LOG(ERROR) << "(logid=" << log_id
               << ") Failed execute workflow, index: " << index
               << " >= max:" << _flows.size();
    return ERR_OVERFLOW_FAILURE;
  }
  Workflow* workflow = _flows[index];
  return _execute_workflow(workflow, request, response, log_id, debug_os);
}

W
wangguibao 已提交
225 226 227
int InferService::_execute_workflow(Workflow* workflow,
                                    const google::protobuf::Message* request,
                                    google::protobuf::Message* response,
B
barriery 已提交
228
                                    const uint64_t log_id,
W
wangguibao 已提交
229 230 231 232 233 234
                                    butil::IOBufBuilder* debug_os) {
  butil::Timer workflow_time(butil::Timer::STARTED);
  // create and submit beginer channel
  BuiltinChannel req_channel;
  req_channel.init(0, START_OP_NAME);
  req_channel = request;
W
wangguibao 已提交
235

B
barriery 已提交
236 237
  DagView* dv = workflow->fetch_dag_view(full_name(), log_id);
  dv->set_request_channel(req_channel, log_id);
W
wangguibao 已提交
238

W
wangguibao 已提交
239
  // call actual inference interface
B
barriery 已提交
240
  int errcode = dv->execute(log_id, debug_os);
W
wangguibao 已提交
241
  if (errcode < 0) {
B
barriery 已提交
242 243
    LOG(ERROR) << "(logid=" << log_id
               << ") Failed execute dag for workflow:" << workflow->name();
W
wangguibao 已提交
244 245
    return errcode;
  }
W
wangguibao 已提交
246

B
barriery 已提交
247
  TRACEPRINTF("(logid=%" PRIu64 ") finish to dv execute", log_id);
W
wangguibao 已提交
248
  // create ender channel and copy
B
barriery 已提交
249 250 251 252 253 254
  const Channel* res_channel = dv->get_response_channel(log_id);
  if (res_channel == NULL) {
    LOG(ERROR) << "(logid=" << log_id << ") Failed get response channel";
    return ERR_INTERNAL_FAILURE;
  }

W
wangguibao 已提交
255
  if (!_merger || !_merger->merge(res_channel->message(), response)) {
B
barriery 已提交
256 257
    LOG(ERROR) << "(logid=" << log_id
               << ") Failed merge channel res to response";
W
wangguibao 已提交
258 259
    return ERR_INTERNAL_FAILURE;
  }
B
barriery 已提交
260
  TRACEPRINTF("(logid=%" PRIu64 ") finish to copy from", log_id);
W
wangguibao 已提交
261

W
wangguibao 已提交
262
  workflow_time.stop();
B
barriery 已提交
263 264
  LOG(INFO) << "(logid=" << log_id
            << ") workflow total time: " << workflow_time.u_elapsed();
W
wangguibao 已提交
265 266 267 268 269
  PredictorMetric::GetInstance()->update_latency_metric(
      WORKFLOW_METRIC_PREFIX + dv->full_name(), workflow_time.u_elapsed());

  // return tls data to object pool
  workflow->return_dag_view(dv);
B
barriery 已提交
270
  TRACEPRINTF("(logid=%" PRIu64 ") finish to return dag view", log_id);
W
wangguibao 已提交
271
  return ERR_OK;
W
wangguibao 已提交
272 273 274
}

std::vector<Workflow*>* InferService::_map_request_to_workflow(
B
barriery 已提交
275
    const google::protobuf::Message* request, const uint64_t log_id) {
W
wangguibao 已提交
276 277 278 279
  const google::protobuf::Descriptor* desc = request->GetDescriptor();
  const google::protobuf::FieldDescriptor* field =
      desc->FindFieldByName(_request_field_key);
  if (field == NULL) {
B
barriery 已提交
280 281
    LOG(ERROR) << "(logid=" << log_id << ") No field[" << _request_field_key
               << "] in [" << desc->full_name() << "].";
W
wangguibao 已提交
282 283 284
    return NULL;
  }
  if (field->is_repeated()) {
B
barriery 已提交
285 286
    LOG(ERROR) << "(logid=" << log_id << ") field[" << desc->full_name() << "."
               << _request_field_key << "] is repeated.";
W
wangguibao 已提交
287 288 289
    return NULL;
  }
  if (field->cpp_type() != google::protobuf::FieldDescriptor::CPPTYPE_STRING) {
B
barriery 已提交
290 291
    LOG(ERROR) << "(logid=" << log_id << ") field[" << desc->full_name() << "."
               << _request_field_key << "] should be string";
W
wangguibao 已提交
292 293 294 295 296 297 298
    return NULL;
  }
  const std::string& field_value =
      request->GetReflection()->GetString(*request, field);
  std::vector<Workflow*>* p_workflow =
      _request_to_workflow_map.seek(field_value);
  if (p_workflow == NULL) {
B
barriery 已提交
299
    LOG(ERROR) << "(logid=" << log_id << ") cannot find key[" << field_value
W
wangguibao 已提交
300 301 302 303
               << "] in _request_to_workflow_map";
    return NULL;
  }
  return p_workflow;
W
wangguibao 已提交
304 305
}

W
wangguibao 已提交
306 307 308
}  // namespace predictor
}  // namespace paddle_serving
}  // namespace baidu