server.cc 10.3 KB
Newer Older
D
dinghao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "core/server.h"
H
hexia 已提交
17 18 19
#include <evhttp.h>
#include <event.h>
#include <event2/thread.h>
20
#include <event2/listener.h>
D
dinghao 已提交
21 22 23
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
H
hexia 已提交
24 25
#include <future>
#include <memory>
D
dinghao 已提交
26 27 28
#include <string>
#include <vector>
#include <utility>
29
#include "include/infer_log.h"
D
dinghao 已提交
30 31 32
#include "serving/ms_service.grpc.pb.h"
#include "core/util/option_parser.h"
#include "core/version_control/version_controller.h"
H
hexia 已提交
33
#include "core/session.h"
34
#include "core/serving_tensor.h"
H
hexia 已提交
35 36
#include "core/http_process.h"

D
dinghao 已提交
37 38 39 40 41 42
using ms_serving::MSService;
using ms_serving::PredictReply;
using ms_serving::PredictRequest;

namespace mindspore {
namespace serving {
43

D
dinghao 已提交
44
namespace {
D
dinghao 已提交
45 46
// Value passed to grpc::ServerBuilder::SetMaxMessageSize. NOTE: despite its name this is 0x7FFFFFFF (2^31 - 1),
// not the largest value a uint32_t can hold.
constexpr uint32_t uint32max = 0x7FFFFFFF;
// Satisfied by HandleSignal when SIGINT/SIGTERM arrives; Server::BuildAndStart waits on its future before
// shutting the servers down.
std::promise<void> exit_requested;
X
xuyongfei 已提交
47
// Wildcard IPv4 address. Used to build the gRPC listening address and reported as the RESTful address in logs.
constexpr char kServerHttpIp[] = "0.0.0.0";
D
dinghao 已提交
48

D
dinghao 已提交
49
// Calls Clear() on the object returned by Session::Instance(). Invoked on every exit path of the server.
void ClearEnv() {
  auto &&session = Session::Instance();
  session.Clear();
}
D
dinghao 已提交
50
// Signal handler installed for SIGINT and SIGTERM: satisfies exit_requested so that the thread waiting on its
// future can start shutting the servers down.
//
// A std::promise may only be satisfied once. A repeated signal (for example a second Ctrl-C while shutdown is
// still in progress) would make set_value() throw std::future_error; letting that escape the handler terminates
// the process, so the repeated request is deliberately ignored.
void HandleSignal(int sig) {
  (void)sig;  // the same action is taken for every handled signal
  try {
    exit_requested.set_value();
  } catch (const std::future_error &) {
    // Exit has already been requested; nothing more to do.
  }
}
D
dinghao 已提交
51

52 53
// Translates a serving Status into the grpc::Status handed back to the RPC caller:
//   SUCCESS        -> grpc::Status::OK
//   INVALID_INPUTS -> INVALID_ARGUMENT carrying the status message, or a default text when that message is empty
//   FAILED / other -> grpc::Status::CANCELLED
grpc::Status CreatGRPCStatus(const Status &status) {
  switch (status.StatusCode()) {
    case SUCCESS:
      return grpc::Status::OK;
    case INVALID_INPUTS: {
      auto detail = status.StatusMessage();
      if (detail.empty()) {
        detail = "The Predict Inputs do not match the Model Request!";
      }
      return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, detail);
    }
    case FAILED:
    default:
      return grpc::Status::CANCELLED;
  }
}

D
dinghao 已提交
70 71 72 73 74 75
}  // namespace

// gRPC service implementation. Predict forwards the request to the Session instance; Test only logs the call.
class MSServiceImpl final : public MSService::Service {
  // Runs one prediction through Session::Instance().Predict and converts a non-success result into a
  // grpc::Status via CreatGRPCStatus. Calls are serialized by mutex_.
  grpc::Status Predict(grpc::ServerContext *context, const PredictRequest *request, PredictReply *reply) override {
    std::lock_guard<std::mutex> guard(mutex_);
    MSI_TIME_STAMP_START(Predict)
    auto predict_status = Session::Instance().Predict(*request, *reply);
    MSI_TIME_STAMP_END(Predict)
    if (predict_status != inference::SUCCESS) {
      return CreatGRPCStatus(predict_status);
    }
    MSI_LOG(INFO) << "Finish call service Eval";
    return grpc::Status::OK;
  }

  // Logs the call and reports OK; the request and reply are not touched.
  grpc::Status Test(grpc::ServerContext *context, const PredictRequest *request, PredictReply *reply) override {
    MSI_LOG(INFO) << "TestService call";
    return grpc::Status::OK;
  }

