// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "core/predictor/framework/service.h"
#ifdef BCLOUD
#include <base/time.h>  // butil::Timer
#else
#include <butil/time.h>  // butil::Timer
#endif
#include <inttypes.h>  // PRIu64
#include <list>
#include <string>
#include <vector>
#include <boost/algorithm/string.hpp>  // boost::split, boost::trim_if
#include "core/predictor/common/constant.h"
#include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/channel.h"
#include "core/predictor/framework/dag_view.h"
#include "core/predictor/framework/manager.h"
#include "core/predictor/framework/predictor_metric.h" // PredictorMetric
#include "core/predictor/framework/resource.h"
#include "core/predictor/framework/server.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
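// Initialize the service from its configuration: resolve the response
// merger, register the service format with the ServerManager, and load
// either a request-to-workflow map or the static workflow list.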
int InferService::init(const configure::InferService& conf) {
_infer_service_format = conf.name();
std::string merger = conf.merger();
if (merger == "") {
merger = "default";
}
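// the merger combines each executed workflow's output into the final response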
if (!MergerManager::instance().get(merger, _merger)) {
LOG(ERROR) << "Failed get merger: " << merger;
return ERR_INTERNAL_FAILURE;
} else {
LOG(WARNING) << "Succ get merger: " << merger
<< " for service: " << _infer_service_format;
}
ServerManager& svr_mgr = ServerManager::instance();
if (svr_mgr.add_service_by_format(_infer_service_format) != 0) {
LOG(ERROR) << "Not found service by format name:" << _infer_service_format
<< "!";
return ERR_INTERNAL_FAILURE;
}
_enable_map_request_to_workflow = conf.enable_map_request_to_workflow();
LOG(INFO) << "service[" << _infer_service_format
<< "], enable_map_request_to_workflow["
<< _enable_map_request_to_workflow << "].";
if (_enable_map_request_to_workflow) {
if (_request_to_workflow_map.init(
MAX_WORKFLOW_NUM_IN_ONE_SERVICE /*load_factor=80*/) != 0) {
LOG(ERROR) << "init request to workflow map failed, bucket_count["
<< MAX_WORKFLOW_NUM_IN_ONE_SERVICE << "].";
return ERR_INTERNAL_FAILURE;
}
_request_field_key = conf.request_field_key();
if (_request_field_key.empty()) {
LOG(ERROR) << "Failed to read request_field_key: empty value.";
return ERR_INTERNAL_FAILURE;
}
LOG(INFO) << "service[" << _infer_service_format << "], request_field_key["
<< _request_field_key << "].";
uint32_t value_mapped_workflows_size = conf.value_mapped_workflows_size();
for (uint32_t fi = 0; fi < value_mapped_workflows_size; fi++) {
std::vector<std::string> tokens;
std::vector<Workflow*> workflows;
std::string list = conf.value_mapped_workflows(fi).workflow();
boost::split(tokens, list, boost::is_any_of(","));
uint32_t tsize = tokens.size();
for (uint32_t ti = 0; ti < tsize; ++ti) {
boost::trim_if(tokens[ti], boost::is_any_of(" "));
Workflow* workflow = WorkflowManager::instance().item(tokens[ti]);
if (workflow == NULL) {
LOG(ERROR) << "Failed get workflow by name:" << tokens[ti]
<< ", ti: " << ti;
return ERR_INTERNAL_FAILURE;
}
workflow->regist_metric(full_name());
workflows.push_back(workflow);
}
const std::string& request_field_value =
conf.value_mapped_workflows(fi).request_field_value();
if (_request_to_workflow_map.insert(request_field_value, workflows) ==
NULL) {
LOG(ERROR) << "insert [" << request_field_value << "," << list
<< "] to _request_to_workflow_map failed.";
return ERR_INTERNAL_FAILURE;
}
LOG(INFO) << "workflow[" << list << "], request_field_value["
<< request_field_value << "].";
}
} else {
uint32_t flow_size = conf.workflows_size();
for (uint32_t fi = 0; fi < flow_size; fi++) {
const std::string& workflow_name = conf.workflows(fi);
Workflow* workflow = WorkflowManager::instance().item(workflow_name);
if (workflow == NULL) {
LOG(ERROR) << "Failed get workflow by name:" << workflow_name;
return ERR_INTERNAL_FAILURE;
}
workflow->regist_metric(full_name());
_flows.push_back(workflow);
}
}
LOG(INFO) << "Succ load infer_service: " << _infer_service_format << "!";
return ERR_OK;
}
int InferService::reload() { return ERR_OK; }
const std::string& InferService::name() const { return _infer_service_format; }
// Execute each workflow serially.
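// Entry point for one inference call: clear thread-local state, select the
// workflow set (mapped by request field or statically configured), execute
// each workflow, and merge its result into the response.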
int InferService::inference(const google::protobuf::Message* request,
google::protobuf::Message* response,
const uint64_t log_id,
butil::IOBufBuilder* debug_os) {
TRACEPRINTF("(logid=%" PRIu64 ") start to inference", log_id);
// when a function call begins, the framework resets
// thread-local variables & resources automatically
if (Resource::instance().thread_clear() != 0) {
LOG(ERROR) << "(logid=" << log_id << ") Failed thread clear whole resource";
return ERR_INTERNAL_FAILURE;
}
TRACEPRINTF("(logid=%" PRIu64 ") finish to thread clear", log_id);
if (_enable_map_request_to_workflow) {
LOG(INFO) << "(logid=" << log_id << ") enable map request == True";
std::vector<Workflow*>* workflows = _map_request_to_workflow(request);
if (!workflows || workflows->empty()) {
LOG(ERROR) << "(logid=" << log_id
<< ") Failed to map request to workflow";
return ERR_INTERNAL_FAILURE;
}
size_t fsize = workflows->size();
for (size_t fi = 0; fi < fsize; ++fi) {
Workflow* workflow = (*workflows)[fi];
if (workflow == NULL) {
LOG(ERROR) << "(logid=" << log_id
<< ") Failed to get valid workflow at: " << fi;
return ERR_INTERNAL_FAILURE;
}
TRACEPRINTF("(logid=%" PRIu64 ") start to execute workflow[%s]",
log_id,
workflow->name().c_str());
int errcode = _execute_workflow(workflow, request, response, debug_os);
TRACEPRINTF("(logid=%" PRIu64 ") finish to execute workflow[%s]",
log_id,
workflow->name().c_str());
if (errcode < 0) {
LOG(ERROR) << "(logid=" << log_id << ") Failed execute workflow["
<< workflow->name() << "] in:" << name();
return errcode;
}
}
} else {
LOG(INFO) << "(logid=" << log_id << ") enable map request == False";
TRACEPRINTF("(logid=%" PRIu64 ") start to execute one workflow", log_id);
size_t fsize = _flows.size();
for (size_t fi = 0; fi < fsize; ++fi) {
TRACEPRINTF(
"(logid=%" PRIu64 ") start executing workflow-%zu", log_id, fi);
int errcode = execute_one_workflow(fi, request, response, debug_os);
TRACEPRINTF(
"(logid=%" PRIu64 ") finished executing workflow-%zu", log_id, fi);
if (errcode < 0) {
LOG(ERROR) << "(logid=" << log_id
<< ") Failed execute 0-th workflow in:" << name();
return errcode;
}
}
}
return ERR_OK;
}
int InferService::debug(const google::protobuf::Message* request,
google::protobuf::Message* response,
const uint64_t log_id,
butil::IOBufBuilder* debug_os) {
return inference(request, response, log_id, debug_os);
}
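// Execute the index-th statically configured workflow (bounds-checked).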
int InferService::execute_one_workflow(uint32_t index,
const google::protobuf::Message* request,
google::protobuf::Message* response,
butil::IOBufBuilder* debug_os) {
if (index >= _flows.size()) {
LOG(ERROR) << "Faield execute workflow, index: " << index
<< " >= max:" << _flows.size();
return ERR_OVERFLOW_FAILURE;
}
Workflow* workflow = _flows[index];
return _execute_workflow(workflow, request, response, debug_os);
}
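// Drive a single workflow: wrap the request in a builtin channel, execute
// the workflow's DAG view, and merge the response channel back into the
// user-visible response.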
int InferService::_execute_workflow(Workflow* workflow,
const google::protobuf::Message* request,
google::protobuf::Message* response,
butil::IOBufBuilder* debug_os) {
butil::Timer workflow_time(butil::Timer::STARTED);
// create and submit the beginning (request) channel
BuiltinChannel req_channel;
req_channel.init(0, START_OP_NAME);
req_channel = request;
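// fetch a thread-local DagView for this workflow from the object pool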
DagView* dv = workflow->fetch_dag_view(full_name());
dv->set_request_channel(req_channel);
// call actual inference interface
int errcode = dv->execute(debug_os);
if (errcode < 0) {
LOG(ERROR) << "Failed execute dag for workflow:" << workflow->name();
return errcode;
}
TRACEPRINTF("finish to dv execute");
// fetch the ending (response) channel and merge it into the response
const Channel* res_channel = dv->get_response_channel();
if (!_merger || !_merger->merge(res_channel->message(), response)) {
LOG(ERROR) << "Failed merge channel res to response";
return ERR_INTERNAL_FAILURE;
}
TRACEPRINTF("finish to copy from");
workflow_time.stop();
LOG(INFO) << "workflow total time: " << workflow_time.u_elapsed();
PredictorMetric::GetInstance()->update_latency_metric(
WORKFLOW_METRIC_PREFIX + dv->full_name(), workflow_time.u_elapsed());
// return tls data to object pool
workflow->return_dag_view(dv);
TRACEPRINTF("finish to return dag view");
return ERR_OK;
}
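// Resolve the workflow list for a request: read the configured string field
// via protobuf reflection and look its value up in _request_to_workflow_map.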
std::vector<Workflow*>* InferService::_map_request_to_workflow(
const google::protobuf::Message* request) {
const google::protobuf::Descriptor* desc = request->GetDescriptor();
const google::protobuf::FieldDescriptor* field =
desc->FindFieldByName(_request_field_key);
if (field == NULL) {
LOG(ERROR) << "No field[" << _request_field_key << "] in ["
<< desc->full_name() << "].";
return NULL;
}
if (field->is_repeated()) {
LOG(ERROR) << "field[" << desc->full_name() << "." << _request_field_key
<< "] is repeated.";
return NULL;
}
if (field->cpp_type() != google::protobuf::FieldDescriptor::CPPTYPE_STRING) {
LOG(ERROR) << "field[" << desc->full_name() << "." << _request_field_key
<< "] should be string";
return NULL;
}
const std::string& field_value =
request->GetReflection()->GetString(*request, field);
std::vector<Workflow*>* p_workflow =
_request_to_workflow_map.seek(field_value);
if (p_workflow == NULL) {
LOG(ERROR) << "cannot find key[" << field_value
<< "] in _request_to_workflow_map";
return NULL;
}
return p_workflow;
}
} // namespace predictor
} // namespace paddle_serving
} // namespace baidu