server.cpp 5.3 KB
Newer Older
W
wangguibao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

G
guru4elephant 已提交
15
#include "core/predictor/framework/server.h"
W
wangguibao 已提交
16 17 18
#ifdef BCLOUD
#include <baidu/rpc/policy/nova_pbrpc_protocol.h>     // NovaServiceAdaptor
#include <baidu/rpc/policy/nshead_mcpack_protocol.h>  // NsheadMcpackAdaptor
19
#include <baidu/rpc/policy/public_pbrpc_protocol.h>  // PublicPbrpcServiceAdaptor
W
wangguibao 已提交
20
#else
W
wangguibao 已提交
21 22 23
#include <brpc/policy/nova_pbrpc_protocol.h>     // NovaServiceAdaptor
#include <brpc/policy/nshead_mcpack_protocol.h>  // NsheadMcpackAdaptor
#include <brpc/policy/public_pbrpc_protocol.h>   // PublicPbrpcServiceAdaptor
W
wangguibao 已提交
24
#endif
W
wangguibao 已提交
25 26
#include <string>
#include <utility>
G
guru4elephant 已提交
27 28 29 30
#include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/manager.h"
#include "core/predictor/framework/resource.h"
#include "core/predictor/framework/service_manager.h"
W
wangguibao 已提交
31 32 33 34 35

namespace baidu {
namespace paddle_serving {
namespace predictor {

W
wangguibao 已提交
36 37 38 39
#ifdef BCLOUD
namespace brpc = baidu::rpc;
#endif

W
wangguibao 已提交
40 41 42
volatile bool ServerManager::_s_reload_starting = true;

bool ServerManager::_compare_string_piece_without_case(
W
wangguibao 已提交
43 44 45 46 47
    const butil::StringPiece& s1, const char* s2) {
  if (strlen(s2) != s1.size()) {
    return false;
  }
  return strncasecmp(s1.data(), s2, s1.size()) == 0;
W
wangguibao 已提交
48 49 50
}

ServerManager::ServerManager() {
W
wangguibao 已提交
51 52 53 54
  _format_services.clear();
  _options.idle_timeout_sec = FLAGS_idle_timeout_s;
  _options.max_concurrency = FLAGS_max_concurrency;
  _options.num_threads = FLAGS_num_threads;
W
wangguibao 已提交
55 56 57
}

int ServerManager::add_service_by_format(const std::string& format) {
W
wangguibao 已提交
58
  Service* service = FormatServiceManager::instance().get_service(format);
W
wangguibao 已提交
59
  if (service == NULL) {
60
    LOG(ERROR) << "Not found service by format:" << format << "!";
W
wangguibao 已提交
61 62 63 64
    return -1;
  }

  if (_format_services.find(format) != _format_services.end()) {
W
wangguibao 已提交
65 66
    LOG(ERROR) << "Cannot insert duplicated service by format:" << format
               << "!";
W
wangguibao 已提交
67 68 69
    return -1;
  }

W
wangguibao 已提交
70 71
  std::pair<boost::unordered_map<std::string, Service*>::iterator, bool> it =
      _format_services.insert(std::make_pair(format, service));
W
wangguibao 已提交
72
  if (!it.second) {
W
wangguibao 已提交
73
    LOG(ERROR) << "Failed insert service by format:" << format << "!";
W
wangguibao 已提交
74 75 76 77 78 79 80 81 82 83 84 85 86
    return -1;
  }

  return 0;
}

int ServerManager::start_and_wait() {
  if (_start_reloader() != 0) {
    LOG(ERROR) << "Failed start reloader";
    return -1;
  }

  boost::unordered_map<std::string, Service*>::iterator it;
W
wangguibao 已提交
87 88 89
  for (it = _format_services.begin(); it != _format_services.end(); it++) {
    if (_server.AddService(it->second, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
      LOG(ERROR) << "Failed to add service of format:" << it->first << "!";
W
wangguibao 已提交
90 91 92 93
      return -1;
    }
  }

94
  // rpc multi-thread start from here.
W
wangguibao 已提交
95
  if (_server.Start(FLAGS_port, &_options) != 0) {
W
wangguibao 已提交
96
    LOG(ERROR) << "Failed to start Paddle Inference Server";
W
wangguibao 已提交
97 98 99 100
    return -1;
  }
  _server.RunUntilAskedToQuit();

W
wangguibao 已提交
101
  ServerManager::stop_reloader();
W
wangguibao 已提交
102 103 104 105 106 107 108 109
  if (_wait_reloader() != 0) {
    LOG(ERROR) << "Failed start reloader";
    return -1;
  }
  return 0;
}

void ServerManager::_set_server_option_by_protocol(
W
wangguibao 已提交
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
    const ::butil::StringPiece& protocol_type) {
  std::string enabled_protocols = FLAGS_enable_protocol_list;
  if (_compare_string_piece_without_case(protocol_type, "nova_pbrpc")) {
    _options.nshead_service = new ::brpc::policy::NovaServiceAdaptor;
  } else if (_compare_string_piece_without_case(protocol_type,
                                                "public_pbrpc")) {
    _options.nshead_service = new ::brpc::policy::PublicPbrpcServiceAdaptor;
  } else if (_compare_string_piece_without_case(protocol_type,
                                                "nshead_mcpack")) {
    _options.nshead_service = new ::brpc::policy::NsheadMcpackAdaptor;
  } else {
    LOG(ERROR) << "fail to set nshead protocol, protocol_type[" << protocol_type
               << "].";
    return;
  }
  _options.enabled_protocols = enabled_protocols;
  LOG(INFO) << "success to set nshead protocol, protocol_type[" << protocol_type
            << "].";
W
wangguibao 已提交
128 129 130
}

int ServerManager::_start_reloader() {
W
wangguibao 已提交
131 132
  int ret =
      THREAD_CREATE(&_reload_thread, NULL, ServerManager::_reload_worker, NULL);
W
wangguibao 已提交
133

W
wangguibao 已提交
134 135 136 137 138 139
  if (ret != 0) {
    LOG(ERROR) << "Failed start reload thread, ret:" << ret;
    return -1;
  }

  return 0;
W
wangguibao 已提交
140 141 142
}

int ServerManager::_wait_reloader() {
W
wangguibao 已提交
143 144
  THREAD_JOIN(_reload_thread, NULL);
  return 0;
W
wangguibao 已提交
145 146 147
}

void* ServerManager::_reload_worker(void* args) {
W
wangguibao 已提交
148 149 150 151 152 153 154 155 156 157
  LOG(INFO) << "Entrence reload worker, "
            << "interval_s: " << FLAGS_reload_interval_s;
  while (ServerManager::reload_starting()) {
    LOG(INFO) << "Begin reload framework...";
    if (Resource::instance().reload() != 0) {
      LOG(ERROR) << "Failed reload resource!";
    }

    if (WorkflowManager::instance().reload() != 0) {
      LOG(ERROR) << "Failed reload workflows";
W
wangguibao 已提交
158 159
    }

W
wangguibao 已提交
160 161 162 163 164
    usleep(FLAGS_reload_interval_s * 1000000);
  }

  LOG(INFO) << "Exit reload worker!";
  return NULL;
W
wangguibao 已提交
165 166
}

W
wangguibao 已提交
167 168 169
}  // namespace predictor
}  // namespace paddle_serving
}  // namespace baidu