// server.cpp
#include <brpc/policy/nova_pbrpc_protocol.h> // NovaServiceAdaptor
#include <brpc/policy/public_pbrpc_protocol.h> // PublicPbrpcServiceAdaptor
#include <brpc/policy/nshead_mcpack_protocol.h> // NsheadMcpackAdaptor
// Project headers
#include "common/inner_common.h"
#include "framework/server.h"
#include "framework/service_manager.h"
#include "framework/resource.h"
#include "framework/manager.h"

namespace baidu {
namespace paddle_serving {
namespace predictor {

// Static flag of ServerManager; it starts out true. Presumably this is the
// value behind ServerManager::reload_starting(), which the reload worker polls
// to decide whether to keep looping -- confirm against the class declaration.
// NOTE(review): volatile by itself does not give inter-thread synchronization
// guarantees in C++; confirm whether an atomic flag is needed here.
volatile bool ServerManager::_s_reload_starting = true;

bool ServerManager::_compare_string_piece_without_case(
W
wangguibao 已提交
17
        const butil::StringPiece& s1, const char* s2) {
W
wangguibao 已提交
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
    if (strlen(s2) != s1.size()) {
        return false;
    }
    return strncasecmp(s1.data(), s2, s1.size()) == 0;
}

// Constructs the manager with an empty format-service table and copies the
// thread-count, max-concurrency and idle-timeout FLAGS_* values into the
// server options.
ServerManager::ServerManager() {
    // Server options come straight from the corresponding FLAGS_* values.
    _options.num_threads = FLAGS_num_threads;
    _options.max_concurrency = FLAGS_max_concurrency;
    _options.idle_timeout_sec = FLAGS_idle_timeout_s;

    // No format services are registered yet.
    _format_services.clear();
}

int ServerManager::add_service_by_format(const std::string& format) {
  Service* service = 
      FormatServiceManager::instance().get_service(format);
  if (service == NULL) {
35
    LOG(ERROR) << "Not found service by format:" << format << "!";
W
wangguibao 已提交
36 37 38 39
    return -1;
  }

  if (_format_services.find(format) != _format_services.end()) {
40
    LOG(ERROR) << "Cannot insert duplicated service by format:" 
W
wangguibao 已提交
41 42 43 44 45 46 47
        << format << "!";
    return -1;
  }

  std::pair<boost::unordered_map<std::string, Service*>::iterator, bool> it 
      = _format_services.insert(std::make_pair(format, service));
  if (!it.second) {
48
    LOG(ERROR) << "Failed insert service by format:"
W
wangguibao 已提交
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
        << format << "!";
    return -1;
  }

  return 0;
}

int ServerManager::start_and_wait() {
  if (_start_reloader() != 0) {
    LOG(ERROR) << "Failed start reloader";
    return -1;
  }

  boost::unordered_map<std::string, Service*>::iterator it;
  for (it = _format_services.begin(); it != _format_services.end();
          it++) {
W
wangguibao 已提交
65
    if (_server.AddService(it->second, brpc::SERVER_DOESNT_OWN_SERVICE)
W
wangguibao 已提交
66 67 68 69 70 71 72 73 74 75 76 77 78
            != 0) {
      LOG(ERROR) << "Failed to add service of format:"
          << it->first << "!";
      return -1;
    }
  }

  if (_server.Start(FLAGS_port, &_options) != 0) {
    LOG(ERROR) << "Failed to start Paddle Inference Server" ;
    return -1;
  }
  _server.RunUntilAskedToQuit();

W
wangguibao 已提交
79
  ServerManager::stop_reloader();
W
wangguibao 已提交
80 81 82 83 84 85 86 87
  if (_wait_reloader() != 0) {
    LOG(ERROR) << "Failed start reloader";
    return -1;
  }
  return 0;
}

void ServerManager::_set_server_option_by_protocol(
W
wangguibao 已提交
88
        const ::butil::StringPiece& protocol_type) {
W
wangguibao 已提交
89
    std::string enabled_protocols = FLAGS_enable_protocol_list;
W
wangguibao 已提交
90 91
    if (_compare_string_piece_without_case(protocol_type, "nova_pbrpc")) {
        _options.nshead_service = new ::brpc::policy::NovaServiceAdaptor;;
W
wangguibao 已提交
92
    } else if (_compare_string_piece_without_case(protocol_type, "public_pbrpc")) {
W
wangguibao 已提交
93
        _options.nshead_service = new ::brpc::policy::PublicPbrpcServiceAdaptor;
W
wangguibao 已提交
94
    } else if (_compare_string_piece_without_case(protocol_type, "nshead_mcpack")) {
W
wangguibao 已提交
95
        _options.nshead_service = new ::brpc::policy::NsheadMcpackAdaptor;
W
wangguibao 已提交
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
    } else {
        LOG(ERROR) << "fail to set nshead protocol, protocol_type[" << protocol_type << "].";
        return;
    }
    _options.enabled_protocols = enabled_protocols;
    LOG(INFO) << "success to set nshead protocol, protocol_type[" << protocol_type << "].";
}

int ServerManager::_start_reloader() {
    int ret = THREAD_CREATE(
            &_reload_thread, NULL,
            ServerManager::_reload_worker,
            NULL);

    if (ret != 0) {
        LOG(ERROR) << "Failed start reload thread, ret:" << ret; 
        return -1;
    }

    return 0;
}

int ServerManager::_wait_reloader() {
    THREAD_JOIN(_reload_thread, NULL);
    return 0;
}

void* ServerManager::_reload_worker(void* args) {
W
wangguibao 已提交
124
    LOG(INFO) << "Entrence reload worker, "
W
wangguibao 已提交
125 126
        << "interval_s: " << FLAGS_reload_interval_s;
    while (ServerManager::reload_starting()) {
W
wangguibao 已提交
127
        LOG(INFO) << "Begin reload framework...";
W
wangguibao 已提交
128
        if (Resource::instance().reload() != 0) {
129
            LOG(ERROR) << "Failed reload resource!";
W
wangguibao 已提交
130 131 132
        }   

        if (WorkflowManager::instance().reload() != 0) {
133
            LOG(ERROR) << "Failed reload workflows"; 
W
wangguibao 已提交
134 135 136 137 138
        }

        usleep(FLAGS_reload_interval_s * 1000000);
    }

W
wangguibao 已提交
139
    LOG(INFO) << "Exit reload worker!";
W
wangguibao 已提交
140 141 142 143 144 145
    return NULL;
}

} // predictor
} // paddle_serving
} // baidu