Unverified commit 1668872c authored by: J Jiawei Wang, committed by: GitHub

Merge branch 'develop' into shenzhen_intl

......@@ -22,11 +22,8 @@ message EngineDesc {
required string reloadable_type = 4;
required string model_dir = 5;
repeated int32 gpu_ids = 6;
required int32 runtime_thread_num = 7;
required int32 batch_infer_size = 8;
required int32 enable_batch_align = 9;
optional string version_file = 10;
optional string version_type = 11;
optional string version_file = 7;
optional string version_type = 8;
/*
* Sparse Parameter Service type. Valid types are:
......@@ -39,17 +36,34 @@ message EngineDesc {
LOCAL = 1;
REMOTE = 2;
}
optional SparseParamServiceType sparse_param_service_type = 12;
optional string sparse_param_service_table_name = 13;
optional bool enable_memory_optimization = 14;
optional bool enable_ir_optimization = 15;
optional bool use_trt = 16;
optional bool use_lite = 17;
optional bool use_xpu = 18;
optional bool use_gpu = 19;
optional bool combined_model = 20;
optional bool encrypted_model = 21;
optional bool gpu_multi_stream = 22;
optional SparseParamServiceType sparse_param_service_type = 10;
optional string sparse_param_service_table_name = 11;
optional bool enable_memory_optimization = 12;
optional bool enable_ir_optimization = 13;
optional bool use_trt = 14;
optional bool use_lite = 15;
optional bool use_xpu = 16;
optional bool use_gpu = 17;
optional bool combined_model = 18;
optional bool encrypted_model = 19;
optional bool gpu_multi_stream = 20;
/*
* "runtime_thread_num": n == 0 means don`t use Asynchronous task scheduling
* mode.
* n > 0 means how many Predictor for this engine in Asynchronous task
* scheduling mode.
* "batch_infer_size": the max batch for this engine in Asynchronous task
* scheduling mode.
* "enable_overrun": always put a whole task into the TaskQueue even if the
* total batch is bigger than "batch_infer_size".
* "allow_split_request": allow to split task(which is corresponding to
* request).
*/
optional int32 runtime_thread_num = 30 [ default = 0 ];
optional int32 batch_infer_size = 31 [ default = 32 ];
optional bool enable_overrun = 32 [ default = false ];
optional bool allow_split_request = 33 [ default = true ];
};
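To make the asynchronous scheduling fields above concrete, here is a minimal sketch (not part of this diff) of how a server-side component could read them from a parsed `configure::EngineDesc`. It mirrors the accessors used later in `ReloadableInferEngine::proc_initialize_impl`; the `AsyncSchedulingOptions` holder struct and the include path are illustrative assumptions only.

```cpp
// Sketch only: assumes the protobuf-generated header for EngineDesc is
// reachable under this (assumed) path.
#include "core/configure/inferencer_configure.pb.h"

// Hypothetical holder for the async-scheduling knobs defined in EngineDesc.
struct AsyncSchedulingOptions {
  int runtime_thread_num = 0;       // 0 disables asynchronous task scheduling
  int batch_infer_size = 32;        // max batch assembled per BatchTasks
  bool enable_overrun = false;      // allow a whole task to exceed the limit
  bool allow_split_request = true;  // allow splitting one request
};

AsyncSchedulingOptions ReadAsyncOptions(const configure::EngineDesc& conf) {
  AsyncSchedulingOptions opts;
  opts.runtime_thread_num = conf.runtime_thread_num();
  opts.batch_infer_size = conf.batch_infer_size();
  opts.enable_overrun = conf.enable_overrun();
  opts.allow_split_request = conf.allow_split_request();
  return opts;
}
```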
// model_toolkit conf
......
......@@ -3,3 +3,24 @@ add_subdirectory(pybind11)
pybind11_add_module(serving_client src/general_model.cpp src/pybind_general_model.cpp)
target_link_libraries(serving_client PRIVATE -Wl,--whole-archive utils sdk-cpp pybind python -Wl,--no-whole-archive -lpthread -lcrypto -lm -lrt -lssl -ldl -lz -Wl,-rpath,'$ORIGIN'/lib)
endif()
if(CLIENT)
FILE(GLOB client_srcs include/*.h src/client.cpp src/brpc_client.cpp)
add_library(client ${client_srcs})
add_dependencies(client utils sdk-cpp)
target_link_libraries(client utils sdk-cpp)
endif()
if(CLIENT)
include_directories(SYSTEM ${CMAKE_CURRENT_LIST_DIR}/../../)
add_executable(simple_client example/simple_client.cpp)
add_dependencies(simple_client utils sdk-cpp client)
target_link_libraries(simple_client -Wl,--whole-archive
-Wl,--no-whole-archive -lpthread -lcrypto -lm -lrt -lssl -ldl -lz -Wl,-rpath,'$ORIGIN'/lib)
target_link_libraries(simple_client utils)
target_link_libraries(simple_client sdk-cpp)
target_link_libraries(simple_client client)
endif()
\ No newline at end of file
# C++ Client for Paddle Serving
(Simplified Chinese|[English](./README.md))
## Requesting the BRPC-Server
### Starting the server
Taking the fit_a_line model as an example, the server is started with the same command as a regular BRPC-Server.
```
cd ../../python/examples/fit_a_line
sh get_data.sh
python -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9393
```
### Client prediction
The client currently supports BRPC.
The BRPC wrapper functions are already implemented; see [brpc_client.cpp](./src/brpc_client.cpp) for details.
```
./simple_client --client_conf="uci_housing_client/serving_client_conf.prototxt" --server_port="127.0.0.1:9393" --test_type="brpc" --sample_type="fit_a_line"
```
See [simple_client.cpp](./example/simple_client.cpp) for more examples.
| Argument | Type | Default | Description |
| ---------------------------------------------- | ---- | ------------------------------------ | ----------------------------------------------------- |
| `client_conf` | str | `"serving_client_conf.prototxt"` | Path of client conf |
| `server_port` | str | `"127.0.0.1:9393"` | Exposed ip:port of server |
| `test_type` | str | `"brpc"` | Request mode; currently only `"brpc"` is supported |
| `sample_type` | str | `"fit_a_line"` | Sample type; one of `"fit_a_line"`, `"bert"` |
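Before the full example below, here is a condensed sketch of the client API usage described above, drawn from `example/simple_client.cpp` and `include/client.h`; error handling is trimmed and the helper name `run_once` is only illustrative.

```cpp
#include <memory>
#include <string>
#include <vector>
#include "core/general-client/include/brpc_client.h"

// Illustrative helper; mirrors simple_client.cpp with error handling trimmed.
int run_once(const std::string& conf, const std::string& url) {
  using baidu::paddle_serving::client::ServingBrpcClient;
  using baidu::paddle_serving::client::PredictorInputs;
  using baidu::paddle_serving::client::PredictorOutputs;

  std::unique_ptr<ServingBrpcClient> client(new ServingBrpcClient());
  // init() loads the client config and then connects to the server.
  if (client->init({conf}, url) != 0) return -1;

  PredictorInputs input;
  std::vector<float> feed(13, 0.0f);             // one fit_a_line sample
  input.add_float_data(feed, "x", {1, 13}, {});  // data, name, shape, lod

  PredictorOutputs output;
  std::vector<std::string> fetch_name = {"price"};
  if (client->predict(input, output, fetch_name, /*log_id=*/0) != 0) return -1;
  LOG(INFO) << output.print();
  return 0;
}
```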
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <fstream>
#include <vector>
#include "core/general-client/include/brpc_client.h"
using baidu::paddle_serving::client::ServingClient;
using baidu::paddle_serving::client::ServingBrpcClient;
using baidu::paddle_serving::client::PredictorInputs;
using baidu::paddle_serving::client::PredictorOutputs;
DEFINE_string(server_port, "127.0.0.1:9292", "ip:port");
DEFINE_string(client_conf, "serving_client_conf.prototxt", "Path of client conf");
DEFINE_string(test_type, "brpc", "brpc");
// fit_a_line, bert
DEFINE_string(sample_type, "fit_a_line", "List: fit_a_line, bert");
namespace {
int prepare_fit_a_line(PredictorInputs& input, std::vector<std::string>& fetch_name) {
std::vector<float> float_feed = {0.0137f, -0.1136f, 0.2553f, -0.0692f,
0.0582f, -0.0727f, -0.1583f, -0.0584f,
0.6283f, 0.4919f, 0.1856f, 0.0795f, -0.0332f};
std::vector<int> float_shape = {1, 13};
std::string feed_name = "x";
fetch_name = {"price"};
std::vector<int> lod;
input.add_float_data(float_feed, feed_name, float_shape, lod);
return 0;
}
int prepare_bert(PredictorInputs& input, std::vector<std::string>& fetch_name) {
{
std::vector<float> float_feed(128, 0.0f);
float_feed[0] = 1.0f;
std::vector<int> float_shape = {1, 128, 1};
std::string feed_name = "input_mask";
std::vector<int> lod;
input.add_float_data(float_feed, feed_name, float_shape, lod);
}
{
std::vector<int64_t> feed(128, 0);
std::vector<int> shape = {1, 128, 1};
std::string feed_name = "position_ids";
std::vector<int> lod;
input.add_int64_data(feed, feed_name, shape, lod);
}
{
std::vector<int64_t> feed(128, 0);
feed[0] = 101;
std::vector<int> shape = {1, 128, 1};
std::string feed_name = "input_ids";
std::vector<int> lod;
input.add_int64_data(feed, feed_name, shape, lod);
}
{
std::vector<int64_t> feed(128, 0);
std::vector<int> shape = {1, 128, 1};
std::string feed_name = "segment_ids";
std::vector<int> lod;
input.add_int64_data(feed, feed_name, shape, lod);
}
fetch_name = {"pooled_output"};
return 0;
}
} // namespace
int main(int argc, char* argv[]) {
google::ParseCommandLineFlags(&argc, &argv, true);
std::string url = FLAGS_server_port;
std::string conf = FLAGS_client_conf;
std::string test_type = FLAGS_test_type;
std::string sample_type = FLAGS_sample_type;
LOG(INFO) << "url = " << url << ";"
<< "client_conf = " << conf << ";"
<< "test_type = " << test_type
<< "sample_type = " << sample_type;
std::unique_ptr<ServingClient> client;
// default type is brpc
// will add grpc&http in the future
if (test_type == "brpc") {
client.reset(new ServingBrpcClient());
} else {
client.reset(new ServingBrpcClient());
}
std::vector<std::string> confs;
confs.push_back(conf);
if (client->init(confs, url) != 0) {
LOG(ERROR) << "Failed to init client!";
return 0;
}
PredictorInputs input;
PredictorOutputs output;
std::vector<std::string> fetch_name;
if (sample_type == "fit_a_line") {
prepare_fit_a_line(input, fetch_name);
}
else if (sample_type == "bert") {
prepare_bert(input, fetch_name);
}
else {
prepare_fit_a_line(input, fetch_name);
}
if (client->predict(input, output, fetch_name, 0) != 0) {
LOG(ERROR) << "Failed to predict!";
}
else {
LOG(INFO) << output.print();
}
return 0;
}
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "core/general-client/include/client.h"
#include "core/sdk-cpp/include/predictor_sdk.h"
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
namespace baidu {
namespace paddle_serving {
namespace client {
class ServingBrpcClient : public ServingClient {
public:
ServingBrpcClient() {};
~ServingBrpcClient() {};
virtual int connect(const std::string server_port);
int predict(const PredictorInputs& inputs,
PredictorOutputs& outputs,
const std::vector<std::string>& fetch_name,
const uint64_t log_id);
private:
// generate default SDKConf
std::string gen_desc(const std::string server_port);
private:
PredictorApi _api;
Predictor* _predictor;
};
} // namespace client
} // namespace paddle_serving
} // namespace baidu
\ No newline at end of file
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include <vector>
#include <map>
#include <sstream>
#include <memory>
namespace baidu {
namespace paddle_serving {
namespace predictor {
namespace general_model {
class Request;
class Response;
}
}
namespace client {
class PredictorInputs;
class PredictorOutputs;
class ServingClient {
public:
ServingClient() {};
virtual ~ServingClient() = default;
int init(const std::vector<std::string>& client_conf,
const std::string server_port);
int load_client_config(const std::vector<std::string>& client_conf);
virtual int connect(const std::string server_port) = 0;
virtual int predict(const PredictorInputs& inputs,
PredictorOutputs& outputs,
const std::vector<std::string>& fetch_name,
const uint64_t log_id) = 0;
protected:
std::map<std::string, int> _feed_name_to_idx;
std::vector<std::string> _feed_name;
std::map<std::string, int> _fetch_name_to_idx;
std::map<std::string, std::string> _fetch_name_to_var_name;
std::map<std::string, int> _fetch_name_to_type;
std::vector<std::vector<int>> _shape;
std::vector<int> _type;
std::vector<int64_t> _last_request_ts;
};
class PredictorData {
public:
PredictorData() {};
virtual ~PredictorData() {};
void add_float_data(const std::vector<float>& data,
const std::string& name,
const std::vector<int>& shape,
const std::vector<int>& lod,
const int datatype = 1);
void add_int64_data(const std::vector<int64_t>& data,
const std::string& name,
const std::vector<int>& shape,
const std::vector<int>& lod,
const int datatype = 0);
void add_int32_data(const std::vector<int32_t>& data,
const std::string& name,
const std::vector<int>& shape,
const std::vector<int>& lod,
const int datatype = 2);
void add_string_data(const std::string& data,
const std::string& name,
const std::vector<int>& shape,
const std::vector<int>& lod,
const int datatype = 3);
const std::map<std::string, std::vector<float>>& float_data_map() const {
return _float_data_map;
};
std::map<std::string, std::vector<float>>* mutable_float_data_map() {
return &_float_data_map;
};
const std::map<std::string, std::vector<int64_t>>& int64_data_map() const {
return _int64_data_map;
};
std::map<std::string, std::vector<int64_t>>* mutable_int64_data_map() {
return &_int64_data_map;
};
const std::map<std::string, std::vector<int32_t>>& int_data_map() const {
return _int32_data_map;
};
std::map<std::string, std::vector<int32_t>>* mutable_int_data_map() {
return &_int32_data_map;
};
const std::map<std::string, std::string>& string_data_map() const {
return _string_data_map;
};
std::map<std::string, std::string>* mutable_string_data_map() {
return &_string_data_map;
};
const std::map<std::string, std::vector<int>>& shape_map() const {
return _shape_map;
};
std::map<std::string, std::vector<int>>* mutable_shape_map() {
return &_shape_map;
};
const std::map<std::string, std::vector<int>>& lod_map() const {
return _lod_map;
};
std::map<std::string, std::vector<int>>* mutable_lod_map() {
return &_lod_map;
};
int get_datatype(std::string name) const;
std::string print();
private:
// used to print vector data map e.g. _float_data_map
template<typename T1, typename T2>
std::string map2string(const std::map<T1, std::vector<T2>>& map) {
std::ostringstream oss;
oss.str("");
oss.precision(6);
oss.setf(std::ios::fixed);
std::string key_seg = ":";
std::string val_seg = ",";
std::string end_seg = "\n";
typename std::map<T1, std::vector<T2>>::const_iterator it = map.begin();
typename std::map<T1, std::vector<T2>>::const_iterator itEnd = map.end();
for (; it != itEnd; it++) {
oss << "{";
oss << it->first << key_seg;
const std::vector<T2>& v = it->second;
for (size_t i = 0; i < v.size(); ++i) {
if (i != v.size() - 1) {
oss << v[i] << val_seg;
}
else {
oss << v[i];
}
}
oss << "}";
}
return oss.str();
};
// used to print data map without vector e.g. _string_data_map
template<typename T1, typename T2>
std::string map2string(const std::map<T1, T2>& map) {
std::ostringstream oss;
oss.str("");
std::string key_seg = ":";
std::string val_seg = ",";
std::string end_seg = "\n";
typename std::map<T1, T2>::const_iterator it = map.begin();
typename std::map<T1, T2>::const_iterator itEnd = map.end();
for (; it != itEnd; it++) {
oss << "{";
oss << it->first << key_seg << it->second;
oss << "}";
}
return oss.str();
};
protected:
std::map<std::string, std::vector<float>> _float_data_map;
std::map<std::string, std::vector<int64_t>> _int64_data_map;
std::map<std::string, std::vector<int32_t>> _int32_data_map;
std::map<std::string, std::string> _string_data_map;
std::map<std::string, std::vector<int>> _shape_map;
std::map<std::string, std::vector<int>> _lod_map;
std::map<std::string, int> _datatype_map;
};
class PredictorInputs : public PredictorData {
public:
PredictorInputs() {};
virtual ~PredictorInputs() {};
// generate proto from inputs
// feed_name_to_idx: mapping alias name to idx
// feed_name: mapping idx to name
static int GenProto(const PredictorInputs& inputs,
const std::map<std::string, int>& feed_name_to_idx,
const std::vector<std::string>& feed_name,
predictor::general_model::Request& req);
};
class PredictorOutputs {
public:
struct PredictorOutput {
std::string engine_name;
PredictorData data;
};
PredictorOutputs() {};
virtual ~PredictorOutputs() {};
const std::vector<std::shared_ptr<PredictorOutputs::PredictorOutput>>& datas() {
return _datas;
};
std::vector<std::shared_ptr<PredictorOutputs::PredictorOutput>>* mutable_datas() {
return &_datas;
};
void add_data(const std::shared_ptr<PredictorOutputs::PredictorOutput>& data) {
_datas.push_back(data);
};
std::string print();
void clear();
// Parse proto to outputs
// fetch_name: name of data to be output
// fetch_name_to_type: mapping of fetch_name to datatype
static int ParseProto(const predictor::general_model::Response& res,
const std::vector<std::string>& fetch_name,
std::map<std::string, int>& fetch_name_to_type,
PredictorOutputs& outputs);
protected:
std::vector<std::shared_ptr<PredictorOutputs::PredictorOutput>> _datas;
};
} // namespace client
} // namespace paddle_serving
} // namespace baidu
\ No newline at end of file
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "core/general-client/include/brpc_client.h"
#include "core/sdk-cpp/include/common.h"
#include "core/util/include/timer.h"
#include "core/sdk-cpp/builtin_format.pb.h"
#include "core/sdk-cpp/general_model_service.pb.h"
DEFINE_bool(profile_client, false, "");
DEFINE_bool(profile_server, false, "");
#define BRPC_MAX_BODY_SIZE 512 * 1024 * 1024
namespace baidu {
namespace paddle_serving {
namespace client {
using baidu::paddle_serving::Timer;
using baidu::paddle_serving::predictor::general_model::Request;
using baidu::paddle_serving::predictor::general_model::Response;
using baidu::paddle_serving::predictor::general_model::Tensor;
using configure::SDKConf;
using configure::VariantConf;
using configure::Predictor;
int ServingBrpcClient::connect(const std::string server_port) {
brpc::fLU64::FLAGS_max_body_size = BRPC_MAX_BODY_SIZE;
if (_api.create(gen_desc(server_port)) != 0) {
LOG(ERROR) << "Predictor Creation Failed";
return -1;
}
// _api.thrd_initialize();
return 0;
}
std::string ServingBrpcClient::gen_desc(const std::string server_port) {
// default config for brpc
SDKConf sdk_conf;
Predictor* predictor = sdk_conf.add_predictors();
predictor->set_name("general_model");
predictor->set_service_name("baidu.paddle_serving.predictor.general_model.GeneralModelService");
predictor->set_endpoint_router("WeightedRandomRender");
predictor->mutable_weighted_random_render_conf()->set_variant_weight_list("100");
VariantConf* predictor_var = predictor->add_variants();
predictor_var->set_tag("default_tag_1");
std::string cluster = "list://" + server_port;
predictor_var->mutable_naming_conf()->set_cluster(cluster);
VariantConf* var = sdk_conf.mutable_default_variant_conf();
var->set_tag("default");
var->mutable_connection_conf()->set_connect_timeout_ms(2000);
var->mutable_connection_conf()->set_rpc_timeout_ms(200000);
var->mutable_connection_conf()->set_connect_retry_count(2);
var->mutable_connection_conf()->set_max_connection_per_host(100);
var->mutable_connection_conf()->set_hedge_request_timeout_ms(-1);
var->mutable_connection_conf()->set_hedge_fetch_retry_count(2);
var->mutable_connection_conf()->set_connection_type("pooled");
var->mutable_connection_conf()->set_connect_timeout_ms(2000);
var->mutable_naming_conf()->set_cluster_filter_strategy("Default");
var->mutable_naming_conf()->set_load_balance_strategy("la");
var->mutable_rpc_parameter()->set_compress_type(0);
var->mutable_rpc_parameter()->set_package_size(20);
var->mutable_rpc_parameter()->set_protocol("baidu_std");
var->mutable_rpc_parameter()->set_max_channel_per_request(3);
return sdk_conf.SerializePartialAsString();
}
int ServingBrpcClient::predict(const PredictorInputs& inputs,
PredictorOutputs& outputs,
const std::vector<std::string>& fetch_name,
const uint64_t log_id) {
Timer timeline;
int64_t preprocess_start = timeline.TimeStampUS();
// thread initialize for StubTLS
_api.thrd_initialize();
std::string variant_tag;
// predictor is bound to request with brpc::Controller
_predictor = _api.fetch_predictor("general_model", &variant_tag);
if (_predictor == NULL) {
LOG(ERROR) << "Failed fetch predictor so predict error!";
return -1;
}
// predict_res_batch.set_variant_tag(variant_tag);
VLOG(2) << "fetch general model predictor done.";
VLOG(2) << "variant_tag:" << variant_tag;
VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
Request req;
req.set_log_id(log_id);
for (auto &name : fetch_name) {
req.add_fetch_var_names(name);
}
if (PredictorInputs::GenProto(inputs, _feed_name_to_idx, _feed_name, req) != 0) {
LOG(ERROR) << "Failed to preprocess req!";
return -1;
}
int64_t preprocess_end = timeline.TimeStampUS();
int64_t client_infer_start = timeline.TimeStampUS();
Response res;
int64_t client_infer_end = 0;
int64_t postprocess_start = 0;
int64_t postprocess_end = 0;
if (FLAGS_profile_client) {
if (FLAGS_profile_server) {
req.set_profile_server(true);
}
}
res.Clear();
if (_predictor->inference(&req, &res) != 0) {
LOG(ERROR) << "failed call predictor with req: " << req.ShortDebugString();
return -1;
}
client_infer_end = timeline.TimeStampUS();
postprocess_start = client_infer_end;
if (PredictorOutputs::ParseProto(res, fetch_name, _fetch_name_to_type, outputs) != 0) {
LOG(ERROR) << "Failed to post_process res!";
return -1;
}
postprocess_end = timeline.TimeStampUS();
if (FLAGS_profile_client) {
std::ostringstream oss;
oss << "PROFILE\t"
<< "pid:" << getpid() << "\t"
<< "prepro_0:" << preprocess_start << " "
<< "prepro_1:" << preprocess_end << " "
<< "client_infer_0:" << client_infer_start << " "
<< "client_infer_1:" << client_infer_end << " ";
if (FLAGS_profile_server) {
int op_num = res.profile_time_size() / 2;
for (int i = 0; i < op_num; ++i) {
oss << "op" << i << "_0:" << res.profile_time(i * 2) << " ";
oss << "op" << i << "_1:" << res.profile_time(i * 2 + 1) << " ";
}
}
oss << "postpro_0:" << postprocess_start << " ";
oss << "postpro_1:" << postprocess_end;
fprintf(stderr, "%s\n", oss.str().c_str());
}
// release predictor
_api.thrd_clear();
return 0;
}
} // namespace client
} // namespace paddle_serving
} // namespace baidu
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "core/general-client/include/client.h"
#include "core/sdk-cpp/include/common.h"
#include "core/sdk-cpp/general_model_service.pb.h"
namespace baidu {
namespace paddle_serving {
namespace client {
using configure::GeneralModelConfig;
using baidu::paddle_serving::predictor::general_model::Request;
using baidu::paddle_serving::predictor::general_model::Response;
using baidu::paddle_serving::predictor::general_model::Tensor;
enum ProtoDataType { P_INT64, P_FLOAT32, P_INT32, P_STRING };
int ServingClient::init(const std::vector<std::string>& client_conf,
const std::string server_port) {
if (load_client_config(client_conf) != 0) {
LOG(ERROR) << "Failed to load client config";
return -1;
}
// pure virtual func, subclass implementation
if (connect(server_port) != 0) {
LOG(ERROR) << "Failed to connect";
return -1;
}
return 0;
}
int ServingClient::load_client_config(const std::vector<std::string> &conf_file) {
try {
GeneralModelConfig model_config;
if (configure::read_proto_conf(conf_file[0].c_str(), &model_config) != 0) {
LOG(ERROR) << "Failed to load general model config"
<< ", file path: " << conf_file[0];
return -1;
}
_feed_name_to_idx.clear();
_fetch_name_to_idx.clear();
_shape.clear();
int feed_var_num = model_config.feed_var_size();
_feed_name.clear();
VLOG(2) << "feed var num: " << feed_var_num;
for (int i = 0; i < feed_var_num; ++i) {
_feed_name_to_idx[model_config.feed_var(i).alias_name()] = i;
VLOG(2) << "feed [" << i << "]"
<< " name: " << model_config.feed_var(i).name();
_feed_name.push_back(model_config.feed_var(i).name());
VLOG(2) << "feed alias name: " << model_config.feed_var(i).alias_name()
<< " index: " << i;
std::vector<int> tmp_feed_shape;
VLOG(2) << "feed"
<< "[" << i << "] shape:";
for (int j = 0; j < model_config.feed_var(i).shape_size(); ++j) {
tmp_feed_shape.push_back(model_config.feed_var(i).shape(j));
VLOG(2) << "shape[" << j << "]: " << model_config.feed_var(i).shape(j);
}
_type.push_back(model_config.feed_var(i).feed_type());
VLOG(2) << "feed"
<< "[" << i
<< "] feed type: " << model_config.feed_var(i).feed_type();
_shape.push_back(tmp_feed_shape);
}
if (conf_file.size() > 1) {
model_config.Clear();
if (configure::read_proto_conf(conf_file[conf_file.size() - 1].c_str(),
&model_config) != 0) {
LOG(ERROR) << "Failed to load general model config"
<< ", file path: " << conf_file[conf_file.size() - 1];
return -1;
}
}
int fetch_var_num = model_config.fetch_var_size();
VLOG(2) << "fetch_var_num: " << fetch_var_num;
for (int i = 0; i < fetch_var_num; ++i) {
_fetch_name_to_idx[model_config.fetch_var(i).alias_name()] = i;
VLOG(2) << "fetch [" << i << "]"
<< " alias name: " << model_config.fetch_var(i).alias_name();
_fetch_name_to_var_name[model_config.fetch_var(i).alias_name()] =
model_config.fetch_var(i).name();
_fetch_name_to_type[model_config.fetch_var(i).alias_name()] =
model_config.fetch_var(i).fetch_type();
}
} catch (std::exception &e) {
LOG(ERROR) << "Failed load general model config" << e.what();
return -1;
}
return 0;
}
void PredictorData::add_float_data(const std::vector<float>& data,
const std::string& name,
const std::vector<int>& shape,
const std::vector<int>& lod,
const int datatype) {
_float_data_map[name] = data;
_shape_map[name] = shape;
_lod_map[name] = lod;
_datatype_map[name] = datatype;
}
void PredictorData::add_int64_data(const std::vector<int64_t>& data,
const std::string& name,
const std::vector<int>& shape,
const std::vector<int>& lod,
const int datatype) {
_int64_data_map[name] = data;
_shape_map[name] = shape;
_lod_map[name] = lod;
_datatype_map[name] = datatype;
}
void PredictorData::add_int32_data(const std::vector<int32_t>& data,
const std::string& name,
const std::vector<int>& shape,
const std::vector<int>& lod,
const int datatype) {
_int32_data_map[name] = data;
_shape_map[name] = shape;
_lod_map[name] = lod;
_datatype_map[name] = datatype;
}
void PredictorData::add_string_data(const std::string& data,
const std::string& name,
const std::vector<int>& shape,
const std::vector<int>& lod,
const int datatype) {
_string_data_map[name] = data;
_shape_map[name] = shape;
_lod_map[name] = lod;
_datatype_map[name] = datatype;
}
int PredictorData::get_datatype(std::string name) const {
std::map<std::string, int>::const_iterator it = _datatype_map.find(name);
if (it != _datatype_map.end()) {
return it->second;
}
return 0;
}
std::string PredictorData::print() {
std::string res;
res.append(map2string<std::string, float>(_float_data_map));
res.append(map2string<std::string, int64_t>(_int64_data_map));
res.append(map2string<std::string, int32_t>(_int32_data_map));
res.append(map2string<std::string, std::string>(_string_data_map));
return res;
}
int PredictorInputs::GenProto(const PredictorInputs& inputs,
const std::map<std::string, int>& feed_name_to_idx,
const std::vector<std::string>& feed_name,
Request& req) {
const std::map<std::string, std::vector<float>>& float_feed_map = inputs.float_data_map();
const std::map<std::string, std::vector<int64_t>>& int64_feed_map = inputs.int64_data_map();
const std::map<std::string, std::vector<int32_t>>& int32_feed_map = inputs.int_data_map();
const std::map<std::string, std::string>& string_feed_map = inputs.string_data_map();
const std::map<std::string, std::vector<int>>& shape_map = inputs.shape_map();
const std::map<std::string, std::vector<int>>& lod_map = inputs.lod_map();
VLOG(2) << "float feed name size: " << float_feed_map.size();
VLOG(2) << "int feed name size: " << int64_feed_map.size();
VLOG(2) << "string feed name size: " << string_feed_map.size();
// batch is already in Tensor.
for (std::map<std::string, std::vector<float>>::const_iterator iter = float_feed_map.begin();
iter != float_feed_map.end();
++iter) {
std::string name = iter->first;
const std::vector<float>& float_data = iter->second;
const std::vector<int>& float_shape = shape_map.at(name);
const std::vector<int>& float_lod = lod_map.at(name);
// default datatype = P_FLOAT32
int datatype = inputs.get_datatype(name);
std::map<std::string, int>::const_iterator feed_name_it = feed_name_to_idx.find(name);
if (feed_name_it == feed_name_to_idx.end()) {
LOG(ERROR) << "Do not find [" << name << "] in feed_map!";
return -1;
}
int idx = feed_name_to_idx.at(name);
VLOG(2) << "prepare float feed " << name << " idx " << idx;
int total_number = float_data.size();
Tensor *tensor = req.add_tensor();
VLOG(2) << "prepare float feed " << name << " shape size "
<< float_shape.size();
for (uint32_t j = 0; j < float_shape.size(); ++j) {
tensor->add_shape(float_shape[j]);
}
for (uint32_t j = 0; j < float_lod.size(); ++j) {
tensor->add_lod(float_lod[j]);
}
tensor->set_elem_type(datatype);
tensor->set_name(feed_name[idx]);
tensor->set_alias_name(name);
tensor->mutable_float_data()->Resize(total_number, 0);
memcpy(tensor->mutable_float_data()->mutable_data(), float_data.data(), total_number * sizeof(float));
}
for (std::map<std::string, std::vector<int64_t>>::const_iterator iter = int64_feed_map.begin();
iter != int64_feed_map.end();
++iter) {
std::string name = iter->first;
const std::vector<int64_t>& int64_data = iter->second;
const std::vector<int>& int64_shape = shape_map.at(name);
const std::vector<int>& int64_lod = lod_map.at(name);
// default datatype = P_INT64
int datatype = inputs.get_datatype(name);
std::map<std::string, int>::const_iterator feed_name_it = feed_name_to_idx.find(name);
if (feed_name_it == feed_name_to_idx.end()) {
LOG(ERROR) << "Do not find [" << name << "] in feed_map!";
return -1;
}
int idx = feed_name_to_idx.at(name);
Tensor *tensor = req.add_tensor();
int total_number = int64_data.size();
for (uint32_t j = 0; j < int64_shape.size(); ++j) {
tensor->add_shape(int64_shape[j]);
}
for (uint32_t j = 0; j < int64_lod.size(); ++j) {
tensor->add_lod(int64_lod[j]);
}
tensor->set_elem_type(datatype);
tensor->set_name(feed_name[idx]);
tensor->set_alias_name(name);
tensor->mutable_int64_data()->Resize(total_number, 0);
memcpy(tensor->mutable_int64_data()->mutable_data(), int64_data.data(), total_number * sizeof(int64_t));
}
for (std::map<std::string, std::vector<int32_t>>::const_iterator iter = int32_feed_map.begin();
iter != int32_feed_map.end();
++iter) {
std::string name = iter->first;
const std::vector<int32_t>& int32_data = iter->second;
const std::vector<int>& int32_shape = shape_map.at(name);
const std::vector<int>& int32_lod = lod_map.at(name);
// default datatype = P_INT32
int datatype = inputs.get_datatype(name);
std::map<std::string, int>::const_iterator feed_name_it = feed_name_to_idx.find(name);
if (feed_name_it == feed_name_to_idx.end()) {
LOG(ERROR) << "Do not find [" << name << "] in feed_map!";
return -1;
}
int idx = feed_name_to_idx.at(name);
Tensor *tensor = req.add_tensor();
int total_number = int32_data.size();
for (uint32_t j = 0; j < int32_shape.size(); ++j) {
tensor->add_shape(int32_shape[j]);
}
for (uint32_t j = 0; j < int32_lod.size(); ++j) {
tensor->add_lod(int32_lod[j]);
}
tensor->set_elem_type(datatype);
tensor->set_name(feed_name[idx]);
tensor->set_alias_name(name);
tensor->mutable_int_data()->Resize(total_number, 0);
memcpy(tensor->mutable_int_data()->mutable_data(), int32_data.data(), total_number * sizeof(int32_t));
}
for (std::map<std::string, std::string>::const_iterator iter = string_feed_map.begin();
iter != string_feed_map.end();
++iter) {
std::string name = iter->first;
const std::string& string_data = iter->second;
const std::vector<int>& string_shape = shape_map.at(name);
const std::vector<int>& string_lod = lod_map.at(name);
// default datatype = P_STRING
int datatype = inputs.get_datatype(name);
std::map<std::string, int>::const_iterator feed_name_it = feed_name_to_idx.find(name);
if (feed_name_it == feed_name_to_idx.end()) {
LOG(ERROR) << "Do not find [" << name << "] in feed_map!";
return -1;
}
int idx = feed_name_to_idx.at(name);
Tensor *tensor = req.add_tensor();
for (uint32_t j = 0; j < string_shape.size(); ++j) {
tensor->add_shape(string_shape[j]);
}
for (uint32_t j = 0; j < string_lod.size(); ++j) {
tensor->add_lod(string_lod[j]);
}
tensor->set_elem_type(datatype);
tensor->set_name(feed_name[idx]);
tensor->set_alias_name(name);
const int string_shape_size = string_shape.size();
// string_shape[vec_idx] = [1]; because numpy has no string datatype,
// strings are passed via vector<vector<string> >.
if (string_shape_size != 1) {
LOG(ERROR) << "string_shape_size should be 1-D, but received is : "
<< string_shape_size;
return -1;
}
switch (string_shape_size) {
case 1: {
tensor->add_data(string_data);
break;
}
}
}
return 0;
}
std::string PredictorOutputs::print() {
std::string res = "";
for (size_t i = 0; i < _datas.size(); ++i) {
res.append(_datas[i]->engine_name);
res.append(":");
res.append(_datas[i]->data.print());
res.append("\n");
}
return res;
}
void PredictorOutputs::clear() {
_datas.clear();
}
int PredictorOutputs::ParseProto(const Response& res,
const std::vector<std::string>& fetch_name,
std::map<std::string, int>& fetch_name_to_type,
PredictorOutputs& outputs) {
VLOG(2) << "get model output num";
uint32_t model_num = res.outputs_size();
VLOG(2) << "model num: " << model_num;
for (uint32_t m_idx = 0; m_idx < model_num; ++m_idx) {
VLOG(2) << "process model output index: " << m_idx;
auto& output = res.outputs(m_idx);
std::shared_ptr<PredictorOutputs::PredictorOutput> predictor_output =
std::make_shared<PredictorOutputs::PredictorOutput>();
predictor_output->engine_name = output.engine_name();
std::map<std::string, std::vector<float>>& float_data_map = *predictor_output->data.mutable_float_data_map();
std::map<std::string, std::vector<int64_t>>& int64_data_map = *predictor_output->data.mutable_int64_data_map();
std::map<std::string, std::vector<int32_t>>& int32_data_map = *predictor_output->data.mutable_int_data_map();
std::map<std::string, std::string>& string_data_map = *predictor_output->data.mutable_string_data_map();
std::map<std::string, std::vector<int>>& shape_map = *predictor_output->data.mutable_shape_map();
std::map<std::string, std::vector<int>>& lod_map = *predictor_output->data.mutable_lod_map();
int idx = 0;
for (auto &name : fetch_name) {
// int idx = _fetch_name_to_idx[name];
int shape_size = output.tensor(idx).shape_size();
VLOG(2) << "fetch var " << name << " index " << idx << " shape size "
<< shape_size;
shape_map[name].resize(shape_size);
for (int i = 0; i < shape_size; ++i) {
shape_map[name][i] = output.tensor(idx).shape(i);
}
int lod_size = output.tensor(idx).lod_size();
if (lod_size > 0) {
lod_map[name].resize(lod_size);
for (int i = 0; i < lod_size; ++i) {
lod_map[name][i] = output.tensor(idx).lod(i);
}
}
idx += 1;
}
idx = 0;
for (auto &name : fetch_name) {
// int idx = _fetch_name_to_idx[name];
if (fetch_name_to_type[name] == P_INT64) {
VLOG(2) << "fetch var " << name << "type int64";
int size = output.tensor(idx).int64_data_size();
int64_data_map[name] = std::vector<int64_t>(
output.tensor(idx).int64_data().begin(),
output.tensor(idx).int64_data().begin() + size);
} else if (fetch_name_to_type[name] == P_FLOAT32) {
VLOG(2) << "fetch var " << name << "type float";
int size = output.tensor(idx).float_data_size();
float_data_map[name] = std::vector<float>(
output.tensor(idx).float_data().begin(),
output.tensor(idx).float_data().begin() + size);
} else if (fetch_name_to_type[name] == P_INT32) {
VLOG(2) << "fetch var " << name << "type int32";
int size = output.tensor(idx).int_data_size();
int32_data_map[name] = std::vector<int32_t>(
output.tensor(idx).int_data().begin(),
output.tensor(idx).int_data().begin() + size);
}
idx += 1;
}
outputs.add_data(predictor_output);
}
return 0;
}
} // namespace client
} // namespace paddle_serving
} // namespace baidu
......@@ -26,9 +26,90 @@
#include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/memory.h"
// this file is included by bsf.h
namespace im {
namespace bsf {
template <typename InItemT, typename OutItemT>
bool Task<InItemT, OutItemT>::task_fetch_init(BatchTasks<TaskT>& batchTask) {
// Double-checked locking to reduce the locking granularity.
if (!fetch_init) {
if (taskmeta_num > 1) {
// When the task has been split into multiple taskmetas, locking is required.
AutoMutex lock(task_mut);
task_fetch_create(batchTask);
} else {
// When the task has only one taskmeta, no lock is needed.
task_fetch_create(batchTask);
}
}
return true;
}
template <typename InItemT, typename OutItemT>
bool Task<InItemT, OutItemT>::task_fetch_create(BatchTasks<TaskT>& batchTask) {
if (!fetch_init) {
vector_fetch_lod_index = batchTask.vector_fetch_lod_index;
set_fetch_nobatch_index = batchTask.set_fetch_nobatch_index;
OutVectorT taskMetaOutLodTensor;
size_t fetchvar_num = batchTask._batch_out.size();
for (size_t fetchvar_index = 0; fetchvar_index < fetchvar_num;
++fetchvar_index) {
size_t fetchvar_bytesize_index =
batchTask.fetchvar_bytesize(fetchvar_index);
size_t fetchvar_batch = 0;
// 1. nobatch fetchvar case
if (set_fetch_nobatch_index.size() > 0 &&
set_fetch_nobatch_index.find(fetchvar_index) !=
set_fetch_nobatch_index.end()) {
fetchvar_batch = 1;
} else if (vector_fetch_lod_index.size() > 0 &&
std::find(vector_fetch_lod_index.begin(),
vector_fetch_lod_index.end(),
fetchvar_index) != vector_fetch_lod_index.end()) {
// lod fetchvar case: the total shape[0] cannot be determined yet.
// Allocate task_num temporary buffers according to the task's total task_num.
// Each lod-type fetchvar is copied into its corresponding temporary buffer.
// Finally, total up the temporary buffers and merge the fetchvar and lod.
fetchvar_batch = 0;
} else {
// Ordinary fetchvar case: this Task's total fetchvar_batch equals
// the total input batch_size().
fetchvar_batch = batch_size();
}
paddle::PaddleTensor tensor_out;
tensor_out.name = batchTask._batch_out[fetchvar_index].name;
tensor_out.dtype =
paddle::PaddleDType(batchTask._batch_out[fetchvar_index].dtype);
tensor_out.shape = batchTask._batch_out[fetchvar_index].shape;
tensor_out.shape[0] = fetchvar_batch;
if (fetchvar_batch != 0) {
// The lod is empty at this point.
tensor_out.lod = batchTask._batch_out[fetchvar_index].lod;
// resize all batch memory at one time
size_t databuf_size = fetchvar_batch * fetchvar_bytesize_index;
tensor_out.data.Resize(databuf_size);
} else {
// When taskmeta_num = 1, only one taskMeta operates on the task at a time,
// so there is no thread-safety issue and taskMeta->task->resize->copy can be
// done directly. When the task is split into multiple taskMetas, temporary
// objects must record the results, which are merged once all are collected.
if (taskmeta_num > 1) {
taskMetaOutLodTensor.push_back(tensor_out);
}
}
outVectorT_ptr->push_back(tensor_out);
}
// outLodTensorVector is actually a two-level vector
// with shape taskmeta_num * vector_fetch_lod_index.size();
outLodTensorVector.resize(taskmeta_num, taskMetaOutLodTensor);
fetch_init = true;
}
return true;
}
template <typename TaskT>
void* TaskExecutor<TaskT>::thread_entry(void* args) {
ThreadContext<TaskT>* context = static_cast<ThreadContext<TaskT>*>(args);
......@@ -136,7 +217,7 @@ TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
}
/*
if (!BatchTasks<TaskT>::check_valid(in, out, _batch_align)) {
if (!BatchTasks<TaskT>::check_valid(in, out, _overrun)) {
LOG(ERROR) << "Invalid input & output";
return TaskHandler<TaskT>::valid_handle();
}
......@@ -156,9 +237,11 @@ TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr;
task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr;
if (!task->task_init()) {
LOG(ERROR) << "task->init() failed";
}
task->rem = task->batch_size();
task->index.store(0, butil::memory_order_relaxed);
AutoMutex lock(_mut);
_task_queue.push_back(task);
THREAD_COND_SIGNAL(&_cond);
......@@ -168,11 +251,12 @@ TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
// This function is accessed by multiple threads,
// so take AutoMutex first.
// Therefore batch.append_task is thread safe.
// Therefore batchTask.append_task is thread safe.
// You don't need to add an extra lock in append_task().
// The task is already initialized.
template <typename TaskT>
bool TaskExecutor<TaskT>::move_task_to_batch(
BatchTasks<TaskT>& batch) { // NOLINT
BatchTasks<TaskT>& batchTask) { // NOLINT
AutoMutex lock(_mut);
while (_task_queue.empty()) {
THREAD_COND_WAIT(&_cond, &_mut);
......@@ -183,15 +267,65 @@ bool TaskExecutor<TaskT>::move_task_to_batch(
return false;
}
TaskT* previous_task = nullptr;
while (!_task_queue.empty()) {
TaskT* task = _task_queue.front();
size_t rem = batch.append_task(task);
// Since we cannot tell whether a fetchVar is lod (even if the input is
// non-lod, the output may still be lod), the simple approach is to never
// split a task: user requests may be merged and predicted together, but a
// single request is never split into two smaller parts for prediction.
// This only requires setting the engine attribute allow_split_request = false.
// The complex approach is to allow splitting a Task, whether or not it has lod.
// The difficulty: before prediction we know how many taskmetas a task was
// split into, but only after prediction do we know how many fetchvars there
// are and how many of them are lod.
// So the task must first create taskmeta_num * fetchvar_num (lod-type)
// temporary PaddleTensors (holding data and lod).
// Since the unit of multi-threaded scheduling is the taskmeta, they can only
// be created in notify_task via taskmeta->task.
// Because multiple taskmetas map to one task, there is multi-thread
// contention, so a lock is needed inside the task.
// Atomic operations are not feasible, because every thread must wait until
// the PaddleTensors above have been created before it can continue.
// For ordinary fetches, a lock is also needed to create the PaddleTensor
// before data can be copied into it.
// _overrun controls whether asynchronous BatchTasks may temporarily exceed
// the batch limit in a single pass.
// When _overrun is true, even if only 1 batch of room is left in BatchTasks,
// a whole Task is still placed into it, temporarily exceeding the limit.
// When _overrun is false, this is not allowed.
// If the model itself has a maximum batch limit, set this to false (the
// default is false).
// If the model has no maximum batch limit but you cap the BatchTasks batch
// yourself, consider setting it to true.
// _allow_split_request == true allows splitting a task: if 1 batch of room is
// left in BatchTasks, 1 batch is split off from the next Task.
// _allow_split_request == false means tasks are never split; the remaining
// 1 batch of room in BatchTasks is wasted.
// The default is true, allowing tasks to be split to maximize utilization.
if (!batchTask.get_allow_split_request()) {
if (task->batch_size() > batchTask.get_rem_size() &&
!batchTask.get_overrun()) {
break;
}
}
// combine_task_valid determines whether two tasks can be merged.
// Apart from the outermost dimension, the inner shapes must match to merge.
// Otherwise, break out of the loop and put the task into the next batchTask.
// This guarantees that the tasks in batch.append_task(task) share the same
// inner shapes.
// For the case shape[0] == 1 while != batch, merging takes the value from one
// of the tasks, so that feedvar must be equal across tasks to allow merging.
// Otherwise, break out of the loop and put the task into the next batchTask.
// PaddleTensor and PaddleBuf currently do not overload ==, so only the raw
// memory can be compared.
// TODO(HexToString): consider supporting AutoPadding later.
if (previous_task != nullptr) {
if (!task->combine_task_valid(previous_task)) {
break;
}
}
size_t rem = batchTask.append_task(task);
previous_task = task;
if (task->rem <= 0) {
_task_queue.pop_front();
}
if (rem <= 0) break;
}
LOG(INFO) << "Number of tasks remaining in _task_queue is"
<< _task_queue.size();
return true;
}
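To summarize the admission policy described in the comments above, here is a small standalone sketch. It is illustrative only; the real decision is made by `BatchTasks::append_task()` together with the checks in `move_task_to_batch()`, and the names below are local to this example.

```cpp
#include <cstddef>

// Illustrative policy holder; the fields mirror the engine options above.
struct BatchPolicy {
  size_t batch_limit;        // batch_infer_size
  bool overrun;              // enable_overrun
  bool allow_split_request;  // allow_split_request
};

// Returns how many items of a task with `task_batch` remaining items may be
// appended to a batch already holding `batch_used` items; 0 means the task
// must wait for the next BatchTasks.
size_t admit(const BatchPolicy& p, size_t batch_used, size_t task_batch) {
  size_t rem = p.batch_limit > batch_used ? p.batch_limit - batch_used : 0;
  if (task_batch <= rem) return task_batch;  // the whole task fits
  if (!p.allow_split_request) {
    // The task may not be split: either take it whole (overrun) or defer it.
    return p.overrun ? task_batch : 0;
  }
  return rem;  // split the request and fill the remaining capacity
}
```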
......@@ -201,11 +335,12 @@ bool TaskExecutor<TaskT>::move_task_to_batch(
// TaskT comes from the singleton TaskExecutor's _task_queue.
// Although TaskMeta is a local variable, several TaskMetas may point to
// the same TaskT obtained from the singleton TaskExecutor's _task_queue.
// Put the TaskMeta into the local variable BatchTasks<TaskT> batch.
// Put the TaskMeta into the local variable BatchTasks<TaskT> batchTask.
// batch.merge_tasks() and batch.notify_tasks() hold no lock.
// BatchTasks<TaskT> batch itself is a local variable, so it's thread safe.
// If batch.merge_tasks() and batch.notify_tasks() do something to TaskMeta
// batchTask.merge_tasks() and batchTask.notify_tasks() hold no lock.
// BatchTasks<TaskT> batchTask itself is a local variable, so it's thread safe.
// If batchTask.merge_tasks() and batchTask.notify_tasks() do something to
// TaskMeta,
// you need to pay attention to that.
// Multiple threads deal with different TaskMetas (since each is created as a
// local variable).
......@@ -242,11 +377,23 @@ int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
return -1;
}
BatchTasks<TaskT> batch(_batch_size, _batch_align);
if (move_task_to_batch(batch)) {
batch.merge_tasks();
_fn(&batch.in(), &batch.out());
batch.notify_tasks();
// move_task_to_batch() takes the original task from the `_task_queue`
// and puts it into the batch's own Vector<taskmeta>;
// the capacity of that Vector<taskmeta> is decided by `_batch_size` or
// `_overrun`.
// merge_tasks() moves the input data from the Vector<taskmeta> into
// `_batch_in`,
// because the predictor's input is `_batch_in`.
// notify_tasks() moves the output data from `_batch_out` into every single
// taskmeta,
// because the predictor's output is `_batch_out`.
BatchTasks<TaskT> batchTask(_batch_size, _overrun, _allow_split_request);
if (move_task_to_batch(batchTask)) {
batchTask.merge_tasks();
_fn(&batchTask.in(), &batchTask.out());
batchTask.notify_tasks();
}
}
......
This diff has been collapsed.
......@@ -21,6 +21,15 @@
#include <string>
#include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/op_repository.h"
#ifdef BCLOUD
#include <base/atomicops.h>
#else
#include <butil/atomicops.h>
#endif
#include <errno.h>
#include "core/predictor/framework/resource.h"
using baidu::paddle_serving::predictor::Resource;
namespace baidu {
namespace paddle_serving {
......@@ -238,6 +247,77 @@ const Channel* DagView::get_response_channel(const uint64_t log_id) const {
return last_op->mutable_channel();
}
void* call_back(void* ori_args) {
Resource::instance().thread_initialize();
Args* args = (Args*)ori_args;
Op* op = static_cast<Op*>(args->_op);
uint64_t log_id = static_cast<uint64_t>(args->_log_id);
bool debug = static_cast<bool>(args->_debug);
args->errcode = op->process(log_id, debug);
return nullptr;
}
int ParallelDagView::execute_one_stage(ViewStage* vstage,
const uint64_t log_id,
butil::IOBufBuilder* debug_os) {
butil::Timer stage_time(butil::Timer::STARTED);
uint32_t node_size = vstage->nodes.size();
std::vector<THREAD_T> tids(node_size);
Args* args = new Args[node_size];
VLOG(2) << "(logid=" << log_id << ") vstage->nodes.size(): " << node_size;
for (uint32_t ni = 0; ni < node_size; ni++) {
ViewNode* vnode = vstage->nodes[ni];
DagNode* conf = vnode->conf;
Op* op = vnode->op;
TRACEPRINTF(
"(logid=%" PRIu64 ") start to execute op[%s]", log_id, op->name());
args[ni]._op = op;
args[ni]._log_id = log_id;
args[ni]._debug = (debug_os != NULL);
int rc = THREAD_CREATE(&tids[ni], NULL, call_back, (void*)(args + ni));
if (rc != 0) {
LOG(ERROR) << "failed to create ParallelDagView worker thread: index="
<< ni << ", rc=" << rc << ", errno=" << errno << ":"
<< strerror(errno);
delete[] args;
return -1;
}
}
for (uint32_t ni = 0; ni < node_size; ni++) {
THREAD_JOIN(tids[ni], NULL);
int errcode = args[ni].errcode;
Op* op = args[ni]._op;
TRACEPRINTF(
"(logid=%" PRIu64 ") finish to execute op[%s]", log_id, op->name());
if (errcode < 0) {
LOG(ERROR) << "(logid=" << log_id
<< ") Execute failed, Op:" << op->debug_string();
delete[] args;
return errcode;
}
if (errcode > 0) {
LOG(INFO) << "(logid=" << log_id
<< ") Execute ignore, Op:" << op->debug_string();
continue;
}
if (debug_os) {
(*debug_os) << "(logid=" << log_id << ") {\"op_name\": \"" << op->name()
<< "\", \"debug_str:\": \"" << op->debug_string()
<< "\", \"time_info\": \"" << op->time_info() << "\"}";
}
// LOG(DEBUG) << "Execute succ, Op:" << op->debug_string();
}
stage_time.stop();
PredictorMetric::GetInstance()->update_latency_metric(
STAGE_METRIC_PREFIX + vstage->full_name, stage_time.u_elapsed());
delete[] args;
return ERR_OK;
}
} // namespace predictor
} // namespace paddle_serving
} // namespace baidu
......@@ -24,7 +24,7 @@ namespace baidu {
namespace paddle_serving {
namespace predictor {
class Op;
// class Op;
struct ViewNode {
Op* op; // op->full_name == service_workflow_stageindex_opname
......@@ -75,11 +75,20 @@ class DagView {
Bus* _bus;
};
struct Args {
Op* _op;
uint64_t _log_id;
bool _debug;
int errcode;
};
// The derived DagView supports parallel execution
// strategy by implementing execute_one_stage().
class ParallelDagView : public DagView {
public:
int execute_one_stage(ViewStage* vstage, butil::IOBufBuilder*) { return 0; }
virtual int execute_one_stage(ViewStage* vstage,
const uint64_t log_id,
butil::IOBufBuilder* debug_os);
};
} // namespace predictor
......
......@@ -25,7 +25,8 @@ int ReloadableInferEngine::proc_initialize_impl(
_model_dir = conf.model_dir();
_infer_thread_num = conf.runtime_thread_num();
_infer_batch_size = conf.batch_infer_size();
_infer_batch_align = conf.enable_batch_align();
_infer_overrun = conf.enable_overrun();
_allow_split_request = conf.allow_split_request();
_conf = conf;
......@@ -56,9 +57,6 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf,
}
// init bsf framework
im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index]
.set_thread_init_fn(
boost::bind(&InferEngine::thrd_initialize_impl, this));
im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index]
.set_thread_init_fn(
boost::bind(&InferEngine::thrd_initialize_impl, this));
......@@ -69,8 +67,10 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf,
boost::bind(&InferEngine::task_infer_impl, this, _1, _2));
im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].set_batch_size(
_infer_batch_size);
im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].set_batch_align(
_infer_batch_align);
im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].set_overrun(
_infer_overrun);
im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index]
.set_allow_split_request(_allow_split_request);
if (im::bsf::TaskExecutorVector<TaskT>::instance()[_model_index].start(
_infer_thread_num) != 0) {
LOG(ERROR) << "Failed start bsf executor, threads:" << _infer_thread_num;
......@@ -79,7 +79,8 @@ int ReloadableInferEngine::proc_initialize(const configure::EngineDesc& conf,
LOG(WARNING) << "Enable batch schedule framework, thread_num:"
<< _infer_thread_num << ", batch_size:" << _infer_batch_size
<< ", enable_batch_align:" << _infer_batch_align;
<< ", enable_overrun:" << _infer_overrun
<< ", allow_split_request:" << _allow_split_request;
return 0;
}
......@@ -382,6 +383,11 @@ int VersionedInferEngine::task_infer_impl(const void* in,
return -1;
}
int InferManager::set_taskexecutor_num(size_t total_engine_num) {
im::bsf::TaskExecutorVector<TaskT>::instance().resize(total_engine_num);
return 0;
}
int InferManager::proc_initialize(const char* path,
const char* file,
std::shared_ptr<int> engine_index_ptr) {
......@@ -391,8 +397,6 @@ int InferManager::proc_initialize(const char* path,
return -1;
}
uint32_t engine_num = model_toolkit_conf.engines_size();
im::bsf::TaskExecutorVector<TaskT>::instance().resize(*engine_index_ptr +
engine_num);
for (uint32_t ei = 0; ei < engine_num; ++ei) {
LOG(INFO) << "model_toolkit_conf.engines(" << ei
<< ").name: " << model_toolkit_conf.engines(ei).name();
......
......@@ -163,8 +163,10 @@ class ReloadableInferEngine : public InferEngine {
uint32_t _infer_batch_size;
// Need to align batch_size in inferring
bool _infer_batch_align;
bool _infer_overrun;
// allow splitting the request when inferring
bool _allow_split_request;
// model version
uint64_t _version;
};
......@@ -600,6 +602,8 @@ class InferManager {
const char* file,
std::shared_ptr<int> engine_index_ptr);
int set_taskexecutor_num(size_t total_engine_num);
int thrd_initialize();
int thrd_clear();
......
......@@ -135,6 +135,17 @@ int Resource::initialize(const std::string& path, const std::string& file) {
if (FLAGS_enable_model_toolkit) {
size_t model_toolkit_num = resource_conf.model_toolkit_path_size();
// For now, assume that each model_toolkit contains exactly one engine,
// so model_toolkit_num == the total number of engines.
// If a model_toolkit ever contains multiple engines, first loop over them to
// count the total number of engines, and only then call set_taskexecutor_num.
// Never resize im::bsf::TaskExecutorVector<TaskT>::instance() dynamically:
// TaskExecutor is a thread pool that holds mutexes, and its work loop has
// already started acquiring them by the time the engine process initializes.
// Resizing later moves the memory, so work keeps using the original mutex
// while the moved TaskExecutor's mutex memory has already changed.
if (InferManager::instance().set_taskexecutor_num(model_toolkit_num) != 0) {
LOG(ERROR) << "failed set_taskexecutor_num";
return -1;
}
std::shared_ptr<int> engine_index_ptr(new int(0));
for (size_t mi = 0; mi < model_toolkit_num; ++mi) {
std::string model_toolkit_path = resource_conf.model_toolkit_path(mi);
......
......@@ -52,7 +52,9 @@ Java的HttpClient使用示例见[`java/examples/src/main/java/PaddleServingClien
If this does not meet your needs, you can also add features on top of it.
To support https or customize the Response status code, some secondary development of the C++ brpc-Server is required; please refer to https://github.com/apache/incubator-brpc/blob/master/docs/cn/http_service.md. If demand turns out to be high, we will also add this functionality to the Server, so stay tuned.
To support https or customize the Response status code, some secondary development of the C++ brpc-Server is required; please refer to https://github.com/apache/incubator-brpc/blob/master/docs/cn/http_service.md
If demand turns out to be high, we will also add this functionality to the Server, so stay tuned.
### Sending HTTP requests via curl (basic principle)
......
......@@ -23,11 +23,9 @@ args = benchmark_args()
reader = ChineseBertReader({"max_seq_len": 128})
fetch = ["pooled_output"]
client = HttpClient(ip='127.0.0.1', port='9292')
endpoint_list = ['127.0.0.1:9292']
client = HttpClient()
client.load_client_config(args.model)
#client.set_ip('127.0.0.1')
#client.set_port('9292')
'''
if you want to use the GRPC client, call set_use_grpc_client(True)
or you can directly use client.grpc_client_predict(...)
......@@ -49,6 +47,7 @@ we recommend use Proto data format in HTTP-body, set True(which is default)
if you want to use the JSON data format in the HTTP body, set it to False
'''
#client.set_http_proto(True)
client.connect(endpoint_list)
for line in sys.stdin:
feed_dict = reader.process(line)
......
......@@ -20,8 +20,6 @@ import time
client = HttpClient()
client.load_client_config(sys.argv[1])
#client.set_ip('127.0.0.1')
#client.set_port('9393')
'''
if you want to use the GRPC client, call set_use_grpc_client(True)
or you can directly use client.grpc_client_predict(...)
......@@ -43,13 +41,14 @@ we recommend use Proto data format in HTTP-body, set True(which is default)
if you want to use the JSON data format in the HTTP body, set it to False
'''
#client.set_http_proto(True)
client.connect(["127.0.0.1:9393"])
fetch_list = client.get_fetch_names()
import paddle
test_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.uci_housing.test(), buf_size=500),
batch_size=1)
fetch_list = client.get_fetch_names()
for data in test_reader():
new_data = np.zeros((1, 13)).astype("float32")
new_data[0] = data[0][0]
......
......@@ -18,10 +18,8 @@ from paddle_serving_app.reader import Sequential, URL2Image, Resize
from paddle_serving_app.reader import CenterCrop, RGB2BGR, Transpose, Div, Normalize
import time
client = HttpClient(ip='127.0.0.1', port='9696')
client = HttpClient()
client.load_client_config(sys.argv[1])
#client.set_ip('127.0.0.1')
#client.set_port('9292')
'''
if you want to use the GRPC client, call set_use_grpc_client(True)
or you can directly use client.grpc_client_predict(...)
......@@ -43,6 +41,7 @@ we recommend use Proto data format in HTTP-body, set True(which is default)
if you want to use the JSON data format in the HTTP body, set it to False
'''
#client.set_http_proto(True)
client.connect(["127.0.0.1:9696"])
label_dict = {}
label_idx = 0
......
......@@ -17,10 +17,8 @@ from paddle_serving_app.reader.imdb_reader import IMDBDataset
import sys
import numpy as np
client = HttpClient(ip='127.0.0.1', port='9292')
client = HttpClient()
client.load_client_config(sys.argv[1])
#client.set_ip('127.0.0.1')
#client.set_port('9292')
'''
if you want to use the GRPC client, call set_use_grpc_client(True)
or you can directly use client.grpc_client_predict(...)
......@@ -42,6 +40,7 @@ we recommend use Proto data format in HTTP-body, set True(which is default)
if you want to use the JSON data format in the HTTP body, set it to False
'''
#client.set_http_proto(True)
client.connect(["127.0.0.1:9292"])
# you can define any english sentence or dataset here
# This example reuses imdb reader in training, you
......
......@@ -21,10 +21,8 @@ import os
import io
import numpy as np
client = HttpClient(ip='127.0.0.1', port='9292')
client = HttpClient()
client.load_client_config(sys.argv[1])
#client.set_ip('127.0.0.1')
#client.set_port('9292')
'''
if you want to use the GRPC client, call set_use_grpc_client(True)
or you can directly use client.grpc_client_predict(...)
......@@ -46,6 +44,7 @@ we recommend use Proto data format in HTTP-body, set True(which is default)
if you want to use the JSON data format in the HTTP body, set it to False
'''
#client.set_http_proto(True)
client.connect(["127.0.0.1:9292"])
reader = LACReader()
for line in sys.stdin:
......
# coding=utf-8
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server.web_service import WebService
from paddle_serving_app.reader import ChineseBertReader
import sys
import os
import numpy as np
class BertService(WebService):
def load(self):
self.reader = ChineseBertReader({
"vocab_file": "vocab.txt",
"max_seq_len": 128
})
def preprocess(self, feed=[], fetch=[]):
feed_res = []
is_batch = False
for ins in feed:
feed_dict = self.reader.process(ins["words"].encode("utf-8"))
for key in feed_dict.keys():
feed_dict[key] = np.array(feed_dict[key]).reshape(
(len(feed_dict[key]), 1))
feed_res.append(feed_dict)
return feed_res, fetch, is_batch
bert_service = BertService(name="bert")
bert_service.load()
bert_service.load_model_config(sys.argv[1])
bert_service.prepare_server(
workdir="workdir", port=int(sys.argv[2]), use_lite=True, use_xpu=True, ir_optim=True)
bert_service.run_rpc_service()
bert_service.run_web_service()
# coding=utf-8
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server.web_service import WebService
from paddle_serving_app.reader import ChineseBertReader
import sys
import os
import numpy as np
class BertService(WebService):
def load(self):
self.reader = ChineseBertReader({
"vocab_file": "vocab.txt",
"max_seq_len": 128
})
def preprocess(self, feed=[], fetch=[]):
feed_res = []
is_batch = False
for ins in feed:
feed_dict = self.reader.process(ins["words"].encode("utf-8"))
for key in feed_dict.keys():
feed_dict[key] = np.array(feed_dict[key]).reshape(
(len(feed_dict[key]), 1))
feed_res.append(feed_dict)
return feed_res, fetch, is_batch
bert_service = BertService(name="bert")
bert_service.load()
bert_service.load_model_config(sys.argv[1])
bert_service.prepare_server(
workdir="workdir", port=int(sys.argv[2]), use_lite=True, use_xpu=True, ir_optim=True)
bert_service.run_rpc_service()
bert_service.run_web_service()
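For context, a web service script like the one above is typically launched with a model directory and a port on the command line, after which the endpoint accepts the same feed/fetch JSON format used elsewhere in this change. The snippet below is an illustrative sketch only: the launch command, fetch name, and input sentence are assumptions; the service name `bert` and the `words` feed key come from the script.

```python
# Hedged sketch: query the BertService above, assuming it was started as
#   python bert_web_service.py <bert_model_dir> 9292
import json
import requests

payload = {
    "feed": [{"words": "hello world"}],  # "words" is the key read in preprocess()
    "fetch": ["pooled_output"],          # fetch name is an assumption; check the serving config
}
resp = requests.post("http://127.0.0.1:9292/bert/prediction",
                     data=json.dumps(payload),
                     headers={"Content-Type": "application/json"})
print(resp.json())
```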
......@@ -23,18 +23,3 @@ The `paddlepaddle` package is used in `test_client.py`, and you may need to down
``` shell
python3 test_client.py uci_housing_client/serving_client_conf.prototxt
```
## HTTP service
### Start server
Start a web service with default web service hosting modules:
``` shell
python3 -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9393 --use_lite --use_xpu --ir_optim --name uci
```
### Client prediction
``` shell
curl -H "Content-Type:application/json" -X POST -d '{"feed":[{"x": [0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332]}], "fetch":["price"]}' http://127.0.0.1:9393/uci/prediction
```
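The same request can be issued from Python; the sketch below simply mirrors the curl call above and assumes only the `requests` package.

```python
# Python equivalent of the curl request above (illustrative sketch).
import json
import requests

data = {
    "feed": [{"x": [0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583,
                    -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332]}],
    "fetch": ["price"],
}
resp = requests.post("http://127.0.0.1:9393/uci/prediction",
                     data=json.dumps(data),
                     headers={"Content-Type": "application/json"})
print(resp.json())
```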
......@@ -31,19 +31,3 @@ python3 -m paddle_serving_server.serve --model uci_housing_model --thread 10 --p
``` shell
python3 test_client.py uci_housing_client/serving_client_conf.prototxt
```
## HTTP Service
### Start server
Start the default web service with the following line of code:
``` shell
python3 -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9393 --use_lite --use_xpu --ir_optim --name uci
```
### Client prediction
``` shell
curl -H "Content-Type:application/json" -X POST -d '{"feed":[{"x": [0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332]}], "fetch":["price"]}' http://127.0.0.1:9393/uci/prediction
```
......@@ -289,6 +289,7 @@ class Client(object):
log_id=0):
self.profile_.record('py_prepro_0')
        # fetch may be empty, in which case all model outputs are returned
if feed is None:
raise ValueError("You should specify feed for prediction")
......@@ -297,6 +298,7 @@ class Client(object):
fetch_list = [fetch]
elif isinstance(fetch, list):
fetch_list = fetch
        # fetch may be empty, in which case all model outputs are returned
elif fetch == None:
pass
else:
......@@ -341,7 +343,6 @@ class Client(object):
string_feed_names = []
string_lod_slot_batch = []
string_shape = []
fetch_names = []
for key in fetch_list:
......@@ -442,6 +443,7 @@ class Client(object):
model_engine_names = result_batch_handle.get_engine_names()
for mi, engine_name in enumerate(model_engine_names):
result_map = {}
            # when fetch is empty, all output results are returned
if len(fetch_names) == 0:
fetch_names = result_batch_handle.get_tensor_alias_names(mi)
# result map needs to be a numpy array
......
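The comments added in this hunk document that `fetch` may be omitted, in which case every output of the model is returned. Below is a hedged sketch of that behaviour, reusing the fit_a_line paths from the README above; the call shape is an approximation rather than the exact example script.

```python
# Hedged sketch: predict without a fetch list and receive all model outputs.
import numpy as np
from paddle_serving_client import Client

client = Client()
client.load_client_config("uci_housing_client/serving_client_conf.prototxt")
client.connect(["127.0.0.1:9393"])

x = np.zeros((1, 13), dtype="float32")
# No fetch argument: all output tensors (e.g. "price") are returned.
fetch_map = client.predict(feed={"x": x}, batch=True)
print(fetch_map)
```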
......@@ -22,6 +22,7 @@ import gzip
from collections import Iterable
import base64
import sys
import re
import grpc
from .proto import general_model_service_pb2
......@@ -98,7 +99,7 @@ class HttpClient(object):
self.headers["Content-Type"] = "application/proto"
self.max_body_size = 512 * 1024 * 1024
self.use_grpc_client = False
self.url = None
self.http_s = "http://"
        # use a connection pool so connections are not rebuilt for every request
self.requests_session = requests.session()
......@@ -170,7 +171,6 @@ class HttpClient(object):
def set_max_body_size(self, max_body_size):
self.max_body_size = max_body_size
self.init_grpc_stub()
def set_timeout_ms(self, timeout_ms):
if not isinstance(timeout_ms, int):
......@@ -183,25 +183,46 @@ class HttpClient(object):
raise ValueError("retry_times must be int type.")
else:
self.requests_session.mount(
'http://', HTTPAdapter(max_retries=retry_times))
def set_ip(self, ip):
self.ip = ip
self.init_grpc_stub()
self.http_s, HTTPAdapter(max_retries=retry_times))
def set_service_name(self, service_name):
self.service_name = service_name
def set_port(self, port):
self.port = port
self.server_port = port
self.init_grpc_stub()
def set_url(self, url):
def connect(self, url=None, encryption=False):
if isinstance(url, (list, tuple)):
if len(url) > 1:
raise ValueError("HttpClient only support 1 endpoint")
else:
url = url[0]
if isinstance(url, str):
self.url = url
if url.startswith("https://"):
url = url[8:]
self.http_s = "https://"
if url.startswith("http://"):
url = url[7:]
self.http_s = "http://"
url_parts = url.split(':')
if len(url_parts) != 2 or self.check_ip(url_parts[0]) == False:
raise ValueError(
"url not right, it should be like 127.0.0.1:9393 or http://127.0.0.1:9393"
)
else:
self.ip = url_parts[0]
self.port = url_parts[1]
self.server_port = url_parts[1]
if encryption:
self.get_serving_port()
if self.use_grpc_client:
self.init_grpc_stub()
def check_ip(self, ipAddr):
compile_ip = re.compile(
'^(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|[1-9])\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)$'
)
if compile_ip.match(ipAddr):
return True
else:
print("url must be str")
return False
def add_http_headers(self, headers):
if isinstance(headers, dict):
......@@ -229,10 +250,9 @@ class HttpClient(object):
def use_key(self, key_filename):
with open(key_filename, "rb") as f:
self.key = f.read()
self.get_serving_port()
def get_serving_port(self):
encrypt_url = "http://" + str(self.ip) + ":" + str(self.port)
encrypt_url = self.http_s + str(self.ip) + ":" + str(self.port)
if self.key is not None:
req = json.dumps({"key": base64.b64encode(self.key).decode()})
else:
......@@ -481,13 +501,7 @@ class HttpClient(object):
postData = self.process_json_data(feed_dict, fetch_list, batch,
log_id)
web_url = "http://" + self.ip + ":" + self.server_port + self.service_name
if self.url != None:
if "http" not in self.url:
self.url = "http://" + self.url
if "self.service_name" not in self.url:
self.url = self.url + self.service_name
web_url = self.url
web_url = self.http_s + self.ip + ":" + self.server_port + self.service_name
        # compress only when the data payload is larger than 512 bytes.
self.headers.pop("Content-Encoding", "nokey")
try:
......
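Taken together, the changes above replace set_url/set_ip/set_port with a single connect() call that accepts one endpoint, strips an optional http:// or https:// prefix into `http_s`, validates the ip:port form, and optionally resolves the real serving port when encryption is enabled. A hedged sketch of the accepted endpoint forms (the import path and config path are placeholders):

```python
# Hedged sketch of the endpoint forms accepted by the new connect() above.
from paddle_serving_client.httpclient import HttpClient  # assumed import path

client = HttpClient()
client.load_client_config("serving_client_conf.prototxt")  # placeholder path

client.connect(["127.0.0.1:9393"])            # plain ip:port, http:// is assumed
# client.connect(["https://127.0.0.1:9393"])  # https endpoint; sets http_s = "https://"
# client.connect(["127.0.0.1:9393"], encryption=True)  # resolve the real port via get_serving_port()
```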
......@@ -228,7 +228,8 @@ class Server(object):
engine.batch_infer_size = self.op_max_batch[index %
len(self.op_max_batch)]
engine.enable_batch_align = 1
engine.enable_overrun = False
engine.allow_split_request = True
engine.model_dir = model_config_path
engine.enable_memory_optimization = self.memory_optimization
engine.enable_ir_optimization = self.ir_optimization
......
......@@ -45,7 +45,9 @@ class ChannelDataErrcode(enum.Enum):
CLOSED_ERROR = 6
NO_SERVICE = 7
UNKNOW = 8
PRODUCT_ERROR = 9
INPUT_PARAMS_ERROR = 9
PRODUCT_ERROR = 100
class ProductErrCode(enum.Enum):
......
......@@ -18,22 +18,110 @@ option go_package = "./;pipeline_serving";
import "google/api/annotations.proto";
// Tensor structure, consistent with PADDLE variable types.
// Descriptions of input and output data.
message Tensor {
// VarType: INT64
repeated int64 int64_data = 1;
// VarType: FP32, FP16
repeated float float_data = 2;
// VarType: INT32, INT16, INT8
repeated int32 int_data = 3;
// VarType: FP64
repeated double float64_data = 4;
// VarType: BF16, UINT8
repeated uint32 uint32_data = 5;
// VarType: BOOL
repeated bool bool_data = 6;
  // (Not supported) VarType: COMPLEX64; index 2x holds the real part and
  // index 2x+1 the imaginary part
  repeated float complex64_data = 7;
  // (Not supported) VarType: COMPLEX128; index 2x holds the real part and
  // index 2x+1 the imaginary part
repeated double complex128_data = 8;
// VarType: STRING
repeated string str_data = 9;
// Element types:
// 0 => INT64
// 1 => FP32
// 2 => INT32
// 3 => FP64
// 4 => INT16
// 5 => FP16
// 6 => BF16
// 7 => UINT8
// 8 => INT8
// 9 => BOOL
// 10 => COMPLEX64
// 11 => COMPLEX128
// 12 => STRING
int32 elem_type = 10;
// Shape of the tensor, including batch dimensions.
repeated int32 shape = 11;
// Level of data(LOD), support variable length data, only for fetch tensor
// currently.
repeated int32 lod = 12;
// Correspond to the variable 'name' in the model description prototxt.
string name = 13;
};
// The structure of the service request. The input data can be repeated string
// pairs or tensors.
message Request {
// The input data are repeated string pairs.
  // For example, key is "words" and value is the string of words.
repeated string key = 1;
repeated string value = 2;
// The input data are repeated tensors for complex data structures.
  // Because tensors can carry more information and reduce the amount of data
  // transferred.
repeated Tensor tensors = 3;
// The name field in the RESTful API
string name = 4;
// The method field in the RESTful API
string method = 5;
// For tracing requests and logs
int64 logid = 6;
// For tracking sources
string clientip = 7;
};
// The structure of the service response. The output data can be repeated string
// pairs or tensors.
message Response {
// Error code
int32 err_no = 1;
// Error messages
string err_msg = 2;
// The results of string pairs
repeated string key = 3;
repeated string value = 4;
};
message Request {
repeated string key = 1;
repeated string value = 2;
string name = 3;
string method = 4;
int64 logid = 5;
string clientip = 6;
// The results of tensors
repeated Tensor tensors = 5;
};
// Python pipeline service
service PipelineService {
rpc inference(Request) returns (Response) {
option (google.api.http) = {
......
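For reference, a tensor-based request matching the Request message above has roughly the following JSON shape when sent through the RESTful gateway; the field names follow the proto, while the pipeline name, shapes, and values are illustrative assumptions.

```python
# Illustrative request body for the gateway Request message above.
# Field names follow the proto; the values and pipeline name are assumptions.
request_body = {
    "name": "uci",            # RESTful name field (assumed pipeline name)
    "method": "prediction",   # RESTful method field
    "logid": 10000,           # for tracing requests and logs
    "clientip": "127.0.0.1",  # for tracking sources
    "tensors": [{
        "name": "x",          # must match a variable name in the model prototxt
        "elem_type": 1,       # 1 => FP32 (see the mapping in the comments above)
        "shape": [1, 13],
        "float_data": [0.0] * 13,
    }],
}
```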
......@@ -45,6 +45,23 @@ from .pipeline_client import PipelineClient as PPClient
_LOGGER = logging.getLogger(__name__)
_op_name_gen = NameGenerator("Op")
# mapping from tensor elem_type to numpy dtype
_TENSOR_DTYPE_2_NUMPY_DATA_DTYPE = {
0: "int64", # VarType.INT64
1: "float32", # VarType.FP32
2: "int32", # VarType.INT32
3: "float64", # VarType.FP64
4: "int16", # VarType.int16
5: "float16", # VarType.FP32
6: "uint16", # VarType.BF16
7: "uint8", # VarType.UINT8
8: "int8", # VarType.INT8
9: "bool", # VarType.BOOL
10: "complex64", # VarType.COMPLEX64
11: "complex128", # VarType.COMPLEX128
12: "string", # dismatch with numpy
}
class Op(object):
def __init__(self,
......@@ -85,6 +102,9 @@ class Op(object):
self._server_use_profile = False
self._tracer = None
        # for grpc_pipeline predict mode: False = string key/val, True = tensor format.
self._pack_tensor_format = False
# only for thread op
self._for_init_op_lock = threading.Lock()
self._for_close_op_lock = threading.Lock()
......@@ -372,6 +392,9 @@ class Op(object):
os._exit(-1)
self._input_ops.append(op)
def set_pack_tensor_format(self, is_tensor_format=False):
self._pack_tensor_format = is_tensor_format
def get_jump_to_ops(self):
return self._jump_to_ops
......@@ -577,6 +600,7 @@ class Op(object):
feed_dict=feed_batch[0],
fetch=self._fetch_names,
asyn=False,
pack_tensor_format=self._pack_tensor_format,
profile=False)
if call_result is None:
_LOGGER.error(
......@@ -1530,6 +1554,85 @@ class RequestOp(Op):
_LOGGER.critical("Op(Request) Failed to init: {}".format(e))
os._exit(-1)
def proto_tensor_2_numpy(self, tensor):
"""
        Convert a proto tensor to a numpy array. The supported types are as follows:
INT64
FP32
INT32
FP64
INT16
FP16
BF16
UINT8
INT8
BOOL
Unsupported type:
COMPLEX64
COMPLEX128
STRING
Args:
tensor: one tensor in request.tensors.
Returns:
            np.ndarray
"""
if tensor is None or tensor.elem_type is None or tensor.name is None:
_LOGGER.error("input params of tensor is wrong. tensor: {}".format(
tensor))
return None
dims = []
if tensor.shape is None:
dims.append(1)
else:
for one_dim in tensor.shape:
dims.append(one_dim)
np_data = None
_LOGGER.info("proto_to_numpy, name:{}, type:{}, dims:{}".format(
tensor.name, tensor.elem_type, dims))
        if tensor.elem_type == 0:
            # VarType: INT64
            np_data = np.array(tensor.int64_data).astype(np.int64).reshape(dims)
        elif tensor.elem_type == 1:
            # VarType: FP32
            np_data = np.array(tensor.float_data).astype(np.float32).reshape(dims)
        elif tensor.elem_type == 2:
            # VarType: INT32
            np_data = np.array(tensor.int_data).astype(np.int32).reshape(dims)
        elif tensor.elem_type == 3:
            # VarType: FP64
            np_data = np.array(tensor.float64_data).astype(np.float64).reshape(
                dims)
        elif tensor.elem_type == 4:
            # VarType: INT16
            np_data = np.array(tensor.int_data).astype(np.int16).reshape(dims)
        elif tensor.elem_type == 5:
            # VarType: FP16
            np_data = np.array(tensor.float_data).astype(np.float16).reshape(dims)
        elif tensor.elem_type == 6:
            # VarType: BF16
            np_data = np.array(tensor.uint32_data).astype(np.uint16).reshape(dims)
        elif tensor.elem_type == 7:
            # VarType: UINT8
            np_data = np.array(tensor.uint32_data).astype(np.uint8).reshape(dims)
        elif tensor.elem_type == 8:
            # VarType: INT8
            np_data = np.array(tensor.int_data).astype(np.int8).reshape(dims)
        elif tensor.elem_type == 9:
            # VarType: BOOL
            np_data = np.array(tensor.bool_data).astype(bool).reshape(dims)
else:
_LOGGER.error("Sorry, the type {} of tensor {} is not supported.".
format(tensor.elem_type, tensor.name))
raise ValueError(
"Sorry, the type {} of tensor {} is not supported.".format(
tensor.elem_type, tensor.name))
return np_data
def unpack_request_package(self, request):
"""
Unpack request package by gateway.proto
......@@ -1550,9 +1653,43 @@ class RequestOp(Op):
_LOGGER.critical("request is None")
raise ValueError("request is None")
# unpack key/value string list
for idx, key in enumerate(request.key):
dict_data[key] = request.value[idx]
log_id = request.logid
# unpack proto.tensors data.
for one_tensor in request.tensors:
name = one_tensor.name
elem_type = one_tensor.elem_type
if one_tensor.name is None:
_LOGGER.error("Tensor name is None.")
raise ValueError("Tensor name is None.")
numpy_dtype = _TENSOR_DTYPE_2_NUMPY_DATA_DTYPE.get(elem_type)
if numpy_dtype is None:
_LOGGER.error(
"elem_type:{} is dismatch in unpack_request_package.",
format(elem_type))
raise ValueError("elem_type:{} error".format(elem_type))
if numpy_dtype == "string":
new_string = ""
if one_tensor.str_data is None:
_LOGGER.error(
"str_data of tensor:{} is None, elem_type is {}.".
format(name, elem_type))
raise ValueError(
"str_data of tensor:{} is None, elem_type is {}.".
format(name, elem_type))
for one_str in one_tensor.str_data:
new_string += one_str
dict_data[name] = new_string
else:
dict_data[name] = self.proto_tensor_2_numpy(one_tensor)
_LOGGER.debug("RequestOp unpack one request. log_id:{}, clientip:{} \
name:{}, method:{}".format(log_id, request.clientip, request.name,
request.method))
......@@ -1574,6 +1711,7 @@ class ResponseOp(Op):
"""
super(ResponseOp, self).__init__(
name="@DAGExecutor", input_ops=input_ops)
# init op
try:
self.init_op()
......@@ -1582,6 +1720,12 @@ class ResponseOp(Op):
e, exc_info=True))
os._exit(-1)
        # default packing format of the response: string key/val (not tensor)
self.is_pack_tensor = False
def set_pack_format(self, isTensor=False):
self.is_pack_tensor = isTensor
def pack_response_package(self, channeldata):
"""
        Getting channeldata from the last channel, packing the response
......
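The new set_pack_tensor_format / set_pack_format switches decide whether ops exchange data as string key/value pairs or as Tensor messages. Below is a hedged sketch of flipping them in a pipeline server script; the import path and op wiring are assumptions, and only the two set_*_format calls come from this change.

```python
# Hedged sketch of the pack-format switches added above.
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp  # assumed path

class ExampleOp(Op):
    def init_op(self):
        pass  # load readers / resources here

read_op = RequestOp()
example_op = ExampleOp(name="example", input_ops=[read_op])
# Send feeds to the remote service as Tensor messages instead of string key/val.
example_op.set_pack_tensor_format(True)

response_op = ResponseOp(input_ops=[example_op])
# Pack the final response as tensors as well.
response_op.set_pack_format(isTensor=True)
```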
......@@ -46,7 +46,7 @@ class PipelineClient(object):
self._stub = pipeline_service_pb2_grpc.PipelineServiceStub(
self._channel)
def _pack_request_package(self, feed_dict, profile):
def _pack_request_package(self, feed_dict, pack_tensor_format, profile):
req = pipeline_service_pb2.Request()
logid = feed_dict.get("logid")
......@@ -69,25 +69,88 @@ class PipelineClient(object):
feed_dict.pop("clientip")
np.set_printoptions(threshold=sys.maxsize)
for key, value in feed_dict.items():
req.key.append(key)
if (sys.version_info.major == 2 and isinstance(value,
(str, unicode)) or
((sys.version_info.major == 3) and isinstance(value, str))):
req.value.append(value)
continue
if isinstance(value, np.ndarray):
req.value.append(value.__repr__())
elif isinstance(value, list):
req.value.append(np.array(value).__repr__())
else:
raise TypeError("only str and np.ndarray type is supported: {}".
format(type(value)))
if profile:
req.key.append(self._profile_key)
req.value.append(self._profile_value)
if pack_tensor_format is False:
# pack string key/val format
for key, value in feed_dict.items():
req.key.append(key)
if (sys.version_info.major == 2 and
isinstance(value, (str, unicode)) or
((sys.version_info.major == 3) and isinstance(value, str))):
req.value.append(value)
continue
if isinstance(value, np.ndarray):
req.value.append(value.__repr__())
elif isinstance(value, list):
req.value.append(np.array(value).__repr__())
else:
raise TypeError(
"only str and np.ndarray type is supported: {}".format(
type(value)))
if profile:
req.key.append(self._profile_key)
req.value.append(self._profile_value)
else:
# pack tensor format
for key, value in feed_dict.items():
one_tensor = req.tensors.add()
one_tensor.name = key
if (sys.version_info.major == 2 and
isinstance(value, (str, unicode)) or
((sys.version_info.major == 3) and isinstance(value, str))):
                one_tensor.str_data.append(value)
one_tensor.elem_type = 12 #12 => string
continue
if isinstance(value, np.ndarray):
# copy shape
_LOGGER.info("value shape is {}".format(value.shape))
for one_dim in value.shape:
one_tensor.shape.append(one_dim)
flat_value = value.flatten().tolist()
# copy data
if value.dtype == "int64":
one_tensor.int64_data.extend(flat_value)
one_tensor.elem_type = 0
elif value.dtype == "float32":
one_tensor.float_data.extend(flat_value)
one_tensor.elem_type = 1
elif value.dtype == "int32":
one_tensor.int_data.extend(flat_value)
one_tensor.elem_type = 2
elif value.dtype == "float64":
one_tensor.float64_data.extend(flat_value)
one_tensor.elem_type = 3
elif value.dtype == "int16":
one_tensor.int_data.extend(flat_value)
one_tensor.elem_type = 4
elif value.dtype == "float16":
one_tensor.float_data.extend(flat_value)
one_tensor.elem_type = 5
elif value.dtype == "uint16":
one_tensor.uint32_data.extend(flat_value)
one_tensor.elem_type = 6
elif value.dtype == "uint8":
one_tensor.uint32_data.extend(flat_value)
one_tensor.elem_type = 7
elif value.dtype == "int8":
one_tensor.int_data.extend(flat_value)
one_tensor.elem_type = 8
elif value.dtype == "bool":
one_tensor.bool_data.extend(flat_value)
one_tensor.elem_type = 9
else:
_LOGGER.error(
"value type {} of tensor {} is not supported.".
format(value.dtype, key))
else:
raise TypeError(
"only str and np.ndarray type is supported: {}".format(
type(value)))
return req
def _unpack_response_package(self, resp, fetch):
......@@ -97,6 +160,7 @@ class PipelineClient(object):
feed_dict,
fetch=None,
asyn=False,
pack_tensor_format=False,
profile=False,
log_id=0):
if not isinstance(feed_dict, dict):
......@@ -104,7 +168,8 @@ class PipelineClient(object):
"feed must be dict type with format: {name: value}.")
if fetch is not None and not isinstance(fetch, list):
raise TypeError("fetch must be list type with format: [name].")
req = self._pack_request_package(feed_dict, profile)
req = self._pack_request_package(feed_dict, pack_tensor_format, profile)
req.logid = log_id
if not asyn:
resp = self._stub.inference(req)
......
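On the client side, the new pack_tensor_format argument selects between the string key/value packing and the Tensor packing shown above. A hedged sketch follows; the import path, endpoint, and feed/fetch names are assumptions.

```python
# Hedged sketch of predict() with the new pack_tensor_format flag.
import numpy as np
from paddle_serving_server.pipeline import PipelineClient  # assumed import path

client = PipelineClient()
client.connect(["127.0.0.1:9998"])   # pipeline gRPC endpoint (assumption)

image = np.zeros((1, 3, 224, 224), dtype="float32")
# With pack_tensor_format=True the array travels as a Tensor message
# (float_data + shape + elem_type=1) instead of its string repr().
result = client.predict(feed_dict={"image": image},
                        fetch=["prediction"],
                        pack_tensor_format=True)
print(result)
```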
......@@ -12,25 +12,113 @@
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
syntax = "proto3";
package baidu.paddle_serving.pipeline_serving;
// Tensor structure, consistent with PADDLE variable types.
// Descriptions of input and output data.
message Tensor {
// VarType: INT64
repeated int64 int64_data = 1;
// VarType: FP32, FP16
repeated float float_data = 2;
// VarType: INT32, INT16, INT8
repeated int32 int_data = 3;
// VarType: FP64
repeated double float64_data = 4;
// VarType: BF16, UINT8
repeated uint32 uint32_data = 5;
// VarType: BOOL
repeated bool bool_data = 6;
  // (Not supported) VarType: COMPLEX64; index 2x holds the real part and
  // index 2x+1 the imaginary part
  repeated float complex64_data = 7;
  // (Not supported) VarType: COMPLEX128; index 2x holds the real part and
  // index 2x+1 the imaginary part
repeated double complex128_data = 8;
// VarType: STRING
repeated string str_data = 9;
// Element types:
// 0 => INT64
// 1 => FP32
// 2 => INT32
// 3 => FP64
// 4 => INT16
// 5 => FP16
// 6 => BF16
// 7 => UINT8
// 8 => INT8
// 9 => BOOL
// 10 => COMPLEX64
// 11 => COMPLEX128
// 12 => STRING
int32 elem_type = 10;
// Shape of the tensor, including batch dimensions.
repeated int32 shape = 11;
// Level of data(LOD), support variable length data, only for fetch tensor
// currently.
repeated int32 lod = 12;
// Correspond to the variable 'name' in the model description prototxt.
string name = 13;
};
// The structure of the service request. The input data can be repeated string
// pairs or tensors.
message Request {
// The input data are repeated string pairs.
  // For example, key is "words" and value is the string of words.
repeated string key = 1;
repeated string value = 2;
optional string name = 3;
optional string method = 4;
optional int64 logid = 5;
optional string clientip = 6;
// The input data are repeated tensors for complex data structures.
  // Because tensors can carry more information and reduce the amount of data
  // transferred.
repeated Tensor tensors = 3;
// The name field in the RESTful API
string name = 4;
// The method field in the RESTful API
string method = 5;
// For tracing requests and logs
int64 logid = 6;
// For tracking sources
string clientip = 7;
};
// The structure of the service response. The output data can be repeated string
// pairs or tensors.
message Response {
optional int32 err_no = 1;
optional string err_msg = 2;
// Error code
int32 err_no = 1;
// Error messages
string err_msg = 2;
// The results of string pairs
repeated string key = 3;
repeated string value = 4;
// The results of tensors
repeated Tensor tensors = 5;
};
// Python pipeline service
service PipelineService {
rpc inference(Request) returns (Response) {}
};
......@@ -40,9 +40,9 @@ go env -w GO111MODULE=auto
build_whl_list=(build_cpu_server build_gpu_server build_client build_app)
rpc_model_list=(grpc_fit_a_line grpc_yolov4 pipeline_imagenet bert_rpc_gpu bert_rpc_cpu ResNet50_rpc \
lac_rpc cnn_rpc bow_rpc lstm_rpc fit_a_line_rpc deeplabv3_rpc mobilenet_rpc unet_rpc resnetv2_rpc \
lac_rpc_asyn cnn_rpc_asyn bow_rpc lstm_rpc fit_a_line_rpc deeplabv3_rpc mobilenet_rpc unet_rpc resnetv2_rpc \
criteo_ctr_rpc_cpu criteo_ctr_rpc_gpu ocr_rpc yolov4_rpc_gpu faster_rcnn_hrnetv2p_w18_1x_encrypt \
faster_rcnn_model_rpc low_precision_resnet50_int8 ocr_c++_service)
faster_rcnn_model_rpc low_precision_resnet50_int8 ocr_c++_service ocr_c++_service_asyn)
http_model_list=(fit_a_line_http lac_http imdb_http_proto imdb_http_json imdb_grpc ResNet50_http bert_http \
pipeline_ocr_cpu_http)
......@@ -492,7 +492,7 @@ function ResNet101_rpc() {
kill_server_process
}
function cnn_rpc() {
function cnn_rpc_asyn() {
dir=${log_dir}rpc_model/cnn_rpc/
check_dir ${dir}
unsetproxy
......@@ -500,8 +500,9 @@ function cnn_rpc() {
data_dir=${data}imdb/
link_data ${data_dir}
sed -i 's/9292/8865/g' test_client.py
${py_version} -m paddle_serving_server.serve --model imdb_cnn_model/ --port 8865 > ${dir}server_log.txt 2>&1 &
check_result server 5
${py_version} -m paddle_serving_server.serve --model imdb_cnn_model/ --port 8865 --op_num 4 --thread 10 --gpu_ids 0 > ${dir}server_log.txt 2>&1 &
check_result server 8
check_gpu_memory 0
head test_data/part-0 | ${py_version} test_client.py imdb_cnn_client_conf/serving_client_conf.prototxt imdb.vocab > ${dir}client_log.txt 2>&1
check_result client "cnn_CPU_RPC server test completed"
kill_server_process
......@@ -537,7 +538,7 @@ function lstm_rpc() {
kill_server_process
}
function lac_rpc() {
function lac_rpc_asyn() {
dir=${log_dir}rpc_model/lac_rpc/
check_dir ${dir}
unsetproxy
......@@ -545,8 +546,9 @@ function lac_rpc() {
data_dir=${data}lac/
link_data ${data_dir}
sed -i 's/9292/8868/g' lac_client.py
${py_version} -m paddle_serving_server.serve --model lac_model/ --port 8868 > ${dir}server_log.txt 2>&1 &
check_result server 5
${py_version} -m paddle_serving_server.serve --model lac_model/ --port 8868 --gpu_ids 0 --op_num 2 > ${dir}server_log.txt 2>&1 &
check_result server 8
check_gpu_memory 0
echo "我爱北京天安门" | ${py_version} lac_client.py lac_client/serving_client_conf.prototxt lac_dict/ > ${dir}client_log.txt 2>&1
check_result client "lac_CPU_RPC server test completed"
kill_server_process
......@@ -923,6 +925,23 @@ function ocr_c++_service() {
kill_server_process
}
function ocr_c++_service_asyn() {
dir=${log_dir}rpc_model/ocr_c++_serving/
cd ${build_path}/python/examples/ocr
check_dir ${dir}
echo -e "${GREEN_COLOR}OCR_C++_Service_GPU_RPC asyn_server started${RES}"
$py_version -m paddle_serving_server.serve --model ocr_det_model ocr_rec_model --port 9293 --gpu_ids 0 --op_num 4 > ${dir}server_log.txt 2>&1 &
check_result server 8
check_gpu_memory 0
echo -e "${GREEN_COLOR}OCR_C++_Service_GPU_RPC client started${RES}"
echo "------------------first:"
$py_version ocr_cpp_client.py ocr_det_client ocr_rec_client
echo "------------------second:"
$py_version ocr_cpp_client.py ocr_det_client ocr_rec_client > ${dir}client_log.txt 2>&1
check_result client "OCR_C++_Service_GPU_RPC server test completed"
kill_server_process
}
function build_all_whl() {
for whl in ${build_whl_list[@]}
do
......