Commit 58c655d1 authored by MRXLT

Merge remote-tracking branch 'upstream/develop' into general-server-imagenet

@@ -30,17 +30,18 @@ pip install paddle-serving-server
## Quick Start Example
### Boston House Price Prediction model
``` shell
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
tar -xzf uci_housing.tar.gz
```
Paddle Serving provides HTTP and RPC based services for users to access
### HTTP service
``` shell
python -m paddle_serving_server.web_serve --model uci_housing_model --thread 10 --port 9292 --name uci
```
``` shell
curl -H "Content-Type:application/json" -X POST -d '{"x": [0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332], "fetch":["price"]}' http://127.0.0.1:9292/uci/prediction
@@ -51,9 +52,9 @@ curl -H "Content-Type:application/json" -X POST -d '{"x": [0.0137, -0.1136, 0.25
``` shell
python -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9292
```
``` python
# A user can visit rpc service through paddle_serving_client API
from paddle_serving_client import Client
client = Client()
@@ -66,9 +67,23 @@ print(fetch_map)
```
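The body of this client snippet is elided by the diff; based on the complete client example in the servable-model doc further down this page, the full RPC call looks roughly like the sketch below. The config path is an assumption about what uci_housing.tar.gz extracts; adjust it to the actual directory name.
``` python
# Illustrative sketch of the elided client code; the config path and feed
# values are assumptions based on the uci_housing example on this page.
from paddle_serving_client import Client

client = Client()
client.load_client_config("uci_housing_client/serving_client_conf.prototxt")
client.connect(["127.0.0.1:9292"])
data = [0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583,
        -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332]
fetch_map = client.predict(feed={"x": data}, fetch=["price"])
print(fetch_map)
```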
## Models waiting for you to deploy
<center>
| Model Name | ResNet50 |
|:--------------------: |:----------------------------------: |
| Package URL | To be released |
| Description | Get the representation of an image |
| Training Data Source | ImageNet |
</center>
## Document
[How to save a servable model?](doc/SAVE.md)
[How to config Serving native operators on server side?](doc/SERVER_DAG.md)
@@ -80,9 +95,17 @@ print(fetch_map)
[FAQ(Chinese)](doc/FAQ.md)
[Design Doc(Chinese)](doc/DESIGN.md)
## Join Community
To connect with other users and contributors, welcome to join our [Slack channel](https://paddleserving.slack.com/archives/CUBPKHKMJ)
## Contribution
If you want to contribute code to Paddle Serving, please reference [Contribution Guidelines](doc/CONTRIBUTE.md)
### Feedback
For any feedback, or to report a bug, please open a [GitHub Issue](https://github.com/PaddlePaddle/Serving/issues).
## License
[Apache 2.0 License](https://github.com/PaddlePaddle/Serving/blob/develop/LICENSE)
@@ -45,12 +45,12 @@ class PredictorRes {
~PredictorRes() {}
public:
const std::vector<std::vector<int64_t>>& get_int64_by_name(
const std::string& name) {
return _int64_map[name];
}
const std::vector<std::vector<float>>& get_float_by_name(
const std::string& name) {
return _float_map[name];
}
@@ -71,7 +71,7 @@ class PredictorClient {
void set_predictor_conf(const std::string& conf_path,
const std::string& conf_file);
int create_predictor_by_desc(const std::string& sdk_desc);
int create_predictor();
int destroy_predictor();
@@ -81,7 +81,8 @@ class PredictorClient {
const std::vector<std::vector<int64_t>>& int_feed,
const std::vector<std::string>& int_feed_name,
const std::vector<std::string>& fetch_name,
PredictorRes& predict_res,  // NOLINT
const int& pid);
std::vector<std::vector<float>> predict(
const std::vector<std::vector<float>>& float_feed,
...
@@ -132,13 +132,13 @@ int PredictorClient::create_predictor() {
_api.thrd_initialize();
}
int PredictorClient::predict(const std::vector<std::vector<float>> &float_feed,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int64_t>> &int_feed,
const std::vector<std::string> &int_feed_name,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res,
const int &pid) {  // NOLINT
predict_res._int64_map.clear();
predict_res._float_map.clear();
Timer timeline;
@@ -218,6 +218,7 @@ int PredictorClient::predict(
VLOG(2) << "fetch name: " << name;
if (_fetch_name_to_type[name] == 0) {
int len = res.insts(0).tensor_array(idx).int64_data_size();
VLOG(2) << "fetch tensor : " << name << " type: int64 len : " << len;
predict_res._int64_map[name].resize(1);
predict_res._int64_map[name][0].resize(len);
for (int i = 0; i < len; ++i) {
@@ -226,6 +227,7 @@ int PredictorClient::predict(
}
} else if (_fetch_name_to_type[name] == 1) {
int len = res.insts(0).tensor_array(idx).float_data_size();
VLOG(2) << "fetch tensor : " << name << " type: float32 len : " << len;
predict_res._float_map[name].resize(1);
predict_res._float_map[name][0].resize(len);
for (int i = 0; i < len; ++i) {
@@ -240,11 +242,12 @@ int PredictorClient::predict(
if (FLAGS_profile_client) {
std::ostringstream oss;
oss << "PROFILE\t"
<< "pid:" << pid << "\t"
<< "prepro_0:" << preprocess_start << " "
<< "prepro_1:" << preprocess_end << " "
<< "client_infer_0:" << client_infer_start << " "
<< "client_infer_1:" << client_infer_end << " ";
if (FLAGS_profile_server) {
int op_num = res.profile_time_size() / 2;
for (int i = 0; i < op_num; ++i) {
@@ -252,10 +255,10 @@ int PredictorClient::predict(
oss << "op" << i << "_1:" << res.profile_time(i * 2 + 1) << " ";
}
}
oss << "postpro_0:" << postprocess_start << " ";
oss << "postpro_1:" << postprocess_end;
fprintf(stderr, "%s\n", oss.str().c_str());
}
return 0;
@@ -342,7 +345,7 @@ std::vector<std::vector<std::vector<float>>> PredictorClient::batch_predict(
}
VLOG(2) << "batch [" << bi << "] "
<< "int feed value prepared";
}
int64_t preprocess_end = timeline.TimeStampUS();
...
@@ -31,13 +31,15 @@ PYBIND11_MODULE(serving_client, m) {
py::class_<PredictorRes>(m, "PredictorRes", py::buffer_protocol())
.def(py::init())
.def("get_int64_by_name",
[](PredictorRes &self, std::string &name) {
return self.get_int64_by_name(name);
},
py::return_value_policy::reference)
.def("get_float_by_name",
[](PredictorRes &self, std::string &name) {
return self.get_float_by_name(name);
},
py::return_value_policy::reference);
py::class_<PredictorClient>(m, "PredictorClient", py::buffer_protocol())
.def(py::init())
@@ -56,26 +58,29 @@ PYBIND11_MODULE(serving_client, m) {
self.set_predictor_conf(conf_path, conf_file);
})
.def("create_predictor_by_desc",
[](PredictorClient &self, const std::string &sdk_desc) {
self.create_predictor_by_desc(sdk_desc);
})
.def("create_predictor",
[](PredictorClient &self) { self.create_predictor(); })
.def("destroy_predictor",
[](PredictorClient &self) { self.destroy_predictor(); })
.def("predict",
[](PredictorClient &self,
const std::vector<std::vector<float>> &float_feed,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int64_t>> &int_feed,
const std::vector<std::string> &int_feed_name,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res,
const int &pid) {
return self.predict(float_feed,
float_feed_name,
int_feed,
int_feed_name,
fetch_name,
predict_res,
pid);
})
.def("batch_predict",
[](PredictorClient &self,
...
## How to save a servable model of Paddle Serving?
- Currently, Paddle Serving provides a save_model interface for users; the interface is similar to `save_inference_model` of Paddle.
``` python
import paddle_serving_client.io as serving_io
serving_io.save_model("imdb_model", "imdb_client_conf",
{"words": data}, {"prediction": prediction},
fluid.default_main_program())
```
`imdb_model` is the server-side model with serving configurations. `imdb_client_conf` is the client RPC configuration. Serving has a
dictionary of `Feed` and `Fetch` variables for the client to assign. An alias name can be defined for each variable. An example of how to use alias names
is as follows:
``` python
from paddle_serving_client import Client
import sys
client = Client()
client.load_client_config(sys.argv[1])
client.connect(["127.0.0.1:9393"])
for line in sys.stdin:
    group = line.strip().split()
    words = [int(x) for x in group[1:int(group[0]) + 1]]
    label = [int(group[-1])]
    feed = {"words": words, "label": label}
    fetch = ["acc", "cost", "prediction"]
    fetch_map = client.predict(feed=feed, fetch=fetch)
    print("{} {}".format(fetch_map["prediction"][1], label[0]))
```
## Semantic Understanding Prediction Service
This example uses a BERT model for semantic understanding prediction: text is represented as a vector, which can then be used for further analysis and prediction.
### Get the model
The example uses the [Chinese BERT model](https://www.paddlepaddle.org.cn/hubdetail?name=bert_chinese_L-12_H-768_A-12&en_category=SemanticModel) from [PaddleHub](https://github.com/PaddlePaddle/PaddleHub).
Run
```
python prepare_model.py
```
This generates the server-side configuration and model files, stored in the serving_server_model folder,
and the client-side configuration files, stored in the serving_client_conf folder.
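prepare_model.py itself is not included in this diff. A rough, hypothetical sketch of such a script, combining the PaddleHub module loading used in bert_client.py with the serving_io.save_model interface documented earlier on this page, might look like this (the feed/fetch keys and max_seq_len value are assumptions):
``` python
# Hypothetical sketch only; not the actual prepare_model.py from the repo.
import paddle_serving_client.io as serving_io
import paddlehub as hub

module = hub.Module(name="bert_chinese_L-12_H-768_A-12")
inputs, outputs, program = module.context(
    trainable=True, max_seq_len=20)  # max_seq_len assumed to match the client
feed_keys = ["input_ids", "position_ids", "segment_ids", "input_mask"]
fetch_keys = ["pooled_output"]
feed_dict = {key: inputs[key] for key in feed_keys}
fetch_dict = {key: outputs[key] for key in fetch_keys}
# Writes server files to serving_server_model/ and client config to serving_client_conf/
serving_io.save_model("serving_server_model", "serving_client_conf",
                      feed_dict, fetch_dict, program)
```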
### Start the prediction service
Run
```
python bert_server.py serving_server_model 9292 # start the CPU prediction service
```
or
```
python bert_gpu_server.py serving_server_model 9292 0 # start the GPU prediction service on GPU 0
```
### Run prediction
Run
```
sh get_data.sh
```
to download the Chinese sample data.
Then run
```
head data-c.txt | python bert_client.py
```
to predict the first ten samples of the sample data and print their vector representations to standard output.
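To call the service from your own code instead of piping stdin into the script, the BertService helper from bert_client.py (see its test() function in this commit) can be used roughly as follows; the sentence and config path are placeholders:
``` python
# Sketch based on the test() helper in bert_client.py from this commit.
from bert_client import BertService

bc = BertService(model_name='bert_chinese_L-12_H-768_A-12', max_seq_len=20)
bc.load_client('./serving_client_conf/serving_client_conf.prototxt',
               ["127.0.0.1:9292"])
fetch_map = bc.run_general([["需要编码的中文句子"]], ["pooled_output"])
print(fetch_map["pooled_output"])
```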
### Benchmark
Model: bert_chinese_L-12_H-768_A-12
Device: 1 x GPU V100
Environment: CUDA 9.2, cuDNN 7.1.4
In the test, the 10,000 samples in the sample data are duplicated into 100,000 samples. Each client thread sends 1/(thread number) of the samples, with batch size 1 and max_seq_len 20; times are in seconds.
With 4 client threads, the prediction speed reaches 432 samples per second.
Since a single GPU can only compute serially internally, more client threads only reduce GPU idle time, so beyond 4 threads adding more threads does not improve prediction speed.
| client thread num | prepro | client infer | op0 | op1 | op2 | postpro | total |
| ------------------ | ------ | ------------ | ----- | ------ | ---- | ------- | ------ |
| 1 | 3.05 | 290.54 | 0.37 | 239.15 | 6.43 | 0.71 | 365.63 |
| 4 | 0.85 | 213.66 | 0.091 | 200.39 | 1.62 | 0.2 | 231.45 |
| 8 | 0.42 | 223.12 | 0.043 | 110.99 | 0.8 | 0.098 | 232.05 |
| 12 | 0.32 | 225.26 | 0.029 | 73.87 | 0.53 | 0.078 | 231.45 |
| 16 | 0.23 | 227.26 | 0.022 | 55.61 | 0.4 | 0.056 | 231.9 |
The total time cost varies as follows:
![bert benchmark](../../../doc/bert-benchmark-batch-size-1.png)
@@ -17,7 +17,7 @@ from paddle_serving_client import Client
from paddle_serving_client.metric import auc
from paddle_serving_client.utils import MultiThreadRunner
import time
from bert_client import BertService
def predict(thr_id, resource):
@@ -55,7 +55,7 @@ if __name__ == '__main__':
thread_num = sys.argv[3]
resource = {}
resource["conf_file"] = conf_file
resource["server_endpoint"] = ["127.0.0.1:9292"]
resource["filelist"] = [data_file]
resource["thread_num"] = int(thread_num)
...
@@ -17,7 +17,7 @@ from paddle_serving_client import Client
from paddle_serving_client.metric import auc
from paddle_serving_client.utils import MultiThreadRunner
import time
from bert_client import BertService
def predict(thr_id, resource, batch_size):
...
# coding:utf-8
import os
import sys
import numpy as np
import paddlehub as hub
import ujson
import random
import time
from paddlehub.common.logger import logger
import socket
from paddle_serving_client import Client
@@ -20,29 +22,23 @@ if is_py3:
class BertService():
def __init__(self,
- profile=False,
max_seq_len=128,
model_name="bert_uncased_L-12_H-768_A-12",
show_ids=False,
do_lower_case=True,
process_id=0,
retry=3):
- load_balance='round_robin'):
self.process_id = process_id
self.reader_flag = False
self.batch_size = 0
self.max_seq_len = max_seq_len
- self.profile = profile
self.model_name = model_name
self.show_ids = show_ids
self.do_lower_case = do_lower_case
- self.con_list = []
- self.con_index = 0
- self.load_balance = load_balance
- self.server_list = []
- self.serving_list = []
- self.feed_var_names = ''
self.retry = retry
self.pid = os.getpid()
self.profile = True if ("FLAGS_profile_client" in os.environ and
os.environ["FLAGS_profile_client"]) else False
module = hub.Module(name=self.model_name)
inputs, outputs, program = module.context(
@@ -51,7 +47,6 @@ class BertService():
position_ids = inputs["position_ids"]
segment_ids = inputs["segment_ids"]
input_mask = inputs["input_mask"]
- self.feed_var_names = input_ids.name + ';' + position_ids.name + ';' + segment_ids.name + ';' + input_mask.name
self.reader = hub.reader.ClassifyReader(
vocab_path=module.get_vocab_path(),
dataset=None,
@@ -69,6 +64,7 @@ class BertService():
data_generator = self.reader.data_generator(
batch_size=self.batch_size, phase='predict', data=text)
result = []
prepro_start = time.time()
for run_step, batch in enumerate(data_generator(), start=1):
token_list = batch[0][0].reshape(-1).tolist()
pos_list = batch[0][1].reshape(-1).tolist()
@@ -81,6 +77,12 @@ class BertService():
"segment_ids": sent_list,
"input_mask": mask_list
}
prepro_end = time.time()
if self.profile:
print("PROFILE\tpid:{}\tbert_pre_0:{} bert_pre_1:{}".format(
self.pid,
int(round(prepro_start * 1000000)),
int(round(prepro_end * 1000000))))
fetch_map = self.client.predict(feed=feed, fetch=fetch)
return fetch_map
@@ -90,6 +92,7 @@ class BertService():
data_generator = self.reader.data_generator(
batch_size=self.batch_size, phase='predict', data=text)
result = []
prepro_start = time.time()
for run_step, batch in enumerate(data_generator(), start=1):
token_list = batch[0][0].reshape(-1).tolist()
pos_list = batch[0][1].reshape(-1).tolist()
@@ -108,33 +111,46 @@ class BertService():
mask_list[si * self.max_seq_len:(si + 1) * self.max_seq_len]
}
feed_batch.append(feed)
prepro_end = time.time()
if self.profile:
print("PROFILE\tpid:{}\tbert_pre_0:{} bert_pre_1:{}".format(
self.pid,
int(round(prepro_start * 1000000)),
int(round(prepro_end * 1000000))))
fetch_map_batch = self.client.batch_predict(
feed_batch=feed_batch, fetch=fetch)
return fetch_map_batch
def test():
bc = BertService(
model_name='bert_chinese_L-12_H-768_A-12',
max_seq_len=20,
show_ids=False,
do_lower_case=True)
server_addr = ["127.0.0.1:9292"]
config_file = './serving_client_conf/serving_client_conf.prototxt'
fetch = ["pooled_output"]
bc.load_client(config_file, server_addr)
batch_size = 1
batch = []
for line in sys.stdin:
- if len(batch) < batch_size:
- batch.append([line.strip()])
- else:
- result = bc.run_batch_general(batch, fetch)
- batch = []
- for r in result:
- for e in r["pooled_output"]:
- print(e)
if batch_size == 1:
result = bc.run_general([[line.strip()]], fetch)
print(result)
else:
if len(batch) < batch_size:
batch.append([line.strip()])
else:
result = bc.run_batch_general(batch, fetch)
batch = []
for r in result:
print(r)
if len(batch) > 0:
result = bc.run_batch_general(batch, fetch)
batch = []
for r in result:
print(r)
if __name__ == '__main__':
...
@@ -36,5 +36,7 @@ server.set_gpuid(1)
server.load_model_config(sys.argv[1])
port = int(sys.argv[2])
gpuid = sys.argv[3]
server.set_gpuid(gpuid)
server.prepare_server(workdir="work_dir1", port=port, device="gpu")
server.run_server()
wget https://paddle-serving.bj.bcebos.com/bert_example/data-c.txt --no-check-certificate
@@ -11,30 +11,38 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
- import os
- from paddle_serving_server import OpMaker
- from paddle_serving_server import OpSeqMaker
- from paddle_serving_server import Server
- op_maker = OpMaker()
- read_op = op_maker.create('general_reader')
- general_infer_op = op_maker.create('general_infer')
- general_response_op = op_maker.create('general_response')
- op_seq_maker = OpSeqMaker()
- op_seq_maker.add_op(read_op)
- op_seq_maker.add_op(general_infer_op)
- op_seq_maker.add_op(general_response_op)
- server = Server()
- server.set_op_sequence(op_seq_maker.get_op_sequence())
- server.set_num_threads(4)
- server.set_local_bin(
- "~/github/Serving/build_server/core/general-server/serving")
- server.load_model_config(sys.argv[1])
- port = int(sys.argv[2])
- server.prepare_server(workdir="work_dir1", port=port, device="cpu")
- server.run_server()
from paddle_serving_client import Client
from paddle_serving_client.utils import MultiThreadRunner
from paddle_serving_client.utils import benchmark_args
import time
import paddle
import sys
import requests

args = benchmark_args()


def single_func(idx, resource):
    if args.request == "rpc":
        client = Client()
        client.load_client_config(args.model)
        client.connect([args.endpoint])
        train_reader = paddle.batch(paddle.reader.shuffle(
            paddle.dataset.uci_housing.train(), buf_size=500), batch_size=1)
        start = time.time()
        for data in train_reader():
            fetch_map = client.predict(feed={"x": data[0][0]}, fetch=["price"])
        end = time.time()
        return [[end - start]]
    elif args.request == "http":
        train_reader = paddle.batch(paddle.reader.shuffle(
            paddle.dataset.uci_housing.train(), buf_size=500), batch_size=1)
        start = time.time()
        for data in train_reader():
            r = requests.post('http://{}/uci/prediction'.format(args.endpoint),
                              data = {"x": data[0]})
        end = time.time()
        return [[end - start]]


multi_thread_runner = MultiThreadRunner()
result = multi_thread_runner.run(single_func, args.thread, {})
print(result)
@@ -19,48 +19,12 @@ cat test.data | python test_client_batch.py inference.conf 4 > result
Device: Intel(R) Xeon(R) Gold 6271 CPU @ 2.60GHz * 48
Model: [CNN](https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/imdb/nets.py)
- In the test, the client sends 2,500 test samples in total; the data below are per-thread time costs, in seconds.
- Server thread num: 4
- | client thread num | prepro | client infer | op0 | op1 | op2 | postpro | total |
- | ------------------ | ------ | ------------ | ------ | ----- | ------ | ------- | ----- |
- | 1 | 0.99 | 27.39 | 0.085 | 19.92 | 0.046 | 0.032 | 29.84 |
- | 4 | 0.22 | 7.66 | 0.021 | 4.93 | 0.011 | 0.0082 | 8.28 |
- | 8 | 0.1 | 6.66 | 0.01 | 2.42 | 0.0038 | 0.0046 | 6.95 |
- | 12 | 0.074 | 6.87 | 0.0069 | 1.61 | 0.0059 | 0.0032 | 7.07 |
- | 16 | 0.056 | 7.01 | 0.0053 | 1.23 | 0.0029 | 0.0026 | 7.17 |
- | 20 | 0.045 | 7.02 | 0.0042 | 0.97 | 0.0023 | 0.002 | 7.15 |
- | 24 | 0.039 | 7.012 | 0.0034 | 0.8 | 0.0019 | 0.0016 | 7.12 |
- Server thread num: 8
- | client thread num | prepro | client infer | op0 | op1 | op2 | postpro | total |
- | ------------------ | ------ | ------------ | ------ | ----- | ------ | ------- | ----- |
- | 1 | 1.02 | 28.9 | 0.096 | 20.64 | 0.047 | 0.036 | 31.51 |
- | 4 | 0.22 | 7.83 | 0.021 | 5.08 | 0.012 | 0.01 | 8.45 |
- | 8 | 0.11 | 4.44 | 0.01 | 2.5 | 0.0059 | 0.0051 | 4.73 |
- | 12 | 0.074 | 4.11 | 0.0069 | 1.65 | 0.0039 | 0.0029 | 4.31 |
- | 16 | 0.057 | 4.2 | 0.0052 | 1.24 | 0.0029 | 0.0024 | 4.35 |
- | 20 | 0.046 | 4.05 | 0.0043 | 1.01 | 0.0024 | 0.0021 | 4.18 |
- | 24 | 0.038 | 4.02 | 0.0034 | 0.81 | 0.0019 | 0.0015 | 4.13 |
- Server thread num: 12
- | client thread num | prepro | client infer | op0 | op1 | op2 | postpro | total |
- | ------------------ | ------ | ------------ | ------ | ----- | ------ | ------- | ----- |
- | 1 | 1.02 | 29.47 | 0.098 | 20.95 | 0.048 | 0.038 | 31.96 |
- | 4 | 0.21 | 7.36 | 0.022 | 5.01 | 0.011 | 0.0081 | 7.95 |
- | 8 | 0.11 | 4.52 | 0.011 | 2.58 | 0.0061 | 0.0051 | 4.83 |
- | 12 | 0.072 | 3.25 | 0.0076 | 1.72 | 0.0042 | 0.0038 | 3.45 |
- | 16 | 0.059 | 3.93 | 0.0055 | 1.26 | 0.0029 | 0.0023 | 4.1 |
- | 20 | 0.047 | 3.79 | 0.0044 | 1.01 | 0.0024 | 0.0021 | 3.92 |
- | 24 | 0.041 | 3.76 | 0.0036 | 0.83 | 0.0019 | 0.0017 | 3.87 |
Server thread num: 16
In the test, the client sends 25,000 test samples in total; the data below are per-thread time costs, in seconds. Multi-threaded clients clearly improve prediction speed over a single thread; with 16 client threads the prediction speed is 8.7 times that of a single thread.
| client thread num | prepro | client infer | op0 | op1 | op2 | postpro | total |
| ------------------ | ------ | ------------ | ------ | ----- | ------ | ------- | ----- |
| 1 | 1.09 | 28.79 | 0.094 | 20.59 | 0.047 | 0.034 | 31.41 |
@@ -71,26 +35,6 @@ server thread num : 16
| 20 | 0.049 | 3.77 | 0.0047 | 1.03 | 0.0025 | 0.0022 | 3.91 |
| 24 | 0.041 | 3.86 | 0.0039 | 0.85 | 0.002 | 0.0017 | 3.98 |
- Server thread num: 20
- | client thread num | prepro | client infer | op0 | op1 | op2 | postpro | total |
- | ------------------ | ------ | ------------ | ------ | ----- | ------ | ------- | ----- |
- | 1 | 1.03 | 28.42 | 0.085 | 20.47 | 0.046 | 0.037 | 30.98 |
- | 4 | 0.22 | 7.94 | 0.022 | 5.33 | 0.012 | 0.011 | 8.53 |
- | 8 | 0.11 | 4.54 | 0.01 | 2.58 | 0.006 | 0.0046 | 4.84 |
- | 12 | 0.079 | 4.54 | 0.0076 | 1.78 | 0.0042 | 0.0039 | 4.76 |
- | 16 | 0.059 | 3.41 | 0.0057 | 1.33 | 0.0032 | 0.0027 | 3.58 |
- | 20 | 0.051 | 4.33 | 0.0047 | 1.06 | 0.0025 | 0.0023 | 4.48 |
- | 24 | 0.043 | 4.51 | 0.004 | 0.88 | 0.0021 | 0.0018 | 4.63 |
- Server thread num: 24
- | client thread num | prepro | client infer | op0 | op1 | op2 | postpro | total |
- | ------------------ | ------ | ------------ | ------ | ---- | ------ | ------- | ----- |
- | 1 | 0.93 | 29.28 | 0.099 | 20.5 | 0.048 | 0.028 | 31.61 |
- | 4 | 0.22 | 7.72 | 0.023 | 4.98 | 0.011 | 0.0095 | 8.33 |
- | 8 | 0.11 | 4.77 | 0.012 | 2.65 | 0.0062 | 0.0049 | 5.09 |
- | 12 | 0.081 | 4.22 | 0.0078 | 1.77 | 0.0042 | 0.0033 | 4.44 |
- | 16 | 0.062 | 4.21 | 0.0061 | 1.34 | 0.0032 | 0.0026 | 4.39 |
- | 20 | 0.5 | 3.58 | 0.005 | 1.07 | 0.0026 | 0.0023 | 3.72 |
- | 24 | 0.043 | 4.27 | 0.0042 | 0.89 | 0.0022 | 0.0018 | 4.4 |
The total prediction time cost varies as follows:
![total cost](../../../doc/imdb-benchmark-server-16.png)
@@ -13,55 +13,45 @@
# limitations under the License.
- from paddle_serving_client.metric import auc
- def predict(thr_id, resource):
- client = Client()
- client.load_client_config(resource["conf_file"])
- client.connect(resource["server_endpoint"])
- thread_num = resource["thread_num"]
- file_list = resource["filelist"]
- line_id = 0
- prob = []
- label_list = []
- dataset = []
- for fn in file_list:
- fin = open(fn)
- for line in fin:
- if line_id % thread_num == thr_id - 1:
- group = line.strip().split()
- words = [int(x) for x in group[1:int(group[0])]]
- label = [int(group[-1])]
- feed = {"words": words, "label": label}
- dataset.append(feed)
- line_id += 1
- fin.close()
- fetch = ["acc", "cost", "prediction"]
- for inst in dataset:
- fetch_map = client.predict(feed=inst, fetch=fetch)
- prob.append(fetch_map["prediction"][1])
- label_list.append(label[0])
- client.release()
- return [prob, label_list, [end - start]]
- if __name__ == '__main__':
- conf_file = sys.argv[1]
- data_file = sys.argv[2]
- resource = {}
- resource["conf_file"] = conf_file
- resource["server_endpoint"] = ["127.0.0.1:9293"]
- resource["filelist"] = [data_file]
- resource["thread_num"] = int(sys.argv[3])
- thread_runner = MultiThreadRunner()
- result = thread_runner.run(predict, int(sys.argv[3]), resource)
- print("total time {} s".format(sum(result[-1]) / len(result[-1])))
import sys
import time
import requests
from imdb_reader import IMDBDataset
from paddle_serving_client import Client
from paddle_serving_client.utils import MultiThreadRunner
from paddle_serving_client.utils import benchmark_args

args = benchmark_args()


def single_func(idx, resource):
    imdb_dataset = IMDBDataset()
    imdb_dataset.load_resource(args.vocab)
    filelist_fn = args.filelist
    filelist = []
    start = time.time()
    with open(filelist_fn) as fin:
        for line in fin:
            filelist.append(line.strip())
    filelist = filelist[idx::args.thread]
    if args.request == "rpc":
        client = Client()
        client.load_client_config(args.model)
        client.connect([args.endpoint])
        for fn in filelist:
            fin = open(fn)
            for line in fin:
                word_ids, label = imdb_dataset.get_words_and_label(line)
                fetch_map = client.predict(feed={"words": word_ids},
                                           fetch=["prediction"])
    elif args.request == "http":
        for fn in filelist:
            fin = open(fn)
            for line in fin:
                word_ids, label = imdb_dataset.get_words_and_label(line)
                r = requests.post("http://{}/imdb/prediction".format(args.endpoint),
                                  data={"words": word_ids})
    end = time.time()
    return [[end - start]]


multi_thread_runner = MultiThreadRunner()
result = multi_thread_runner.run(single_func, args.thread, {})
print(result)
wget --no-check-certificate https://fleet.bj.bcebos.com/text_classification_data.tar.gz
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imdb-demo/imdb_model.tar.gz
tar -zxvf text_classification_data.tar.gz
tar -zxvf imdb_model.tar.gz
- #wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imdb-demo%2Fimdb.tar.gz
- #tar -xzf imdb-demo%2Fimdb.tar.gz
@@ -30,6 +30,14 @@ class IMDBDataset(dg.MultiSlotDataGenerator):
self._pattern = re.compile(r'(;|,|\.|\?|!|\s|\(|\))')
self.return_value = ("words", [1, 2, 3, 4, 5, 6]), ("label", [0])
def get_words_only(self, line):
sent = line.lower().replace("<br />", " ").strip()
words = [x for x in self._pattern.split(sent) if x and x != " "]
feas = [
self._vocab[x] if x in self._vocab else self._unk_id for x in words
]
return feas
def get_words_and_label(self, line):
send = '|'.join(line.split('|')[:-1]).lower().replace("<br />",
" ").strip()
...
wget https://paddle-serving.bj.bcebos.com/imdb-demo/imdb_service.tar.gz
tar -xzf imdb_service.tar.gz
wget --no-check-certificate https://fleet.bj.bcebos.com/text_classification_data.tar.gz
tar -zxvf text_classification_data.tar.gz
...
@@ -49,8 +49,9 @@ if __name__ == "__main__":
dataset.set_batch_size(128)
dataset.set_filelist(filelist)
dataset.set_thread(10)
- from nets import bow_net
- avg_cost, acc, prediction = bow_net(data, label, dict_dim)
from nets import lstm_net
model_name = "imdb_lstm"
avg_cost, acc, prediction = lstm_net(data, label, dict_dim)
optimizer = fluid.optimizer.SGD(learning_rate=0.01)
optimizer.minimize(avg_cost)
@@ -65,6 +66,7 @@ if __name__ == "__main__":
program=fluid.default_main_program(), dataset=dataset, debug=False)
logger.info("TRAIN --> pass: {}".format(i))
if i == 5:
serving_io.save_model("{}_model".format(model_name),
"{}_client_conf".format(model_name),
{"words": data}, {"prediction": prediction},
fluid.default_main_program())
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_client import Client
from imdb_reader import IMDBDataset
import sys
client = Client()
client.load_client_config(sys.argv[1])
client.connect(["127.0.0.1:9393"])
# you can define any english sentence or dataset here
# This example reuses imdb reader in training, you
# can define your own data preprocessing easily.
imdb_dataset = IMDBDataset()
imdb_dataset.load_resource(sys.argv[2])
for line in sys.stdin:
- group = line.strip().split()
- words = [int(x) for x in group[1:int(group[0]) + 1]]
- label = [int(group[-1])]
- feed = {"words": words, "label": label}
word_ids, label = imdb_dataset.get_words_and_label(line)
feed = {"words": word_ids, "label": label}
fetch = ["acc", "cost", "prediction"]
fetch_map = client.predict(feed=feed, fetch=fetch)
print("{} {}".format(fetch_map["prediction"][1], label[0]))
...
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_client import Client
import sys
import subprocess
from multiprocessing import Pool
import time


def predict(p_id, p_size, data_list):
    client = Client()
    client.load_client_config(conf_file)
    client.connect(["127.0.0.1:8010"])
    result = []
    for line in data_list:
        group = line.strip().split()
        words = [int(x) for x in group[1:int(group[0])]]
        label = [int(group[-1])]
        feed = {"words": words, "label": label}
        fetch = ["acc", "cost", "prediction"]
        fetch_map = client.predict(feed=feed, fetch=fetch)
        #print("{} {}".format(fetch_map["prediction"][1], label[0]))
        result.append([fetch_map["prediction"][1], label[0]])
    return result


def predict_multi_thread(p_num):
    data_list = []
    with open(data_file) as f:
        for line in f.readlines():
            data_list.append(line)
    start = time.time()
    p = Pool(p_num)
    p_size = len(data_list) / p_num
    result_list = []
    for i in range(p_num):
        result_list.append(
            p.apply_async(predict,
                          [i, p_size, data_list[i * p_size:(i + 1) * p_size]]))
    p.close()
    p.join()
    for i in range(p_num):
        result = result_list[i].get()
        for j in result:
            print("{} {}".format(j[0], j[1]))
    cost = time.time() - start
    print("{} threads cost {}".format(p_num, cost))


if __name__ == '__main__':
    conf_file = sys.argv[1]
    data_file = sys.argv[2]
    p_num = int(sys.argv[3])
    predict_multi_thread(p_num)
...@@ -11,7 +11,6 @@ ...@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
#!flask/bin/python
from paddle_serving_server.web_service import WebService from paddle_serving_server.web_service import WebService
from imdb_reader import IMDBDataset from imdb_reader import IMDBDataset
import sys import sys
...@@ -27,7 +26,7 @@ class IMDBService(WebService): ...@@ -27,7 +26,7 @@ class IMDBService(WebService):
if "words" not in feed: if "words" not in feed:
exit(-1) exit(-1)
res_feed = {} res_feed = {}
res_feed["words"] = self.dataset.get_words_and_label(feed["words"])[0] res_feed["words"] = self.dataset.get_words_only(feed["words"])[0]
return res_feed, fetch return res_feed, fetch
imdb_service = IMDBService(name="imdb") imdb_service = IMDBService(name="imdb")
......
## How to Use the Timeline Tool
The serving framework has built-in timing instrumentation for each stage of the prediction service, controlled by environment variables:
```
export FLAGS_profile_client=1 # enable per-stage timing on the client side
export FLAGS_profile_server=1 # enable per-stage timing on the server side
```
With this enabled, the client prints the corresponding log information to standard output during prediction.
To show the time spent in each stage more intuitively, scripts are provided to further analyze and process the log files.
First save the client's output to a file, named profile in the following example.
```
python show_profile.py profile ${thread_num}
```
This script computes the time cost of each stage, averages it over the number of threads, and prints the result to standard output.
```
python timeline_trace.py profile trace
```
This script converts the timestamp records in the log into a JSON-format trace file, which can be visualized with the tracing feature of the Chrome browser.
Specifically: open Chrome, enter chrome://tracing/ in the address bar, click the load button on the tracing page, and open the saved trace file to visualize the timing of each stage of the prediction service.
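For reference, the client-side profile records parsed by these scripts follow the format emitted by general_model.cpp and bert_client.py in this commit: a tab-separated line starting with PROFILE, then a pid field, then space-separated name:timestamp pairs in microseconds (the opN fields appear only when FLAGS_profile_server is also enabled). The timestamps below are illustrative only:
```
PROFILE	pid:12345	bert_pre_0:1583740001000000 bert_pre_1:1583740001020000
PROFILE	pid:12345	prepro_0:1583740001021000 prepro_1:1583740001022000 client_infer_0:1583740001023000 client_infer_1:1583740001053000 op0_0:1583740001024000 op0_1:1583740001052000 postpro_0:1583740001054000 postpro_1:1583740001055000
```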
@@ -5,8 +5,9 @@ import sys
profile_file = sys.argv[1]
def prase(pid_str, time_str, counter):
pid = pid_str.split(":")[1]
event_list = time_str.split(" ")
trace_list = []
for event in event_list:
name, ts = event.split(":")
@@ -19,7 +20,7 @@ def prase(line, counter):
event_dict = {}
event_dict["name"] = name
event_dict["tid"] = 0
event_dict["pid"] = pid
event_dict["ts"] = ts
event_dict["ph"] = ph
@@ -36,7 +37,7 @@ if __name__ == "__main__":
for line in f.readlines():
line = line.strip().split("\t")
if line[0] == "PROFILE":
trace_list = prase(line[1], line[2], counter)
counter += 1
for trace in trace_list:
all_list.append(trace)
...
@@ -74,10 +74,11 @@ class Client(object):
self.fetch_names_ = []
self.client_handle_ = None
self.result_handle_ = None
self.feed_shapes_ = {}
self.feed_types_ = {}
self.feed_names_to_idx_ = {}
self.rpath()
self.pid = os.getpid()
def rpath(self):
lib_path = os.path.dirname(paddle_serving_client.__file__)
@@ -85,7 +86,6 @@ class Client(object):
lib_path = os.path.join(lib_path, 'lib')
os.popen('patchelf --set-rpath {} {}'.format(lib_path, client_path))
def load_client_config(self, path):
from .serving_client import PredictorClient
from .serving_client import PredictorRes
@@ -106,13 +106,23 @@ class Client(object):
0]] + ["--tryfromenv=" + ",".join(read_env_flags)])
self.feed_names_ = [var.alias_name for var in model_conf.feed_var]
self.fetch_names_ = [var.alias_name for var in model_conf.fetch_var]
- self.feed_shapes_ = [var.shape for var in model_conf.feed_var]
self.feed_names_to_idx_ = {}
self.fetch_names_to_type_ = {}
self.fetch_names_to_idx_ = {}
self.lod_tensor_set = set()
self.feed_tensor_len = {}
for i, var in enumerate(model_conf.feed_var):
self.feed_names_to_idx_[var.alias_name] = i
self.feed_types_[var.alias_name] = var.feed_type
self.feed_shapes_[var.alias_name] = var.shape
if var.is_lod_tensor:
self.lod_tensor_set.add(var.alias_name)
else:
counter = 1
for dim in self.feed_shapes_[var.alias_name]:
counter *= dim
self.feed_tensor_len[var.alias_name] = counter
for i, var in enumerate(model_conf.fetch_var):
self.fetch_names_to_idx_[var.alias_name] = i
@@ -128,9 +138,8 @@ class Client(object):
predictor_sdk.set_server_endpoints(endpoints)
sdk_desc = predictor_sdk.gen_desc()
print(sdk_desc)
self.client_handle_.create_predictor_by_desc(sdk_desc.SerializeToString(
))
def get_feed_names(self):
return self.feed_names_
@@ -138,13 +147,23 @@ class Client(object):
def get_fetch_names(self):
return self.fetch_names_
def shape_check(self, feed, key):
seq_shape = 1
if key in self.lod_tensor_set:
return
if len(feed[key]) != self.feed_tensor_len[key]:
raise SystemExit("The shape of feed tensor {} not match.".format(
key))
def predict(self, feed={}, fetch=[]):
int_slot = []
float_slot = []
int_feed_names = []
float_feed_names = []
fetch_names = []
for key in feed:
self.shape_check(feed, key)
if key not in self.feed_names_:
continue
if self.feed_types_[key] == int_type:
@@ -158,16 +177,18 @@ class Client(object):
if key in self.fetch_names_:
fetch_names.append(key)
ret = self.client_handle_.predict(float_slot, float_feed_names,
int_slot, int_feed_names, fetch_names,
self.result_handle_, self.pid)
result_map = {}
for i, name in enumerate(fetch_names):
if self.fetch_names_to_type_[name] == int_type:
result_map[name] = self.result_handle_.get_int64_by_name(name)[
0]
elif self.fetch_names_to_type_[name] == float_type:
result_map[name] = self.result_handle_.get_float_by_name(name)[
0]
return result_map
...
...@@ -11,16 +11,28 @@ ...@@ -11,16 +11,28 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import os
import sys
import subprocess
import argparse
from multiprocessing import Pool
def benchmark_args():
parser = argparse.ArgumentParser("benchmark")
parser.add_argument("--thread", type=int, default=10, help="concurrecy")
parser.add_argument("--model", type=str, default="", help="model for evaluation")
parser.add_argument("--endpoint", type=str, default="127.0.0.1:9292", help="endpoint of server")
parser.add_argument("--request", type=str, default="rpc", help="mode of service")
return parser.parse_args()
class MultiThreadRunner(object):
def __init__(self):
pass
def run(self, thread_func, thread_num, global_resource):
os.environ["http_proxy"] = ""
os.environ["https_proxy"] = ""
p = Pool(thread_num)
result_list = []
for i in range(thread_num):
...
...@@ -12,6 +12,6 @@ ...@@ -12,6 +12,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
""" Paddle Serving Client version string """ """ Paddle Serving Client version string """
serving_client_version = "0.1.1" serving_client_version = "0.1.2"
serving_server_version = "0.1.0" serving_server_version = "0.1.2"
module_proto_version = "0.1.0" module_proto_version = "0.1.2"
...@@ -12,6 +12,6 @@ ...@@ -12,6 +12,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
""" Paddle Serving Client version string """ """ Paddle Serving Client version string """
serving_client_version = "0.1.0" serving_client_version = "0.1.2"
serving_server_version = "0.1.0" serving_server_version = "0.1.2"
module_proto_version = "0.1.0" module_proto_version = "0.1.2"
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Usage:
Host a trained paddle model with one line command
Example:
python -m paddle_serving_server.serve --model ./serving_server_model --port 9292
"""
import argparse
def parse_args():
    parser = argparse.ArgumentParser("serve")
    parser.add_argument(
        "--thread", type=int, default=10, help="Concurrency of server")
    parser.add_argument(
        "--model", type=str, default="", help="Model for serving")
    parser.add_argument(
        "--port", type=int, default=9292, help="Port the server")
    parser.add_argument(
        "--workdir",
        type=str,
        default="workdir",
        help="Working dir of current service")
    parser.add_argument(
        "--device", type=str, default="gpu", help="Type of device")
    parser.add_argument("--gpuid", type=int, default=0, help="Index of GPU")
    return parser.parse_args()


def start_standard_model():
    args = parse_args()
    thread_num = args.thread
    model = args.model
    port = args.port
    workdir = args.workdir
    device = args.device
    gpuid = args.gpuid
    if model == "":
        print("You must specify your serving model")
        exit(-1)
    import paddle_serving_server_gpu as serving
    op_maker = serving.OpMaker()
    read_op = op_maker.create('general_reader')
    general_infer_op = op_maker.create('general_infer')
    general_response_op = op_maker.create('general_response')
    op_seq_maker = serving.OpSeqMaker()
    op_seq_maker.add_op(read_op)
    op_seq_maker.add_op(general_infer_op)
    op_seq_maker.add_op(general_response_op)
    server = serving.Server()
    server.set_op_sequence(op_seq_maker.get_op_sequence())
    server.set_num_threads(thread_num)
    server.load_model_config(model)
    server.prepare_server(workdir=workdir, port=port, device=device)
    server.set_gpuid(gpuid)
    server.run_server()


if __name__ == "__main__":
    start_standard_model()
...@@ -12,6 +12,6 @@ ...@@ -12,6 +12,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
""" Paddle Serving Client version string """ """ Paddle Serving Client version string """
serving_client_version = "0.1.0" serving_client_version = "0.1.2"
serving_server_version = "0.1.0" serving_server_version = "0.1.2"
module_proto_version = "0.1.0" module_proto_version = "0.1.2"
...@@ -11,25 +11,41 @@ ...@@ -11,25 +11,41 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""
Usage:
Host a trained paddle model with one line command
Example:
python -m paddle_serving_server.web_serve --model ./serving_server_model --port 9292
"""
import argparse
from multiprocessing import Pool, Process
from .web_service import WebService
import os
import sys
from paddle_serving_server_gpu import OpMaker
from paddle_serving_server_gpu import OpSeqMaker
from paddle_serving_server_gpu import Server
op_maker = OpMaker() def parse_args():
read_op = op_maker.create('general_reader') parser = argparse.ArgumentParser("web_serve")
general_infer_op = op_maker.create('general_infer') parser.add_argument(
"--thread", type=int, default=10, help="Concurrency of server")
parser.add_argument(
"--model", type=str, default="", help="Model for serving")
parser.add_argument(
"--port", type=int, default=9292, help="Port the server")
parser.add_argument(
"--workdir",
type=str,
default="workdir",
help="Working dir of current service")
parser.add_argument(
"--device", type=str, default="cpu", help="Type of device")
parser.add_argument(
"--name", type=str, default="default", help="Default service name")
return parser.parse_args()
op_seq_maker = OpSeqMaker()
op_seq_maker.add_op(read_op)
op_seq_maker.add_op(general_infer_op)
server = Server() if __name__ == "__main__":
server.set_op_sequence(op_seq_maker.get_op_sequence()) args = parse_args()
server.set_num_threads(12) service = WebService(name=args.name)
server.load_model_config(sys.argv[1]) service.load_model_config(args.model)
port = int(sys.argv[2]) service.prepare_server(
server.prepare_server(workdir="work_dir1", port=port, device="gpu") workdir=args.workdir, port=args.port, device=args.device)
server.run_server() service.run_server()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!flask/bin/python
from flask import Flask, request, abort
from multiprocessing import Pool, Process
from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
from paddle_serving_client import Client
class WebService(object):
    def __init__(self, name="default_service"):
        self.name = name

    def load_model_config(self, model_config):
        self.model_config = model_config

    def _launch_rpc_service(self):
        op_maker = OpMaker()
        read_op = op_maker.create('general_reader')
        general_infer_op = op_maker.create('general_infer')
        general_response_op = op_maker.create('general_response')
        op_seq_maker = OpSeqMaker()
        op_seq_maker.add_op(read_op)
        op_seq_maker.add_op(general_infer_op)
        op_seq_maker.add_op(general_response_op)
        server = Server()
        server.set_op_sequence(op_seq_maker.get_op_sequence())
        server.set_num_threads(16)
        server.set_gpuid = self.gpuid
        server.load_model_config(self.model_config)
        server.prepare_server(
            workdir=self.workdir, port=self.port + 1, device=self.device)
        server.run_server()

    def prepare_server(self, workdir="", port=9393, device="gpu", gpuid=0):
        self.workdir = workdir
        self.port = port
        self.device = device
        self.gpuid = gpuid

    def _launch_web_service(self):
        app_instance = Flask(__name__)
        client_service = Client()
        client_service.load_client_config(
            "{}/serving_server_conf.prototxt".format(self.model_config))
        client_service.connect(["127.0.0.1:{}".format(self.port + 1)])
        service_name = "/" + self.name + "/prediction"

        @app_instance.route(service_name, methods=['POST'])
        def get_prediction():
            if not request.json:
                abort(400)
            if "fetch" not in request.json:
                abort(400)
            feed, fetch = self.preprocess(request.json, request.json["fetch"])
            fetch_map = client_service.predict(feed=feed, fetch=fetch)
            fetch_map = self.postprocess(
                feed=request.json, fetch=fetch, fetch_map=fetch_map)
            return fetch_map

        app_instance.run(host="127.0.0.1",
                         port=self.port,
                         threaded=False,
                         processes=1)

    def run_server(self):
        import socket
        localIP = socket.gethostbyname(socket.gethostname())
        print("web service address:")
        print("http://{}:{}/{}/prediction".format(localIP, self.port,
                                                  self.name))
        p_rpc = Process(target=self._launch_rpc_service)
        p_web = Process(target=self._launch_web_service)
        p_rpc.start()
        p_web.start()
        p_web.join()
        p_rpc.join()

    def preprocess(self, feed={}, fetch=[]):
        return feed, fetch

    def postprocess(self, feed={}, fetch=[], fetch_map={}):
        return fetch_map