From 7e8ba8cc07151d62c516ccc47231fdc71a28139a Mon Sep 17 00:00:00 2001
From: xuyongfei <xuyongfei@huawei.com>
Date: Wed, 26 Aug 2020 16:35:32 +0800
Subject: [PATCH] serving RESTful: opt for performance

---
 include/infer_log.h          |  14 ++
 serving/core/http_process.cc | 320 ++++++++++++++++++++++-------------
 serving/core/server.cc       |   2 +-
 3 files changed, 216 insertions(+), 120 deletions(-)

diff --git a/include/infer_log.h b/include/infer_log.h
index f08fefde6..4a5bf5e1f 100644
--- a/include/infer_log.h
+++ b/include/infer_log.h
@@ -24,6 +24,7 @@
 #include <memory>
 #include <iostream>
 #include <chrono>
+#include <vector>
 
 #ifndef ENABLE_ACL
 #include "mindspore/core/utils/log_adapter.h"
@@ -44,6 +45,19 @@ class LogStream {
     return *this;
   }
 
+  template <typename T>
+  LogStream &operator<<(const std::vector<T> &val) noexcept {
+    (*sstream_) << "[";
+    for (size_t i = 0; i < val.size(); i++) {
+      (*this) << val[i];
+      if (i + 1 < val.size()) {
+        (*sstream_) << ", ";
+      }
+    }
+    (*sstream_) << "]";
+    return *this;
+  }
+
   LogStream &operator<<(std::ostream &func(std::ostream &os)) noexcept {
     (*sstream_) << func;
     return *this;
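A note on the LogStream change above: the templated overload lets shape vectors be streamed directly into the serving log macros, and it is what lets later hunks print expected and given shapes in error messages. A standalone sketch of the exact formatting it produces (FormatVector and the sample shape are illustrative only, not part of the patch):

#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Mirrors the overload's output format: "[1, 3, 224, 224]".
template <typename T>
std::string FormatVector(const std::vector<T> &val) {
  std::ostringstream ss;
  ss << "[";
  for (size_t i = 0; i < val.size(); i++) {
    ss << val[i];
    if (i + 1 < val.size()) {
      ss << ", ";
    }
  }
  ss << "]";
  return ss.str();
}

int main() {
  std::vector<int64_t> shape = {1, 3, 224, 224};
  std::cout << FormatVector(shape) << std::endl;  // prints [1, 3, 224, 224]
  return 0;
}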
diff --git a/serving/core/http_process.cc b/serving/core/http_process.cc
index 7ed6a6676..290359e7a 100644
--- a/serving/core/http_process.cc
+++ b/serving/core/http_process.cc
@@ -21,6 +21,7 @@
 #include "util/status.h"
 #include "core/session.h"
 #include "core/http_process.h"
+#include "core/serving_tensor.h"
 
 using ms_serving::MSService;
 using ms_serving::PredictReply;
@@ -35,10 +36,9 @@ static constexpr char HTTP_DATA[] = "data";
 static constexpr char HTTP_TENSOR[] = "tensor";
 enum HTTP_TYPE { TYPE_DATA = 0, TYPE_TENSOR };
 enum HTTP_DATA_TYPE { HTTP_DATA_NONE, HTTP_DATA_INT, HTTP_DATA_FLOAT };
-static const std::map<HTTP_DATA_TYPE, ms_serving::DataType> http_to_infer_map{
-  {HTTP_DATA_NONE, ms_serving::MS_UNKNOWN},
-  {HTTP_DATA_INT, ms_serving::MS_INT32},
-  {HTTP_DATA_FLOAT, ms_serving::MS_FLOAT32}};
+
+static const std::map<inference::DataType, HTTP_DATA_TYPE> infer_type2_http_type{
+  {inference::DataType::kMSI_Int32, HTTP_DATA_INT}, {inference::DataType::kMSI_Float32, HTTP_DATA_FLOAT}};
 
 Status GetPostMessage(struct evhttp_request *req, std::string *buf) {
   Status status(SUCCESS);
@@ -93,69 +93,96 @@ Status CheckMessageValid(const json &message_info, HTTP_TYPE *type) {
   return status;
 }
 
-Status GetDataFromJson(const json &json_data, std::string *data, HTTP_DATA_TYPE *type) {
+Status GetDataFromJson(const json &json_data_array, ServingTensor *request_tensor, size_t data_index,
+                       HTTP_DATA_TYPE type) {
   Status status(SUCCESS);
-  if (json_data.is_number_integer()) {
-    if (*type == HTTP_DATA_NONE) {
-      *type = HTTP_DATA_INT;
-    } else if (*type != HTTP_DATA_INT) {
-      ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input data type should be consistent");
-      return status;
+  auto type_name = [](const json &json_data) -> std::string {
+    if (json_data.is_number_integer()) {
+      return "integer";
+    } else if (json_data.is_number_float()) {
+      return "float";
     }
-    auto s_data = json_data.get<int32_t>();
-    data->append(reinterpret_cast<char *>(&s_data), sizeof(int32_t));
-  } else if (json_data.is_number_float()) {
-    if (*type == HTTP_DATA_NONE) {
-      *type = HTTP_DATA_FLOAT;
-    } else if (*type != HTTP_DATA_FLOAT) {
-      ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input data type should be consistent");
-      return status;
+    return json_data.type_name();
+  };
+  size_t array_size = json_data_array.size();
+  if (type == HTTP_DATA_INT) {
+    auto data = reinterpret_cast<int32_t *>(request_tensor->mutable_data()) + data_index;
+    for (size_t k = 0; k < array_size; k++) {
+      auto &json_data = json_data_array[k];
+      if (!json_data.is_number_integer()) {
+        status = INFER_STATUS(INVALID_INPUTS) << "get data failed, expected integer, given " << type_name(json_data);
+        MSI_LOG_ERROR << status.StatusMessage();
+        return status;
+      }
+      data[k] = json_data.get<int32_t>();
+    }
+  } else if (type == HTTP_DATA_FLOAT) {
+    auto data = reinterpret_cast<float *>(request_tensor->mutable_data()) + data_index;
+    for (size_t k = 0; k < array_size; k++) {
+      auto &json_data = json_data_array[k];
+      if (!json_data.is_number_float()) {
+        status = INFER_STATUS(INVALID_INPUTS) << "get data failed, expected float, given " << type_name(json_data);
+        MSI_LOG_ERROR << status.StatusMessage();
+        return status;
+      }
+      data[k] = json_data.get<float>();
     }
-    auto s_data = json_data.get<float>();
-    data->append(reinterpret_cast<char *>(&s_data), sizeof(float));
-  } else {
-    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input data type should be int or float");
-    return status;
   }
   return SUCCESS;
 }
 
-Status RecusiveGetTensor(const json &json_data, size_t depth, std::vector<int> *shape, std::string *data,
-                         HTTP_DATA_TYPE *type) {
+Status RecusiveGetTensor(const json &json_data, size_t depth, ServingTensor *request_tensor, size_t data_index,
+                         HTTP_DATA_TYPE type) {
   Status status(SUCCESS);
-  if (depth >= 10) {
-    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor shape dims is larger than 10");
+  std::vector<int64_t> required_shape = request_tensor->shape();
+  if (depth >= required_shape.size()) {
+    status = INFER_STATUS(INVALID_INPUTS)
+             << "input tensor shape dims is more than required dims " << required_shape.size();
+    MSI_LOG_ERROR << status.StatusMessage();
     return status;
   }
   if (!json_data.is_array()) {
     ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor is constructed illegally");
     return status;
   }
-  int cur_dim = json_data.size();
-  if (shape->size() <= depth) {
-    shape->push_back(cur_dim);
-  } else if ((*shape)[depth] != cur_dim) {
-    return INFER_STATUS(INVALID_INPUTS) << "the tensor shape is constructed illegally";
+  if (json_data.size() != static_cast<size_t>(required_shape[depth])) {
+    status = INFER_STATUS(INVALID_INPUTS)
+             << "tensor format request is constructed illegally, input tensor shape dim " << depth
+             << " not match, required " << required_shape[depth] << ", given " << json_data.size();
+    MSI_LOG_ERROR << status.StatusMessage();
+    return status;
   }
-  if (json_data.at(0).is_array()) {
-    for (const auto &item : json_data) {
-      status = RecusiveGetTensor(item, depth + 1, shape, data, type);
+  if (depth + 1 < required_shape.size()) {
+    size_t sub_element_cnt =
+      std::accumulate(required_shape.begin() + depth + 1, required_shape.end(), 1LL, std::multiplies<size_t>());
+    for (size_t k = 0; k < json_data.size(); k++) {
+      status = RecusiveGetTensor(json_data[k], depth + 1, request_tensor, data_index + sub_element_cnt * k, type);
       if (status != SUCCESS) {
         return status;
       }
     }
   } else {
-    // last dim, read the data
-    for (auto item : json_data) {
-      status = GetDataFromJson(item, data, type);
-      if (status != SUCCESS) {
-        return status;
-      }
+    status = GetDataFromJson(json_data, request_tensor, data_index, type);
+    if (status != SUCCESS) {
+      return status;
     }
   }
   return status;
 }
 
+std::vector<int64_t> GetJsonArrayShape(const json &json_array) {
+  std::vector<int64_t> json_shape;
+  const json *tmp_json = &json_array;
+  while (tmp_json->is_array()) {
+    if (tmp_json->empty()) {
+      break;
+    }
+    json_shape.push_back(tmp_json->size());
+    tmp_json = &tmp_json->at(0);
+  }
+  return json_shape;
+}
+
 Status TransDataToPredictRequest(const json &message_info, PredictRequest *request) {
   Status status = SUCCESS;
   auto tensors = message_info.find(HTTP_DATA);
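The rewritten GetDataFromJson and RecusiveGetTensor above are the core of the optimization: instead of appending each element to a std::string and copying it into the request, they write straight into a pre-sized tensor buffer, so the recursion only needs the flat offset of each sub-array. A standalone sketch of that indexing rule under the same shape convention (SubElementCount is a hypothetical helper, not the serving API):

#include <cstdint>
#include <functional>
#include <numeric>
#include <vector>

// At depth d, the k-th sub-array starts at flat offset
// k * (product of all dims after d), matching sub_element_cnt above.
int64_t SubElementCount(const std::vector<int64_t> &shape, size_t depth) {
  return std::accumulate(shape.begin() + depth + 1, shape.end(), int64_t(1),
                         std::multiplies<int64_t>());
}

int main() {
  std::vector<int64_t> shape = {2, 3, 4};
  // depth 0 stride is 12, depth 1 stride is 4, so element [1][2][3]
  // lands at 1*12 + 2*4 + 3 = 23.
  return (SubElementCount(shape, 0) == 12 && SubElementCount(shape, 1) == 4) ? 0 : 1;
}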
@@ -163,52 +190,50 @@ Status TransDataToPredictRequest(const json &message_info, PredictRequest *reque
     ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message do not have data type");
     return status;
   }
-
-  if (tensors->size() == 0) {
-    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor list is null");
+  if (!tensors->is_array()) {
+    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor list is not array");
     return status;
   }
-  for (const auto &tensor : *tensors) {
-    std::string msg_data;
-    HTTP_DATA_TYPE type{HTTP_DATA_NONE};
+  auto const &json_shape = GetJsonArrayShape(*tensors);
+  if (json_shape.size() != 2) {  // 2 is data format list deep
+    status = INFER_STATUS(INVALID_INPUTS)
+             << "the data format request is constructed illegally, expected list nesting depth 2, given "
+             << json_shape.size();
+    MSI_LOG_ERROR << status.StatusMessage();
+    return status;
+  }
+  if (tensors->size() != static_cast<size_t>(request->data_size())) {
+    status = INFER_STATUS(INVALID_INPUTS)
+             << "model input count not match, model required " << request->data_size() << ", given " << tensors->size();
+    MSI_LOG_ERROR << status.StatusMessage();
+    return status;
+  }
+  for (size_t i = 0; i < tensors->size(); i++) {
+    const auto &tensor = tensors->at(i);
+    ServingTensor request_tensor(*(request->mutable_data(i)));
+    auto iter = infer_type2_http_type.find(request_tensor.data_type());
+    if (iter == infer_type2_http_type.end()) {
+      ERROR_INFER_STATUS(status, FAILED, "the model input type is not supported right now");
+      return status;
+    }
+    HTTP_DATA_TYPE type = iter->second;
     if (!tensor.is_array()) {
       ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor is constructed illegally");
       return status;
     }
-    if (tensor.size() == 0) {
+    if (tensor.empty()) {
      ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor is null");
       return status;
     }
-    for (const auto &tensor_data : tensor) {
-      status = GetDataFromJson(tensor_data, &msg_data, &type);
-      if (status != SUCCESS) {
-        return status;
-      }
-    }
-    auto iter = http_to_infer_map.find(type);
-    if (iter == http_to_infer_map.end()) {
-      ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input type is not supported right now");
+    if (tensor.size() != static_cast<size_t>(request_tensor.ElementNum())) {
+      status = INFER_STATUS(INVALID_INPUTS) << "input " << i << " element count not match, model required "
                                            << request_tensor.ElementNum() << ", given " << tensor.size();
+      MSI_LOG_ERROR << status.StatusMessage();
       return status;
     }
-
-    auto infer_tensor = request->add_data();
-    infer_tensor->set_tensor_type(iter->second);
-    infer_tensor->set_data(msg_data.data(), msg_data.size());
-  }
-  // get model required shape
-  std::vector<inference::InferTensor> tensor_list;
-  status = Session::Instance().GetModelInputsInfo(tensor_list);
-  if (status != SUCCESS) {
-    ERROR_INFER_STATUS(status, FAILED, "get model inputs info failed");
-    return status;
-  }
-  if (request->data_size() != static_cast<int64_t>(tensor_list.size())) {
-    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the inputs number is not equal to model required");
-    return status;
-  }
-  for (int i = 0; i < request->data_size(); i++) {
-    for (size_t j = 0; j < tensor_list[i].shape().size(); ++j) {
-      request->mutable_data(i)->mutable_tensor_shape()->add_dims(tensor_list[i].shape()[j]);
+    status = GetDataFromJson(tensor, &request_tensor, 0, type);
+    if (status != SUCCESS) {
+      return status;
     }
   }
   return SUCCESS;
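For reference, a request body that the rewritten "data" format above accepts: a list with one flat array per model input, so GetJsonArrayShape of the outer list has nesting depth 2 and each inner array must match that input's ElementNum(). A hypothetical payload, parsed with the same nlohmann::json library the file already uses (the values and the two-input model are made up):

#include <cassert>
#include <nlohmann/json.hpp>

int main() {
  // Two model inputs, each sent as a flat array; on the server side the
  // first must be an int32 input and the second a float32 input.
  const auto payload = nlohmann::json::parse(R"({"data": [[1, 2, 3, 4], [5.0, 6.0]]})");
  const auto &tensors = payload.at("data");
  assert(tensors.is_array() && tensors.size() == 2);  // depth-2 nesting: a list of arrays
  return 0;
}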
@@ -221,22 +246,44 @@ Status TransTensorToPredictRequest(const json &message_info, PredictRequest *req
     ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message do not have tensor type");
     return status;
   }
+  if (!tensors->is_array()) {
+    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor list is not array");
+    return status;
+  }
+  if (tensors->size() != static_cast<size_t>(request->data_size())) {
+    status =
+      INFER_STATUS(INVALID_INPUTS)
+      << "model input count not match or json tensor request is constructed illegally, model input count required "
+      << request->data_size() << ", given " << tensors->size();
+    MSI_LOG_ERROR << status.StatusMessage();
+    return status;
+  }
+
+  for (size_t i = 0; i < tensors->size(); i++) {
+    const auto &tensor = tensors->at(i);
+    ServingTensor request_tensor(*(request->mutable_data(i)));
 
-  for (const auto &tensor : *tensors) {
-    std::vector<int> shape;
-    std::string msg_data;
-    HTTP_DATA_TYPE type{HTTP_DATA_NONE};
-    RecusiveGetTensor(tensor, 0, &shape, &msg_data, &type);
-    auto iter = http_to_infer_map.find(type);
-    if (iter == http_to_infer_map.end()) {
-      ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input type is not supported right now");
+    // check data shape
+    auto const &json_shape = GetJsonArrayShape(tensor);
+    if (json_shape != request_tensor.shape()) {  // data shape not match
+      status = INFER_STATUS(INVALID_INPUTS)
+               << "input " << i << " shape is invalid, expected " << request_tensor.shape() << ", given " << json_shape;
+      MSI_LOG_ERROR << status.StatusMessage();
       return status;
     }
-    auto infer_tensor = request->add_data();
-    infer_tensor->set_tensor_type(iter->second);
-    infer_tensor->set_data(msg_data.data(), msg_data.size());
-    for (const auto dim : shape) {
-      infer_tensor->mutable_tensor_shape()->add_dims(dim);
+
+    auto iter = infer_type2_http_type.find(request_tensor.data_type());
+    if (iter == infer_type2_http_type.end()) {
+      ERROR_INFER_STATUS(status, FAILED, "the model input type is not supported right now");
+      return status;
+    }
+    HTTP_DATA_TYPE type = iter->second;
+    size_t depth = 0;
+    size_t data_index = 0;
+    status = RecusiveGetTensor(tensor, depth, &request_tensor, data_index, type);
+    if (status != SUCCESS) {
+      MSI_LOG_ERROR << "Transfer tensor to predict request failed";
+      return status;
     }
   }
   return status;
@@ -253,6 +300,27 @@ Status TransHTTPMsgToPredictRequest(struct evhttp_request *http_request, Predict
     return status;
   }
 
+  // get model required shape
+  std::vector<inference::InferTensor> tensor_list;
+  status = Session::Instance().GetModelInputsInfo(tensor_list);
+  if (status != SUCCESS) {
+    ERROR_INFER_STATUS(status, FAILED, "get model inputs info failed");
+    return status;
+  }
+  for (auto &item : tensor_list) {
+    auto input = request->add_data();
+    ServingTensor tensor(*input);
+    tensor.set_shape(item.shape());
+    tensor.set_data_type(item.data_type());
+    int64_t element_num = tensor.ElementNum();
+    int64_t data_type_size = tensor.GetTypeSize(tensor.data_type());
+    if (element_num <= 0 || INT64_MAX / element_num < data_type_size) {
+      ERROR_INFER_STATUS(status, FAILED, "model shape invalid");
+      return status;
+    }
+    tensor.resize_data(element_num * data_type_size);
+  }
+  MSI_TIME_STAMP_START(ParseJson)
   json message_info;
   try {
     message_info = nlohmann::json::parse(post_message);
@@ -262,6 +330,7 @@ Status TransHTTPMsgToPredictRequest(struct evhttp_request *http_request, Predict
     ERROR_INFER_STATUS(status, INVALID_INPUTS, error_message);
     return status;
   }
+  MSI_TIME_STAMP_END(ParseJson)
 
   status = CheckMessageValid(message_info, type);
   if (status != SUCCESS) {
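The pre-allocation loop above sizes every request tensor once, before JSON parsing begins, and guards element_num * data_type_size against int64_t overflow before calling resize_data. The same guard as a standalone checked multiply (SafeBufferSize is a hypothetical helper, not part of the serving code):

#include <cstdint>

// Reject sizes whose product would wrap int64_t, before any allocation
// happens; mirrors the element_num / data_type_size check in the hunk above.
bool SafeBufferSize(int64_t element_num, int64_t type_size, int64_t *byte_size) {
  if (element_num <= 0 || type_size <= 0 || INT64_MAX / element_num < type_size) {
    return false;
  }
  *byte_size = element_num * type_size;
  return true;
}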
@@ -285,24 +354,18 @@ Status GetJsonFromTensor(const ms_serving::Tensor &tensor, int len, int *pos, js
   Status status(SUCCESS);
   switch (tensor.tensor_type()) {
     case ms_serving::MS_INT32: {
-      std::vector<int> result_tensor;
-      for (int j = 0; j < len; j++) {
-        int val;
-        memcpy(&val, reinterpret_cast<const int *>(tensor.data().data()) + *pos + j, sizeof(int));
-        result_tensor.push_back(val);
-      }
-      *out_json = result_tensor;
+      auto data = reinterpret_cast<const int *>(tensor.data().data()) + *pos;
+      std::vector<int32_t> result_tensor(len);
+      memcpy_s(result_tensor.data(), result_tensor.size() * sizeof(int32_t), data, len * sizeof(int32_t));
+      *out_json = std::move(result_tensor);
       *pos += len;
       break;
     }
     case ms_serving::MS_FLOAT32: {
-      std::vector<float> result_tensor;
-      for (int j = 0; j < len; j++) {
-        float val;
-        memcpy(&val, reinterpret_cast<const float *>(tensor.data().data()) + *pos + j, sizeof(float));
-        result_tensor.push_back(val);
-      }
-      *out_json = result_tensor;
+      auto data = reinterpret_cast<const float *>(tensor.data().data()) + *pos;
+      std::vector<float> result_tensor(len);
+      memcpy_s(result_tensor.data(), result_tensor.size() * sizeof(float), data, len * sizeof(float));
+      *out_json = std::move(result_tensor);
       *pos += len;
       break;
     }
@@ -316,7 +379,8 @@ Status GetJsonFromTensor(const ms_serving::Tensor &tensor, int len, int *pos, js
 Status TransPredictReplyToData(const PredictReply &reply, json *out_json) {
   Status status(SUCCESS);
   for (int i = 0; i < reply.result_size(); i++) {
-    json tensor_json;
+    (*out_json)["data"].push_back(json());
+    json &tensor_json = (*out_json)["data"].back();
     int num = 1;
     for (auto j = 0; j < reply.result(i).tensor_shape().dims_size(); j++) {
       num *= reply.result(i).tensor_shape().dims(j);
@@ -326,7 +390,6 @@ Status TransPredictReplyToData(const PredictReply &reply, json *out_json) {
     if (status != SUCCESS) {
       return status;
     }
-    (*out_json)["data"].push_back(tensor_json);
   }
   return status;
 }
@@ -344,12 +407,12 @@ Status RecusiveGetJson(const ms_serving::Tensor &tensor, int depth, int *pos, js
     }
   } else {
     for (int i = 0; i < tensor.tensor_shape().dims(depth); i++) {
-      json tensor_json;
+      out_json->push_back(json());
+      json &tensor_json = out_json->back();
       status = RecusiveGetJson(tensor, depth + 1, pos, &tensor_json);
       if (status != SUCCESS) {
        return status;
       }
-      out_json->push_back(tensor_json);
     }
   }
   return status;
@@ -358,13 +421,13 @@ Status RecusiveGetJson(const ms_serving::Tensor &tensor, int depth, int *pos, js
 Status TransPredictReplyToTensor(const PredictReply &reply, json *out_json) {
   Status status(SUCCESS);
   for (int i = 0; i < reply.result_size(); i++) {
-    json tensor_json;
+    (*out_json)["tensor"].push_back(json());
+    json &tensor_json = (*out_json)["tensor"].back();
     int pos = 0;
     status = RecusiveGetJson(reply.result(i), 0, &pos, &tensor_json);
     if (status != SUCCESS) {
       return status;
     }
-    (*out_json)["tensor"].push_back(tensor_json);
   }
   return status;
 }
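The reply-side hunks above all apply one pattern: append an empty JSON node first and build the tensor contents through a reference to it, so the finished subtree is moved or written in place rather than deep-copied by push_back. A minimal sketch of the pattern with nlohmann::json:

#include <cassert>
#include <nlohmann/json.hpp>

using nlohmann::json;

int main() {
  json out;
  // Old pattern: build a local json, then push_back(tensor_json), which deep-copies.
  // New pattern: reserve the slot first, then fill it through a reference.
  out["data"].push_back(json());
  json &slot = out["data"].back();
  slot = {1, 2, 3};  // written directly into the reply document
  assert(out.dump() == R"({"data":[[1,2,3]]})");
  return 0;
}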
@@ -384,38 +447,57 @@ Status TransPredictReplyToHTTPMsg(const PredictReply &reply, const HTTP_TYPE &ty
     return status;
   }
 
-  std::string out_str = out_json.dump();
+  const std::string &out_str = out_json.dump();
   evbuffer_add(buf, out_str.data(), out_str.size());
   return status;
 }
 
-void http_handler_msg(struct evhttp_request *req, void *arg) {
-  std::cout << "in handle" << std::endl;
+Status HttpHandleMsgDetail(struct evhttp_request *req, void *arg, struct evbuffer *retbuff) {
   PredictRequest request;
   PredictReply reply;
   HTTP_TYPE type;
+  MSI_TIME_STAMP_START(ParseRequest)
   auto status = TransHTTPMsgToPredictRequest(req, &request, &type);
+  MSI_TIME_STAMP_END(ParseRequest)
   if (status != SUCCESS) {
-    ErrorMessage(req, status);
     MSI_LOG(ERROR) << "restful trans to request failed";
-    return;
+    return status;
   }
 
   MSI_TIME_STAMP_START(Predict)
   status = Session::Instance().Predict(request, reply);
+  MSI_TIME_STAMP_END(Predict)
   if (status != SUCCESS) {
-    ErrorMessage(req, status);
     MSI_LOG(ERROR) << "restful predict failed";
+    return status;
   }
-  MSI_TIME_STAMP_END(Predict)
 
-  struct evbuffer *retbuff = evbuffer_new();
+  MSI_TIME_STAMP_START(CreateReplyJson)
   status = TransPredictReplyToHTTPMsg(reply, type, retbuff);
+  MSI_TIME_STAMP_END(CreateReplyJson)
   if (status != SUCCESS) {
-    ErrorMessage(req, status);
     MSI_LOG(ERROR) << "restful trans to reply failed";
+    return status;
+  }
+  return SUCCESS;
+}
+
+void http_handler_msg(struct evhttp_request *req, void *arg) {
+  MSI_TIME_STAMP_START(TotalRestfulPredict)
+  struct evbuffer *retbuff = evbuffer_new();
+  if (retbuff == nullptr) {
+    MSI_LOG_ERROR << "Create event buffer failed";
+    return;
+  }
+  auto status = HttpHandleMsgDetail(req, arg, retbuff);
+  if (status != SUCCESS) {
+    ErrorMessage(req, status);
+    evbuffer_free(retbuff);
     return;
   }
+  MSI_TIME_STAMP_START(ReplyJson)
   evhttp_send_reply(req, HTTP_OK, "Client", retbuff);
+  MSI_TIME_STAMP_END(ReplyJson)
   evbuffer_free(retbuff);
+  MSI_TIME_STAMP_END(TotalRestfulPredict)
 }
 }  // namespace serving
diff --git a/serving/core/server.cc b/serving/core/server.cc
index 238ab7352..57842c8cc 100644
--- a/serving/core/server.cc
+++ b/serving/core/server.cc
@@ -185,7 +185,7 @@ Status Server::BuildAndStart() {
   int32_t http_port = option_args->rest_api_port;
   std::string http_addr = "0.0.0.0";
 
-  evhttp_set_timeout(http_server, 5);
+  evhttp_set_timeout(http_server, 60);
   evhttp_set_gencb(http_server, http_handler_msg, nullptr);
 
   // grpc server
--
GitLab
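A closing note on the server.cc hunk: evhttp_set_timeout sets the per-connection read/write timeout in seconds, so raising it from 5 to 60 presumably gives large tensor payloads time to arrive and be parsed instead of the connection being dropped mid-request. A minimal libevent setup showing where the call sits (standalone sketch; the port and the commented handler wiring are illustrative):

#include <event2/event.h>
#include <event2/http.h>

int main() {
  struct event_base *base = event_base_new();
  struct evhttp *http_server = evhttp_new(base);
  evhttp_bind_socket(http_server, "0.0.0.0", 5500);  // illustrative address/port
  evhttp_set_timeout(http_server, 60);  // seconds a connection may stay idle
  // evhttp_set_gencb(http_server, http_handler_msg, nullptr);  // as in the patch
  event_base_dispatch(base);
  evhttp_free(http_server);
  event_base_free(base);
  return 0;
}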