  // Held for the whole duration of Predict.
  std::mutex mutex_;
};

93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
// Creates the libevent HTTP server for the RESTful API and binds it to the configured rest_api_port on all
// IPv4 interfaces (sin_addr is left zero-initialized).
//
// Returns {evhttp, event_base} on success. The caller owns both and must release the evhttp before the base.
// Returns {nullptr, nullptr} on failure, after releasing whatever had been allocated up to that point.
static std::pair<struct evhttp *, struct event_base *> NewHttpServer() {
  auto option_args = Options::Instance().GetArgs();
  int32_t http_port = option_args->rest_api_port;
  // init http server
  // Threading support has to be enabled before any event_base is allocated (event_init() allocates one).
  evthread_use_pthreads();
  event_init();
  struct event_base *eb = event_base_new();
  if (eb == nullptr) {
    MSI_LOG(ERROR) << "Serving Error: RESTful server start failed, new http event failed";
    std::cout << "Serving Error: RESTful server start failed, new http event failed" << std::endl;
    return std::make_pair(nullptr, nullptr);
  }
  struct evhttp *http_server = evhttp_new(eb);
  if (http_server == nullptr) {
    MSI_LOG(ERROR) << "Serving Error: RESTful server start failed, create http server faild";
    std::cout << "Serving Error: RESTful server start failed, create http server faild" << std::endl;
    event_base_free(eb);
    return std::make_pair(nullptr, nullptr);
  }

  struct sockaddr_in sin = {};
  sin.sin_family = AF_INET;
  sin.sin_port = htons(http_port);
  auto listener =
    evconnlistener_new_bind(eb, nullptr, nullptr, LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_EXEC | LEV_OPT_CLOSE_ON_FREE, -1,
                            reinterpret_cast<struct sockaddr *>(&sin), sizeof(sin));

  if (listener == nullptr) {
    MSI_LOG_ERROR << "Serving Error: RESTful server start failed, create http listener faild, port " << http_port;
    std::cout << "Serving Error: RESTful server start failed, create http listener faild, port " << http_port
              << std::endl;
    // The evhttp was created on top of eb, so it has to be released while eb is still alive.
    evhttp_free(http_server);
    event_base_free(eb);
    return std::make_pair(nullptr, nullptr);
  }
  auto bound = evhttp_bind_listener(http_server, listener);
  if (bound == nullptr) {
    MSI_LOG_ERROR << "Serving Error: RESTful server start failed, bind http listener to server faild, port "
                  << http_port;
    std::cout << "Serving Error: RESTful server start failed, bind http listener  to server faild, port " << http_port
              << std::endl;
    // Release in reverse order of creation: listener, then evhttp, then the base both depend on.
    evconnlistener_free(listener);
    evhttp_free(http_server);
    event_base_free(eb);
    return std::make_pair(nullptr, nullptr);
  }
  return std::make_pair(http_server, eb);
}

X
xuyongfei 已提交
142
// Creates the device session for the configured device type/id and then runs the VersionController for the
// configured model directory and model name. Returns the first non-success Status, or SUCCESS.
// Failures are reported both to the log and to stdout.
Status BuildAndStartModelInner() {
  auto option_args = Options::Instance().GetArgs();
  std::string model_path = option_args->model_path;
  std::string model_name = option_args->model_name;
  std::string device_type = option_args->device_type;
  auto device_id = option_args->device_id;

  Status res = Session::Instance().CreatDeviceSession(device_type, device_id);
  if (res != SUCCESS) {
    MSI_LOG(ERROR) << "Serving Error: create inference session failed, device type  " << device_type << " device id "
                   << device_id;
    std::cout << "Serving Error: create inference session failed, device type  " << device_type << " device id "
              << device_id << std::endl;
    return res;
  }

  VersionController version_controller(option_args->poll_model_wait_seconds, model_path, model_name);
  res = version_controller.Run();
  if (res != SUCCESS) {
    MSI_LOG(ERROR) << "Serving Error: load model failed, model directory " << option_args->model_path << " model name "
                   << option_args->model_name;
    std::cout << "Serving Error: load model failed, model directory " << option_args->model_path << " model name "
              << option_args->model_name << std::endl;
    return res;
  }
  return SUCCESS;
}

// Exception barrier around BuildAndStartModelInner(): returns its Status when it completes, and converts any
// exception into a logged/printed error plus a FAILED result so nothing propagates to the caller.
Status BuildAndStartModel() {
  try {
    return BuildAndStartModelInner();
  } catch (const std::bad_alloc &) {
    MSI_LOG(ERROR) << "Serving Error: malloc memory failed";
    std::cout << "Serving Error: malloc memory failed" << std::endl;
  } catch (const std::runtime_error &ex) {
    MSI_LOG(ERROR) << "Serving Error: runtime error occurred: " << ex.what();
    std::cout << "Serving Error: runtime error occurred: " << ex.what() << std::endl;
  } catch (const std::exception &ex) {
    MSI_LOG(ERROR) << "Serving Error: exception occurred: " << ex.what();
    std::cout << "Serving Error: exception occurred: " << ex.what() << std::endl;
  } catch (...) {
    MSI_LOG(ERROR) << "Serving Error: exception occurred";
    // Terminate and flush the line like every other branch does.
    std::cout << "Serving Error: exception occurred" << std::endl;
  }
  return FAILED;
}

// Starts the serving process and blocks until an exit signal is received.
//
// Steps: install SIGINT/SIGTERM handlers, load the model, create the RESTful (libevent) server, create the gRPC
// server, run each server on its own thread, wait for exit_requested, then shut everything down.
// Returns SUCCESS after a clean shutdown, or the failing Status / FAILED when start-up fails.
Status Server::BuildAndStart() {
  // handle exit signal
  signal(SIGINT, HandleSignal);
  signal(SIGTERM, HandleSignal);
  Status res = BuildAndStartModel();
  if (res != SUCCESS) {
    ClearEnv();
    return res;
  }
  auto option_args = Options::Instance().GetArgs();
  std::string server_address = std::string(kServerHttpIp) + ":" + std::to_string(option_args->grpc_port);

  auto http_server_new_ret = NewHttpServer();
  struct evhttp *http_server = http_server_new_ret.first;
  struct event_base *eb = http_server_new_ret.second;
  if (http_server == nullptr || eb == nullptr) {
    MSI_LOG(ERROR) << "Serving Error: RESTful server start failed";
    std::cout << "Serving Error: RESTful server start failed" << std::endl;
    ClearEnv();
    return FAILED;
  }
  // Releases the RESTful server; the evhttp goes first because it was created on top of the event base.
  auto exit_http = [eb, http_server]() {
    evhttp_free(http_server);
    event_base_free(eb);
  };
  int32_t http_port = option_args->rest_api_port;
  std::string http_addr = kServerHttpIp;

  evhttp_set_timeout(http_server, 60);
  evhttp_set_gencb(http_server, http_handler_msg, nullptr);

  // grpc server
  MSServiceImpl ms_service;
  grpc::EnableDefaultHealthCheckService(true);
  grpc::reflection::InitProtoReflectionServerBuilderPlugin();
  // Disable port reuse for the gRPC listening port (GRPC_ARG_ALLOW_REUSEPORT = 0)
  auto option = grpc::MakeChannelArgumentOption(GRPC_ARG_ALLOW_REUSEPORT, 0);
  grpc::ServerBuilder serverBuilder;
  serverBuilder.SetOption(std::move(option));
  serverBuilder.SetMaxMessageSize(uint32max);
  serverBuilder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  serverBuilder.RegisterService(&ms_service);
  std::unique_ptr<grpc::Server> server(serverBuilder.BuildAndStart());
  if (server == nullptr) {
    MSI_LOG(ERROR) << "Serving Error: create server failed, gRPC address " << server_address << ", RESTful address "
                   << http_addr << ":" << http_port << ", model directory " << option_args->model_path << " model name "
                   << option_args->model_name << ", device type " << option_args->device_type << ", device id "
                   << option_args->device_id;
    std::cout << "Serving Error: create server failed, gRPC address " << server_address << ", RESTful address "
              << http_addr << ":" << http_port << ", model directory " << option_args->model_path << " model name "
              << option_args->model_name << ", device type " << option_args->device_type << ", device id "
              << option_args->device_id << std::endl;
    ClearEnv();
    exit_http();
    return FAILED;
  }
  // The lambdas capture locals by reference; both threads are joined before this function returns.
  auto grpc_server_run = [&server, &server_address]() {
    MSI_LOG(INFO) << "MS Serving grpc listening on " << server_address;
    std::cout << "Serving: MS Serving gRPC start success, listening on " << server_address << std::endl;
    server->Wait();
  };
  auto http_server_run = [&eb, &http_addr, &http_port]() {
    MSI_LOG(INFO) << "MS Serving restful listening on " << http_addr << ":" << http_port;
    std::cout << "Serving: MS Serving RESTful start success, listening on " << http_addr << ":" << http_port
              << std::endl;
    event_base_dispatch(eb);
  };
  std::thread grpc_thread(grpc_server_run);
  std::thread restful_thread(http_server_run);

  // Block until HandleSignal reports SIGINT/SIGTERM.
  auto exit_future = exit_requested.get_future();
  exit_future.wait();
  ClearEnv();
  server->Shutdown();
  event_base_loopexit(eb, nullptr);
  // Wait for both worker threads to leave their loops BEFORE freeing the evhttp/event_base: restful_thread may
  // still be inside event_base_dispatch(eb) until it has been joined.
  grpc_thread.join();
  restful_thread.join();
  exit_http();
  return SUCCESS;
}
}  // namespace serving
}  // namespace mindspore