Unverified commit fc35d8dd, authored by Jiawei Wang, committed by GitHub

Merge branch 'develop' into rpc_local_0824

@@ -6,7 +6,7 @@ include(framework/CMakeLists.txt)
include(tools/CMakeLists.txt)
include(src/CMakeLists.txt)
add_definitions(-D__STDC_FORMAT_MACROS)
add_library(pdserving ${pdserving_srcs})
set_source_files_properties(
    ${pdserving_srcs}
......
@@ -61,6 +61,25 @@ pip install -r python/requirements.txt
If Python3 is used, replace `pip` with `pip3`.
## GOPATH Setting
The default GOPATH is `$HOME/go`, which you can set to other values.
```shell
export GOPATH=$HOME/go
export PATH=$PATH:$GOPATH/bin
```
## Get go packages
```shell
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger
go get -u github.com/golang/protobuf/protoc-gen-go
go get -u google.golang.org/grpc
```
## Compile Server

@@ -69,7 +88,10 @@ If Python3 is used, replace `pip` with `pip3`.
``` shell
mkdir server-build-cpu && cd server-build-cpu
cmake -DPYTHON_INCLUDE_DIR=$PYTHONROOT/include/python2.7/ \
-DPYTHON_LIBRARIES=$PYTHONROOT/lib/libpython2.7.so \
-DPYTHON_EXECUTABLE=$PYTHONROOT/bin/python \
-DSERVER=ON ..
make -j10
```
@@ -79,7 +101,11 @@ you can execute `make install` to put targets under directory `./output`, you ne
``` shell
mkdir server-build-gpu && cd server-build-gpu
cmake -DPYTHON_INCLUDE_DIR=$PYTHONROOT/include/python2.7/ \
-DPYTHON_LIBRARIES=$PYTHONROOT/lib/libpython2.7.so \
-DPYTHON_EXECUTABLE=$PYTHONROOT/bin/python \
-DSERVER=ON \
-DWITH_GPU=ON ..
make -j10
```
@@ -93,7 +119,10 @@ execute `make install` to put targets under directory `./output`
``` shell
mkdir client-build && cd client-build
cmake -DPYTHON_INCLUDE_DIR=$PYTHONROOT/include/python2.7/ \
-DPYTHON_LIBRARIES=$PYTHONROOT/lib/libpython2.7.so \
-DPYTHON_EXECUTABLE=$PYTHONROOT/bin/python \
-DCLIENT=ON ..
make -j10
```
......
@@ -61,6 +61,22 @@ pip install -r python/requirements.txt
If Python3 is used, replace `pip` with `pip3`.

## GOPATH Setting
The default GOPATH is `$HOME/go`, which you can set to other values.
```shell
export GOPATH=$HOME/go
export PATH=$PATH:$GOPATH/bin
```
## Get go packages
```shell
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger
go get -u github.com/golang/protobuf/protoc-gen-go
go get -u google.golang.org/grpc
```
## Compile Server
......
# FAQ

- Q: How to adjust the waiting time of the RPC service to avoid timeouts?

  A: Use set_rpc_timeout_ms to set a longer waiting time in milliseconds. The default is 20 seconds.

  Example:
  ```
  ...
  ```
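  A minimal sketch of such a client setup, assuming the standard `paddle_serving_client.Client` API (the config path and endpoint below are placeholders):

  ```python
  from paddle_serving_client import Client

  client = Client()
  client.load_client_config("serving_client_conf.prototxt")  # placeholder path
  client.set_rpc_timeout_ms(100000)  # raise the RPC timeout from the default 20 s to 100 s
  client.connect(["127.0.0.1:9393"])  # placeholder endpoint
  ```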
- Q: How to predict with a self-compiled Paddle Serving?

  A: Install the self-compiled whl package via pip, and set the SERVING_BIN environment variable to the path of the compiled serving binary.
- Q: When running GPU prediction, I get InvalidArgumentError: Device id must be less than GPU count, but received id is: 0. GPU count is: 0.

  A: Add the directory containing the GPU driver's libcuda.so to the LD_LIBRARY_PATH environment variable.
- Q: When running GPU prediction, I get ExternalError: Cudnn error, CUDNN_STATUS_BAD_PARAM at (/home/scmbuild/workspaces_cluster.dev/baidu.lib.paddlepaddle/baidu/lib/paddlepaddle/Paddle/paddle/fluid/operators/batch_norm_op.cu:198)

  A: Add the cudnn lib64 path to LD_LIBRARY_PATH. For Paddle Serving installed from pypi, the post9 build uses cudnn 7.3 and the post10 build uses cudnn 7.5. For a self-compiled Paddle Serving, the cudnn version in use can be found in the log/serving.INFO log file.
- Q: When running GPU prediction, I get Error: Failed to find dynamic library: libcublas.so

  A: Add the cuda lib64 path to LD_LIBRARY_PATH. The post9 build of Paddle Serving uses cuda 9.0 and the post10 build uses cuda 10.0.
# How to develop a new Web service?

([简体中文](NEW_WEB_SERVICE_CN.md)|English)

This document takes the Uci service as an example to introduce how to develop a new Web Service. You can check out the complete code [here](../python/examples/pipeline/simple_web_service/web_service.py).

## Op base class

In some services, a single model may not meet business needs, and multiple models need to be chained in series or in parallel to complete the entire service. We call a single model operation an Op and provide a simple set of interfaces to implement the complex logic of chaining Ops in series or in parallel.

Data between Ops is passed as dictionaries. An Op can be started as threads or processes, and options such as the concurrency of each Op can be configured.

Typically, you need to inherit the Op base class and override its `init_op`, `preprocess` and `postprocess` methods, which are implemented by default as follows:
```python
class Op(object):
    def init_op(self):
        pass

    def preprocess(self, input_dicts):
        # multiple previous Op
        if len(input_dicts) != 1:
            _LOGGER.critical(
                "Failed to run preprocess: this Op has multiple previous "
                "inputs. Please override this func.")
            os._exit(-1)
        (_, input_dict), = input_dicts.items()
        return input_dict

    def postprocess(self, input_dicts, fetch_dict):
        return fetch_dict
```
### init_op
This method is used to load user-defined resources such as dictionaries. A separator is loaded in the [UciOp](../python/examples/pipeline/simple_web_service/web_service.py).
**Note**: If the Op is launched in threaded mode and runs with multiple concurrent threads, the threads of the same Op execute `init_op` only once and share the resources it loads.
### preprocess

This method preprocesses the data before model prediction. It has an `input_dicts` parameter: a dictionary whose keys are the `name`s of the preceding Ops and whose values are the data passed from those Ops (each value is itself a dictionary).

The `preprocess` method needs to turn the data into an ndarray dictionary (keys are feed variable names, values are the corresponding ndarrays). The Op takes the return value as the input of the model prediction and passes the prediction output to the `postprocess` method.

**Note**: if the Op does not have a model configuration file, the return value of `preprocess` is passed directly to `postprocess`.
### postprocess

This method post-processes the data after model prediction. It has two parameters, `input_dicts` and `fetch_dict`.

`input_dicts` is the same as the parameter of the `preprocess` method, and `fetch_dict` is the output of the model prediction (keys are fetch variable names, values are the corresponding ndarrays). The Op takes the return value of `postprocess` as the input of the `preprocess` of subsequent Ops.

**Note**: if the Op does not have a model configuration file, `fetch_dict` will be the return value of `preprocess`.
Here is the Op of the Uci example:
```python
class UciOp(Op):
    def init_op(self):
        self.separator = ","

    def preprocess(self, input_dicts):
        (_, input_dict), = input_dicts.items()
        x_value = input_dict["x"]
        if isinstance(x_value, (str, unicode)):
            input_dict["x"] = np.array(
                [float(x.strip()) for x in x_value.split(self.separator)])
        return input_dict

    def postprocess(self, input_dicts, fetch_dict):
        fetch_dict["price"] = str(fetch_dict["price"][0][0])
        return fetch_dict
```
## WebService base class
Paddle Serving implements the [WebService](https://github.com/PaddlePaddle/Serving/blob/develop/python/paddle_serving_server/web_service.py#L23) base class. You need to override its `get_pipeline_response` method to define the topological relationship between Ops. The default implementation is as follows:
```python
class WebService(object):
    def get_pipeline_response(self, read_op):
        return None
```

Where `read_op` serves as the entry point of the topology map of the whole service (that is, the first Op defined by the user takes `read_op` as its predecessor).

For a single-Op service (single model), take the Uci service as an example (there is only one Uci prediction model in the whole service):

```python
class UciService(WebService):
    def get_pipeline_response(self, read_op):
        uci_op = UciOp(name="uci", input_ops=[read_op])
        return uci_op
```

For a multi-Op service (multiple models), take the Ocr service as an example (the whole service is completed by the Det model and the Rec model in series):
```python
class OcrService(WebService):
def get_pipeline_response(self, read_op):
det_op = DetOp(name="det", input_ops=[read_op])
rec_op = RecOp(name="rec", input_ops=[det_op])
return rec_op
```
The WebService object needs to load a yaml configuration file through the `prepare_pipeline_config` method to configure each Op and the entire service. The simplest configuration file is as follows (Uci example):
```yaml
http_port: 18080
op:
uci:
local_service_conf:
model_config: uci_housing_model # path
```
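With the Op, the service class and the configuration file in place, the service is started following the pattern of the Uci example in this repository; a minimal sketch, assuming the config file is named `config.yml`:

```python
uci_service = UciService(name="uci")
uci_service.prepare_pipeline_config("config.yml")
uci_service.run_service()
```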
All field names of the yaml file are as follows:

```yaml
rpc_port: 18080 # gRPC port
build_dag_each_worker: false # Whether to use the process version of the servicer. The default is false
worker_num: 1 # gRPC thread pool size (the number of processes in the process version servicer). The default is 1
http_port: 0 # HTTP service port. The HTTP service is not started when the value is less than or equal to 0. The default value is 0
dag:
    is_thread_op: true # Whether to use the thread version of OP. The default is true
    client_type: brpc # Use brpc or grpc client. The default is brpc
    retry: 1 # The number of times the DAG executor retries after failure. The default value is 1, that is, no retrying
    use_profile: false # Whether to print the log on the server side. The default is false
    tracer:
        interval_s: -1 # Monitoring interval of Tracer (in seconds). Monitoring is not started when the value is less than 1. The default value is -1
op:
    <op_name>: # op name, corresponding to the one defined in the program
        concurrency: 1 # op concurrency number, the default is 1
        timeout: -1 # predict timeout in milliseconds. The default value is -1, that is, no timeout
        retry: 1 # number of retries after timeout. The default value is 1, that is, no retrying
        batch_size: 1 # batch_size for auto-batching. If this field is set, the Op will merge the outputs of multiple requests into a single batch
        auto_batching_timeout: -1 # auto-batching timeout in milliseconds. The default value is -1, that is, no timeout
        local_service_conf:
            model_config: # path of the corresponding model file. No default value (None). If this item is not configured, the model file will not be loaded
            workdir: "" # working directory of the corresponding model
            thread_num: 2 # the corresponding model is started with thread_num threads
            devices: "" # the device on which the model is launched. GPU card numbers can be specified (such as "0,1,2"); the default is CPU
            mem_optim: true # memory optimization option, the default is true
            ir_optim: false # IR optimization option, the default is false
```
All fields of an Op can be defined when the Op is created in the program (which will override the yaml fields).
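For example, a sketch of overriding a field at construction time, mirroring the `concurrency` field from the yaml above:

```python
# concurrency passed here takes precedence over the value in config.yml
uci_op = UciOp(name="uci", input_ops=[read_op], concurrency=2)
```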
# How to develop a new Web Service?

(简体中文|[English](NEW_WEB_SERVICE.md))

This document takes the Uci house price prediction service as an example to introduce how to develop a new Web Service. You can check out the complete code [here](../python/examples/pipeline/simple_web_service/web_service.py).

## Op base class

In some services, a single model may not meet the requirements, and multiple models need to be chained in series or in parallel to complete the entire service. We call a single model operation an Op and provide a simple set of interfaces to implement the complex logic of chaining Ops in series or in parallel.

Data between Ops is passed as dictionaries. An Op can be started as threads or processes, and options such as the concurrency of each Op can be configured.

Typically, you need to inherit the Op base class and override its `init_op`, `preprocess` and `postprocess` methods, which are implemented by default as follows:

```python
class Op(object):
    def init_op(self):
        pass

    def preprocess(self, input_dicts):
        # multiple previous Op
        if len(input_dicts) != 1:
            _LOGGER.critical(
                "Failed to run preprocess: this Op has multiple previous "
                "inputs. Please override this func.")
            os._exit(-1)
        (_, input_dict), = input_dicts.items()
        return input_dict

    def postprocess(self, input_dicts, fetch_dict):
        return fetch_dict
```

### init_op method

This method is used to load user-defined resources such as dictionaries. A separator is loaded in the [UciOp](../python/examples/pipeline/simple_web_service/web_service.py).

**Note**: If the Op is loaded in threaded mode and runs with multiple concurrent threads, the threads of the same Op execute `init_op` only once and share the resources it loads.

### preprocess method

This method preprocesses the data before model prediction. It has an `input_dicts` parameter: a dictionary whose keys are the `name`s of the preceding Ops and whose values are the data passed from those Ops (each value is itself a dictionary).

The `preprocess` method needs to turn the data into an ndarray dictionary (keys are feed variable names, values are the corresponding ndarrays). The Op takes the return value as the input of the model prediction and passes the output to the `postprocess` method.

**Note**: If the Op has no model configured, the return value of `preprocess` is passed directly to `postprocess`.

### postprocess method

This method post-processes the data after model prediction. It has two parameters, `input_dicts` and `fetch_dict`.

`input_dicts` is the same as the parameter of `preprocess`, and `fetch_dict` is the output of the model prediction (keys are fetch variable names, values are the corresponding ndarrays). The Op takes the return value of `postprocess` as the input of the `preprocess` of subsequent Ops.

**Note**: If the Op has no model configured, `fetch_dict` will be the return value of `preprocess`.

Here is the Op of the Uci example:

```python
class UciOp(Op):
    def init_op(self):
        self.separator = ","

    def preprocess(self, input_dicts):
        (_, input_dict), = input_dicts.items()
        x_value = input_dict["x"]
        if isinstance(x_value, (str, unicode)):
            input_dict["x"] = np.array(
                [float(x.strip()) for x in x_value.split(self.separator)])
        return input_dict

    def postprocess(self, input_dicts, fetch_dict):
        fetch_dict["price"] = str(fetch_dict["price"][0][0])
        return fetch_dict
```

## WebService base class

Paddle Serving implements the [WebService](https://github.com/PaddlePaddle/Serving/blob/develop/python/paddle_serving_server/web_service.py#L28) base class. You need to override its `get_pipeline_response` method to define the topological relationship between Ops and return the Op that serves as the Response. The default implementation is as follows:

```python
class WebService(object):
    def get_pipeline_response(self, read_op):
        return None
```

Where `read_op` serves as the entry point of the topology map of the whole service (that is, the first Op defined by the user takes `read_op` as its predecessor).

For a single-Op service (single model), take the Uci service as an example (there is only one Uci house price prediction model in the whole service):

```python
class UciService(WebService):
    def get_pipeline_response(self, read_op):
        uci_op = UciOp(name="uci", input_ops=[read_op])
        return uci_op
```

For a multi-Op service (multiple models), take the Ocr service as an example (the whole service is completed by the Det model and the Rec model in series):

```python
class OcrService(WebService):
    def get_pipeline_response(self, read_op):
        det_op = DetOp(name="det", input_ops=[read_op])
        rec_op = RecOp(name="rec", input_ops=[det_op])
        return rec_op
```

The WebService object needs to load a yaml configuration file through `prepare_pipeline_config` to configure each Op and the entire service. The simplest configuration file is as follows (Uci example):

```yaml
http_port: 18080
op:
    uci:
        local_service_conf:
            model_config: uci_housing_model # path
```

All field names of the yaml file are as follows:

```yaml
rpc_port: 18080 # gRPC port
build_dag_each_worker: false # Whether to use the process version of the servicer. The default is false
worker_num: 1 # gRPC thread pool size (the number of processes in the process version servicer). The default is 1
http_port: 0 # HTTP service port. The HTTP service is not started when the value is less than or equal to 0. The default value is 0
dag:
    is_thread_op: true # Whether to use the thread version of Op. The default is true
    client_type: brpc # Use brpc or grpc client. The default is brpc
    retry: 1 # The number of times the DAG executor retries after failure. The default value is 1, that is, no retrying
    use_profile: false # Whether to print the log on the server side. The default is false
    tracer:
        interval_s: -1 # Monitoring interval of Tracer (in seconds). Monitoring is not started when the value is less than 1. The default value is -1
op:
    <op_name>: # op name, corresponding to the one defined in the program
        concurrency: 1 # op concurrency number, the default is 1
        timeout: -1 # prediction timeout in milliseconds. The default value is -1, that is, no timeout
        retry: 1 # number of retries after timeout. The default value is 1, that is, no retrying
        batch_size: 1 # batch_size for auto-batching. If this field is set, the Op will merge the outputs of multiple requests into a single batch
        auto_batching_timeout: -1 # auto-batching timeout in milliseconds. The default value is -1, that is, no timeout
        local_service_conf:
            model_config: # path of the corresponding model file. No default value (None). If this item is not configured, the model file will not be loaded
            workdir: "" # working directory of the corresponding model
            thread_num: 2 # the corresponding model is started with thread_num threads
            devices: "" # the device on which the model is launched. GPU card numbers can be specified (such as "0,1,2"); the default is CPU
            mem_optim: true # memory optimization option, the default is true
            ir_optim: false # IR optimization option, the default is false
```

All fields of an Op can be defined when the Op is created in the program (which will override the yaml fields).
@@ -251,9 +251,10 @@ server.run_server()
Where `response_op` is the ResponseOp mentioned above. PipelineServer will initialize Channels according to the topological relationships of the OPs and build the computation graph. `config_yml_path` is the configuration file of PipelineServer. An example file is as follows:
```yaml
rpc_port: 18080 # gRPC port
worker_num: 1 # gRPC thread pool size (the number of processes in the process version servicer). The default is 1
build_dag_each_worker: false # Whether to use the process version of the servicer. The default is false
http_port: 0 # HTTP service port. The HTTP service is not started when the value is less than or equal to 0. The default value is 0
dag:
  is_thread_op: true # Whether to use the thread version of OP. The default is true
  client_type: brpc # Use brpc or grpc client. The default is brpc
  ...
```
@@ -285,6 +286,8 @@ python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 &> cnn.
python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.log &
```
PipelineServing also supports local automatic startup of PaddleServingService. Please refer to the example `python/examples/pipeline/ocr`.
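A sketch of what local startup looks like, based on the `local_service_pipeline_server.py` example included in this commit (the Op and model names are those of the OCR example):

```python
from paddle_serving_server_gpu.pipeline import LocalRpcServiceHandler

# The Op starts its own serving process locally instead of
# connecting to a pre-started remote endpoint.
det_op = DetOp(
    name="det",
    input_ops=[read_op],
    local_rpc_service_handler=LocalRpcServiceHandler(
        model_config="ocr_det_model"),
    concurrency=1)
```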
### Start PipelineServer

Run the following code:
@@ -384,7 +387,7 @@ for f in futures:
## How to optimize with the timeline tool

In order to better optimize the performance, PipelineServing provides a timeline tool to monitor the time of each stage of the whole service.
......
@@ -249,9 +249,10 @@ server.run_server()
Where `response_op` is the ResponseOp mentioned above. PipelineServer will initialize Channels according to the topological relationships of the OPs and build the computation graph. `config_yml_path` is the configuration file of PipelineServer. An example file is as follows:
```yaml
rpc_port: 18080 # gRPC port
worker_num: 1 # gRPC thread pool size (the number of processes in the process version servicer). The default is 1
build_dag_each_worker: false # Whether to use the process version of the servicer. The default is false
http_port: 0 # HTTP service port. The HTTP service is not started when the value is less than or equal to 0. The default is 0
dag:
  is_thread_op: true # Whether to use the thread version of Op. The default is true
  client_type: brpc # Use brpc or grpc client. The default is brpc
  ...
```
@@ -283,6 +284,8 @@ python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 &> cnn.
python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.log &
```

PipelineServing also supports local automatic startup of PaddleServingService. Please refer to the example under `python/examples/pipeline/ocr`.
### Start PipelineServer

Run the following code:
......
@@ -38,12 +38,15 @@ If you have saved model files using Paddle's `save_inference_model` API, you can
import paddle_serving_client.io as serving_io
serving_io.inference_model_to_serving(dirname, serving_server="serving_server", serving_client="serving_client", model_filename=None, params_filename=None)
```
Or you can use a built-in Python module called `paddle_serving_client.convert` to convert it.

```
python -m paddle_serving_client.convert --dirname ./your_inference_model_dir
```

The arguments are the same as those of the `inference_model_to_serving` API.
| Argument | Type | Default | Description |
|--------------|------|-----------|--------------------------------|
| `dirname` | str | - | Path of saved model files. Program file and parameter files are saved in this directory. |
| `serving_server` | str | `"serving_server"` | The path of model files and configuration files for server. |
| `serving_client` | str | `"serving_client"` | The path of configuration files for client. |
| `model_filename` | str | None | The name of the file to load the inference program. If it is None, the default filename `__model__` will be used. |
| `params_filename` | str | None | The name of the file to load all parameters. It is only used when all parameters were saved in a single binary file. If parameters were saved in separate files, set it as None. |
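For instance, a minimal sketch of the Python API call (the model directory is a placeholder):

```python
import paddle_serving_client.io as serving_io

# Placeholder: a directory produced by Paddle's save_inference_model API
serving_io.inference_model_to_serving(
    "./your_inference_model_dir",
    serving_server="serving_server",
    serving_client="serving_client")
```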
@@ -39,12 +39,15 @@ for line in sys.stdin:
import paddle_serving_client.io as serving_io
serving_io.inference_model_to_serving(dirname, serving_server="serving_server", serving_client="serving_client", model_filename=None, params_filename=None)
```

Alternatively, you can use the built-in module `paddle_serving_client.convert` provided by Paddle Serving for the conversion.

```
python -m paddle_serving_client.convert --dirname ./your_inference_model_dir
```

The module arguments are the same as those of the `inference_model_to_serving` API.

| Argument | Type | Default | Description |
|--------------|------|-----------|--------------------------------|
| `dirname` | str | - | Path of the model files to be converted; the Program structure file and parameter files are saved in this directory. |
| `serving_server` | str | `"serving_server"` | Storage path of the converted model files and configuration files. |
| `serving_client` | str | `"serving_client"` | Storage path of the converted client configuration files. |
| `model_filename` | str | None | Name of the file that stores the inference Program structure of the model to be converted. If set to None, `__model__` is used as the default filename. |
| `params_filename` | str | None | Name of the file that stores all parameters of the model to be converted. It needs to be specified if and only if all parameters are saved in a single binary file. If parameters are stored in separate files, set it to None. |
if (CLIENT)
  file(INSTALL pipeline DESTINATION paddle_serving_client)
  file(GLOB_RECURSE SERVING_CLIENT_PY_FILES paddle_serving_client/*.py)
  set(PY_FILES ${SERVING_CLIENT_PY_FILES})
  SET(PACKAGE_NAME "serving_client")
@@ -11,13 +9,9 @@ endif()
if (SERVER)
  if (NOT WITH_GPU)
    file(INSTALL pipeline DESTINATION paddle_serving_server)
    file(GLOB_RECURSE SERVING_SERVER_PY_FILES paddle_serving_server/*.py)
  else()
    file(INSTALL pipeline DESTINATION paddle_serving_server_gpu)
    file(GLOB_RECURSE SERVING_SERVER_PY_FILES paddle_serving_server_gpu/*.py)
  endif()
  set(PY_FILES ${SERVING_SERVER_PY_FILES})
@@ -25,6 +19,8 @@ if (SERVER)
  set(SETUP_LOG_FILE "setup.py.server.log")
endif()
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/util.py
${CMAKE_CURRENT_BINARY_DIR}/util.py)
if (CLIENT)
  configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.client.in
                 ${CMAKE_CURRENT_BINARY_DIR}/setup.py)
@@ -47,6 +43,9 @@ if (SERVER)
  endif()
endif()
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/gen_version.py
${CMAKE_CURRENT_BINARY_DIR}/gen_version.py)
set (SERVING_CLIENT_CORE ${PADDLE_SERVING_BINARY_DIR}/core/general-client/*.so)
message("python env: " ${py_env})

@@ -54,6 +53,7 @@ if (APP)
  add_custom_command(
    OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp
    COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_app/ ${PADDLE_SERVING_BINARY_DIR}/python/
    COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py "app"
    COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel
    DEPENDS ${SERVING_APP_CORE} general_model_config_py_proto ${PY_FILES})
  add_custom_target(paddle_python ALL DEPENDS ${PADDLE_SERVING_BINARY_DIR}/.timestamp)
@@ -65,6 +65,7 @@ add_custom_command(
    COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_client/ ${PADDLE_SERVING_BINARY_DIR}/python/
    COMMAND ${CMAKE_COMMAND} -E copy ${SERVING_CLIENT_CORE} ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/serving_client.so
    COMMAND env ${py_env} ${PYTHON_EXECUTABLE} python_tag.py
    COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py "client"
    COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel
    DEPENDS ${SERVING_CLIENT_CORE} sdk_configure_py_proto ${PY_FILES})
  add_custom_target(paddle_python ALL DEPENDS serving_client ${PADDLE_SERVING_BINARY_DIR}/.timestamp)
@@ -75,6 +76,7 @@ if (SERVER)
  add_custom_command(
    OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp
    COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_server/ ${PADDLE_SERVING_BINARY_DIR}/python/
    COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py "server"
    COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel
    DEPENDS ${SERVING_SERVER_CORE} server_config_py_proto ${PY_FILES})
  add_custom_target(paddle_python ALL DEPENDS ${PADDLE_SERVING_BINARY_DIR}/.timestamp)
@@ -83,7 +85,8 @@ if (SERVER)
    OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp
    COMMAND cp -r
    ${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_server_gpu/ ${PADDLE_SERVING_BINARY_DIR}/python/
    COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py
    "server_gpu" ${CUDA_VERSION_MAJOR}
    COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel
    DEPENDS ${SERVING_SERVER_CORE} server_config_py_proto ${PY_FILES})
  add_custom_target(paddle_python ALL DEPENDS ${PADDLE_SERVING_BINARY_DIR}/.timestamp)
......
# IMDB model ensemble example

## Get model
```
sh get_data.sh
```
## Start servers
```
python -m paddle_serving_server_gpu.serve --model imdb_cnn_model --port 9292 &> cnn.log &
python -m paddle_serving_server_gpu.serve --model imdb_bow_model --port 9393 &> bow.log &
python test_pipeline_server.py &>pipeline.log &
```
## Start client
```
python test_pipeline_client.py
```
## HTTP test
```
curl -X POST -k http://localhost:9999/prediction -d '{"key": ["words"], "value": ["i am very sad | 0"]}'
```
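The same request can be issued from Python; a minimal sketch mirroring the curl command above:

```python
import json

import requests

# Same payload as the curl example above
data = {"key": ["words"], "value": ["i am very sad | 0"]}
r = requests.post("http://localhost:9999/prediction", data=json.dumps(data))
print(r.json())
```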
rpc_port: 18085
worker_num: 4
build_dag_each_worker: false
http_port: 9999
dag:
  is_thread_op: false
  client_type: brpc
......
# OCR Pipeline WebService
(English|[简体中文](./README_CN.md))
This document takes OCR as an example to show how to use Pipeline WebService to start a service with multiple models in series.
## Get Model
```
python -m paddle_serving_app.package --get_model ocr_rec
tar -xzvf ocr_rec.tar.gz
python -m paddle_serving_app.package --get_model ocr_det
tar -xzvf ocr_det.tar.gz
```
## Get Dataset (Optional)
```
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/ocr/test_imgs.tar
tar xf test_imgs.tar
```
## Start Service
```
python web_service.py &>log.txt &
```
## Test
```
python pipeline_http_client.py
```
<!--
## More (PipelineServing)
You can choose one of the following versions to start Service.
### Remote Service Version
```
python -m paddle_serving_server_gpu.serve --model ocr_det_model --port 12000 --gpu_id 0 &> det.log &
python -m paddle_serving_server_gpu.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log &
python remote_service_pipeline_server.py &>pipeline.log &
```
### Local Service Version
```
python local_service_pipeline_server.py &>pipeline.log &
```
### Hybrid Service Version
```
python -m paddle_serving_server_gpu.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log &
python hybrid_service_pipeline_server.py &>pipeline.log &
```
## Client Prediction
### RPC
```
python pipeline_rpc_client.py
```
### HTTP
```
python pipeline_http_client.py
```
-->
# OCR Pipeline WebService
([English](./README.md)|简体中文)
This document takes OCR as an example to introduce how to use Pipeline WebService to start a service with multiple models in series.
## Get model
```
python -m paddle_serving_app.package --get_model ocr_rec
tar -xzvf ocr_rec.tar.gz
python -m paddle_serving_app.package --get_model ocr_det
tar -xzvf ocr_det.tar.gz
```
## Get dataset (optional)
```
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/ocr/test_imgs.tar
tar xf test_imgs.tar
```
## Start WebService
```
python web_service.py &>log.txt &
```
## Test
```
python pipeline_http_client.py
```
<!--
## More (PipelineServing)

You can choose any of the following versions to start the service.

### Remote service version
```
python -m paddle_serving_server.serve --model ocr_det_model --port 12000 --gpu_id 0 &> det.log &
python -m paddle_serving_server.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log &
python remote_service_pipeline_server.py &>pipeline.log &
```
### Local service version
```
python local_service_pipeline_server.py &>pipeline.log &
```
### Hybrid service version
```
python -m paddle_serving_server_gpu.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log &
python hybrid_service_pipeline_server.py &>pipeline.log &
```
## Start client
### RPC
```
python pipeline_rpc_client.py
```
### HTTP
```
python pipeline_http_client.py
```
-->
rpc_port: 18080
worker_num: 4
build_dag_each_worker: false
http_port: 9999
dag:
is_thread_op: false
client_type: brpc
retry: 1
use_profile: false
op:
det:
concurrency: 2
local_service_conf:
model_config: ocr_det_model
devices: "0"
rec:
concurrency: 1
timeout: -1
retry: 1
local_service_conf:
model_config: ocr_rec_model
devices: "0"
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server_gpu.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server_gpu.pipeline import PipelineServer
from paddle_serving_server_gpu.pipeline.proto import pipeline_service_pb2
from paddle_serving_server_gpu.pipeline.channel import ChannelDataEcode
from paddle_serving_server_gpu.pipeline import LocalRpcServiceHandler
import numpy as np
import cv2
import time
import base64
import json
from paddle_serving_app.reader import OCRReader
from paddle_serving_app.reader import Sequential, ResizeByFactor
from paddle_serving_app.reader import Div, Normalize, Transpose
from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes
import re
import logging
_LOGGER = logging.getLogger()
class DetOp(Op):
def init_op(self):
self.det_preprocess = Sequential([
ResizeByFactor(32, 960), Div(255),
Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose(
(2, 0, 1))
])
self.filter_func = FilterBoxes(10, 10)
self.post_func = DBPostProcess({
"thresh": 0.3,
"box_thresh": 0.5,
"max_candidates": 1000,
"unclip_ratio": 1.5,
"min_size": 3
})
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
data = base64.b64decode(input_dict["image"].encode('utf8'))
data = np.fromstring(data, np.uint8)
# Note: class variables(self.var) can only be used in process op mode
self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
self.ori_h, self.ori_w, _ = self.im.shape
det_img = self.det_preprocess(self.im)
_, self.new_h, self.new_w = det_img.shape
return {"image": det_img}
def postprocess(self, input_dicts, fetch_dict):
det_out = fetch_dict["concat_1.tmp_0"]
ratio_list = [
float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
]
dt_boxes_list = self.post_func(det_out, [ratio_list])
dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
out_dict = {"dt_boxes": dt_boxes, "image": self.im}
return out_dict
class RecOp(Op):
def init_op(self):
self.ocr_reader = OCRReader()
self.get_rotate_crop_image = GetRotateCropImage()
self.sorted_boxes = SortedBoxes()
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
im = input_dict["image"]
dt_boxes = input_dict["dt_boxes"]
dt_boxes = self.sorted_boxes(dt_boxes)
feed_list = []
img_list = []
max_wh_ratio = 0
for i, dtbox in enumerate(dt_boxes):
boximg = self.get_rotate_crop_image(im, dt_boxes[i])
img_list.append(boximg)
h, w = boximg.shape[0:2]
wh_ratio = w * 1.0 / h
max_wh_ratio = max(max_wh_ratio, wh_ratio)
for img in img_list:
norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
feed = {"image": norm_img}
feed_list.append(feed)
return feed_list
def postprocess(self, input_dicts, fetch_dict):
rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
res_lst = []
for res in rec_res:
res_lst.append(res[0])
res = {"res": str(res_lst)}
return res
read_op = RequestOp()
det_op = DetOp(
name="det",
input_ops=[read_op],
local_rpc_service_handler=LocalRpcServiceHandler(
model_config="ocr_det_model",
workdir="det_workdir", # defalut: "workdir"
thread_num=2, # defalut: 2
devices="0", # gpu0. defalut: "" (cpu)
mem_optim=True, # defalut: True
ir_optim=False, # defalut: False
available_port_generator=None), # defalut: None
concurrency=1)
rec_op = RecOp(
name="rec",
input_ops=[det_op],
server_endpoints=["127.0.0.1:12001"],
fetch_list=["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"],
client_config="ocr_rec_client/serving_client_conf.prototxt",
concurrency=1)
response_op = ResponseOp(input_ops=[rec_op])
server = PipelineServer("ocr")
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.run_server()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server_gpu.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server_gpu.pipeline import PipelineServer
from paddle_serving_server_gpu.pipeline.proto import pipeline_service_pb2
from paddle_serving_server_gpu.pipeline.channel import ChannelDataEcode
from paddle_serving_server_gpu.pipeline import LocalRpcServiceHandler
import numpy as np
import cv2
import time
import base64
import json
from paddle_serving_app.reader import OCRReader
from paddle_serving_app.reader import Sequential, ResizeByFactor
from paddle_serving_app.reader import Div, Normalize, Transpose
from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes
import re
import logging
_LOGGER = logging.getLogger()
class DetOp(Op):
def init_op(self):
self.det_preprocess = Sequential([
ResizeByFactor(32, 960), Div(255),
Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose(
(2, 0, 1))
])
self.filter_func = FilterBoxes(10, 10)
self.post_func = DBPostProcess({
"thresh": 0.3,
"box_thresh": 0.5,
"max_candidates": 1000,
"unclip_ratio": 1.5,
"min_size": 3
})
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
data = base64.b64decode(input_dict["image"].encode('utf8'))
data = np.fromstring(data, np.uint8)
# Note: class variables(self.var) can only be used in process op mode
self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
self.ori_h, self.ori_w, _ = self.im.shape
det_img = self.det_preprocess(self.im)
_, self.new_h, self.new_w = det_img.shape
return {"image": det_img}
def postprocess(self, input_dicts, fetch_dict):
det_out = fetch_dict["concat_1.tmp_0"]
ratio_list = [
float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
]
dt_boxes_list = self.post_func(det_out, [ratio_list])
dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
out_dict = {"dt_boxes": dt_boxes, "image": self.im}
return out_dict
class RecOp(Op):
def init_op(self):
self.ocr_reader = OCRReader()
self.get_rotate_crop_image = GetRotateCropImage()
self.sorted_boxes = SortedBoxes()
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
im = input_dict["image"]
dt_boxes = input_dict["dt_boxes"]
dt_boxes = self.sorted_boxes(dt_boxes)
feed_list = []
img_list = []
max_wh_ratio = 0
for i, dtbox in enumerate(dt_boxes):
boximg = self.get_rotate_crop_image(im, dt_boxes[i])
img_list.append(boximg)
h, w = boximg.shape[0:2]
wh_ratio = w * 1.0 / h
max_wh_ratio = max(max_wh_ratio, wh_ratio)
for img in img_list:
norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
feed = {"image": norm_img}
feed_list.append(feed)
return feed_list
def postprocess(self, input_dicts, fetch_dict):
rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
res_lst = []
for res in rec_res:
res_lst.append(res[0])
res = {"res": str(res_lst)}
return res
read_op = RequestOp()
det_op = DetOp(
name="det",
input_ops=[read_op],
local_rpc_service_handler=LocalRpcServiceHandler(
model_config="ocr_det_model",
workdir="det_workdir", # defalut: "workdir"
thread_num=2, # defalut: 2
devices="0", # gpu0. defalut: "" (cpu)
mem_optim=True, # defalut: True
ir_optim=False, # defalut: False
available_port_generator=None), # defalut: None
concurrency=1)
rec_op = RecOp(
name="rec",
input_ops=[det_op],
local_rpc_service_handler=LocalRpcServiceHandler(
model_config="ocr_rec_model"),
concurrency=1)
response_op = ResponseOp(input_ops=[rec_op])
server = PipelineServer("ocr")
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.run_server()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
import json
import base64
import os
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
url = "http://127.0.0.1:9999/ocr/prediction"
test_img_dir = "imgs/"
for img_file in os.listdir(test_img_dir):
with open(os.path.join(test_img_dir, img_file), 'rb') as file:
image_data1 = file.read()
image = cv2_to_base64(image_data1)
for i in range(4):
data = {"key": ["image"], "value": [image]}
r = requests.post(url=url, data=json.dumps(data))
print(r.json())
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_server_gpu.pipeline import PipelineClient
import base64
import os
client = PipelineClient()
client.connect(['127.0.0.1:18080'])
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
test_img_dir = "imgs/"
for img_file in os.listdir(test_img_dir):
with open(os.path.join(test_img_dir, img_file), 'rb') as file:
image_data = file.read()
image = cv2_to_base64(image_data)
for i in range(4):
ret = client.predict(feed_dict={"image": image}, fetch=["res"])
print(ret)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server_gpu.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server_gpu.pipeline import PipelineServer
from paddle_serving_server_gpu.pipeline.proto import pipeline_service_pb2
from paddle_serving_server_gpu.pipeline.channel import ChannelDataEcode
import numpy as np
import cv2
import time
import base64
import json
from paddle_serving_app.reader import OCRReader
from paddle_serving_app.reader import Sequential, ResizeByFactor
from paddle_serving_app.reader import Div, Normalize, Transpose
from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes
import re
import logging
_LOGGER = logging.getLogger()
class DetOp(Op):
def init_op(self):
self.det_preprocess = Sequential([
ResizeByFactor(32, 960), Div(255),
Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose(
(2, 0, 1))
])
self.filter_func = FilterBoxes(10, 10)
self.post_func = DBPostProcess({
"thresh": 0.3,
"box_thresh": 0.5,
"max_candidates": 1000,
"unclip_ratio": 1.5,
"min_size": 3
})
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
data = base64.b64decode(input_dict["image"].encode('utf8'))
data = np.fromstring(data, np.uint8)
# Note: class variables(self.var) can only be used in process op mode
self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
self.ori_h, self.ori_w, _ = self.im.shape
det_img = self.det_preprocess(self.im)
_, self.new_h, self.new_w = det_img.shape
return {"image": det_img}
def postprocess(self, input_dicts, fetch_dict):
det_out = fetch_dict["concat_1.tmp_0"]
ratio_list = [
float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
]
dt_boxes_list = self.post_func(det_out, [ratio_list])
dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
out_dict = {"dt_boxes": dt_boxes, "image": self.im}
return out_dict
class RecOp(Op):
def init_op(self):
self.ocr_reader = OCRReader()
self.get_rotate_crop_image = GetRotateCropImage()
self.sorted_boxes = SortedBoxes()
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
im = input_dict["image"]
dt_boxes = input_dict["dt_boxes"]
dt_boxes = self.sorted_boxes(dt_boxes)
feed_list = []
img_list = []
max_wh_ratio = 0
for i, dtbox in enumerate(dt_boxes):
boximg = self.get_rotate_crop_image(im, dt_boxes[i])
img_list.append(boximg)
h, w = boximg.shape[0:2]
wh_ratio = w * 1.0 / h
max_wh_ratio = max(max_wh_ratio, wh_ratio)
for img in img_list:
norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
feed = {"image": norm_img}
feed_list.append(feed)
return feed_list
def postprocess(self, input_dicts, fetch_dict):
rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
res_lst = []
for res in rec_res:
res_lst.append(res[0])
res = {"res": str(res_lst)}
return res
read_op = RequestOp()
det_op = DetOp(
name="det",
input_ops=[read_op],
server_endpoints=["127.0.0.1:12000"],
fetch_list=["concat_1.tmp_0"],
client_config="ocr_det_client/serving_client_conf.prototxt",
concurrency=1)
rec_op = RecOp(
name="rec",
input_ops=[det_op],
server_endpoints=["127.0.0.1:12001"],
fetch_list=["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"],
client_config="ocr_rec_client/serving_client_conf.prototxt",
concurrency=1)
response_op = ResponseOp(input_ops=[rec_op])
server = PipelineServer("ocr")
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.run_server()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
from paddle_serving_server_gpu.web_service import WebService, Op
except ImportError:
from paddle_serving_server.web_service import WebService, Op
import logging
import numpy as np
import cv2
import base64
from paddle_serving_app.reader import OCRReader
from paddle_serving_app.reader import Sequential, ResizeByFactor
from paddle_serving_app.reader import Div, Normalize, Transpose
from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes
_LOGGER = logging.getLogger()
class DetOp(Op):
def init_op(self):
self.det_preprocess = Sequential([
ResizeByFactor(32, 960), Div(255),
Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose(
(2, 0, 1))
])
self.filter_func = FilterBoxes(10, 10)
self.post_func = DBPostProcess({
"thresh": 0.3,
"box_thresh": 0.5,
"max_candidates": 1000,
"unclip_ratio": 1.5,
"min_size": 3
})
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
data = base64.b64decode(input_dict["image"].encode('utf8'))
        data = np.frombuffer(data, np.uint8)  # np.fromstring is deprecated for raw bytes
        # Note: instance variables (self.*) can only be used in process op mode
        self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
self.ori_h, self.ori_w, _ = self.im.shape
det_img = self.det_preprocess(self.im)
_, self.new_h, self.new_w = det_img.shape
return {"image": det_img}
def postprocess(self, input_dicts, fetch_dict):
det_out = fetch_dict["concat_1.tmp_0"]
ratio_list = [
float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
]
dt_boxes_list = self.post_func(det_out, [ratio_list])
dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
out_dict = {"dt_boxes": dt_boxes, "image": self.im}
return out_dict
class RecOp(Op):
def init_op(self):
self.ocr_reader = OCRReader()
self.get_rotate_crop_image = GetRotateCropImage()
self.sorted_boxes = SortedBoxes()
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
im = input_dict["image"]
dt_boxes = input_dict["dt_boxes"]
dt_boxes = self.sorted_boxes(dt_boxes)
feed_list = []
img_list = []
max_wh_ratio = 0
        for dtbox in dt_boxes:
            boximg = self.get_rotate_crop_image(im, dtbox)
img_list.append(boximg)
h, w = boximg.shape[0:2]
wh_ratio = w * 1.0 / h
max_wh_ratio = max(max_wh_ratio, wh_ratio)
for img in img_list:
norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
feed = {"image": norm_img}
feed_list.append(feed)
return feed_list
def postprocess(self, input_dicts, fetch_dict):
rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
res_lst = []
for res in rec_res:
res_lst.append(res[0])
res = {"res": str(res_lst)}
return res
class OcrService(WebService):
def get_pipeline_response(self, read_op):
det_op = DetOp(name="det", input_ops=[read_op])
rec_op = RecOp(name="rec", input_ops=[det_op])
return rec_op
ocr_service = OcrService(name="ocr")
ocr_service.prepare_pipeline_config("config.yml")
ocr_service.run_service()
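Because the `WebService` wrapper also launches the grpc-gateway proxy, the same pipeline can be exercised over HTTP. A sketch, assuming `requests` is installed, an `http_port` of 18080 in config.yml, and a local test image (all placeholders):
```
import base64
import json
import requests  # assumed third-party dependency

with open("imgs/1.jpg", "rb") as f:  # placeholder test image
    image = base64.b64encode(f.read()).decode('utf8')

# the /ocr/prediction route comes from the grpc-gateway mapping in this patch
data = {"key": ["image"], "value": [image]}
r = requests.post("http://127.0.0.1:18080/ocr/prediction", data=json.dumps(data))
print(r.json())
```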
# Simple Pipeline WebService
This document takes the UCI service as an example to introduce how to use Pipeline WebService.
## Get model
```
sh get_data.sh
```
## Start server
```
python web_service.py &>log.txt &
```
## HTTP test
```
curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
```
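The same request can be issued from Python; a sketch equivalent to the curl call above (`requests` is an assumed dependency):
```
import json
import requests  # assumed third-party dependency

url = "http://localhost:18080/uci/prediction"
data = {
    "key": ["x"],
    "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, "
              "-0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]
}
r = requests.post(url, data=json.dumps(data))
print(r.json())
```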
# Simple Pipeline WebService
Here the UCI service is used as an example to introduce how to use Pipeline WebService.
## Get model
```
sh get_data.sh
```
## Start server
```
python web_service.py &>log.txt &
```
## HTTP test
```
curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
```
worker_num: 4
http_port: 18080
dag:
is_thread_op: false
op:
uci:
local_service_conf:
model_config: uci_housing_model
devices: "" # "0,1"
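The same options can be supplied programmatically. A sketch, assuming the `prepare_server(yml_dict=...)` overload this patch adds to `PipelineServer`, with `UciOp` taken from the web_service.py below:
```
from paddle_serving_server.pipeline import PipelineServer, RequestOp, ResponseOp

read_op = RequestOp()
uci_op = UciOp(name="uci", input_ops=[read_op])  # UciOp as defined in web_service.py
response_op = ResponseOp(input_ops=[uci_op])

server = PipelineServer("uci")
server.set_response_op(response_op)
server.prepare_server(yml_dict={
    "worker_num": 4,
    "http_port": 18080,
    "dag": {"is_thread_op": False},
    "op": {"uci": {"local_service_conf": {"model_config": "uci_housing_model",
                                          "devices": ""}}},  # "" -> CPU
})
server.run_server()
```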
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
tar -xzf uci_housing.tar.gz
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
from paddle_serving_server_gpu.web_service import WebService, Op
except ImportError:
from paddle_serving_server.web_service import WebService, Op
import logging
import numpy as np
try:
    unicode  # Python 2
except NameError:
    unicode = str  # Python 3 compatibility for the isinstance check below
_LOGGER = logging.getLogger()
class UciOp(Op):
def init_op(self):
self.separator = ","
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
_LOGGER.info(input_dict)
x_value = input_dict["x"]
if isinstance(x_value, (str, unicode)):
input_dict["x"] = np.array(
[float(x.strip()) for x in x_value.split(self.separator)])
return input_dict
def postprocess(self, input_dicts, fetch_dict):
# _LOGGER.info(fetch_dict)
fetch_dict["price"] = str(fetch_dict["price"][0][0])
return fetch_dict
class UciService(WebService):
def get_pipeline_response(self, read_op):
uci_op = UciOp(name="uci", input_ops=[read_op])
return uci_op
uci_service = UciService(name="uci")
uci_service.prepare_pipeline_config("config.yml")
uci_service.run_service()
...@@ -30,7 +30,6 @@ client.load_client_config("yolov4_client/serving_client_conf.prototxt") ...@@ -30,7 +30,6 @@ client.load_client_config("yolov4_client/serving_client_conf.prototxt")
client.connect(['127.0.0.1:9393']) client.connect(['127.0.0.1:9393'])
im = preprocess(sys.argv[1]) im = preprocess(sys.argv[1])
print(im.shape)
fetch_map = client.predict( fetch_map = client.predict(
feed={ feed={
"image": im, "image": im,
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import re
import os
import subprocess
def update_info(file_name, feature, info):
new_str = ""
with open(file_name, "r") as f:
for line in f.readlines():
if re.match(feature, line):
if isinstance(info, str):
line = feature + " = \"" + info.strip() + "\"\n"
else:
line = feature + " = \"" + info.decode('utf-8').strip(
) + "\"\n"
new_str = new_str + line
with open(file_name, "w") as f:
f.write(new_str)
if len(sys.argv) > 2:
update_info("paddle_serving_server_gpu/version.py", "cuda_version",
sys.argv[2])
path = "paddle_serving_" + sys.argv[1]
commit_id = subprocess.check_output(['git', 'rev-parse', 'HEAD'])
update_info(path + "/version.py", "commit_id", commit_id)
...@@ -13,3 +13,4 @@ ...@@ -13,3 +13,4 @@
# limitations under the License. # limitations under the License.
""" Paddle Serving App version string """ """ Paddle Serving App version string """
serving_app_version = "0.1.2" serving_app_version = "0.1.2"
commit_id = ""
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Usage:
Convert a paddle inference model into a model file that can be used for Paddle Serving.
Example:
python -m paddle_serving_client.convert --dirname ./inference_model
"""
import argparse
from .io import inference_model_to_serving
def parse_args(): # pylint: disable=doc-string-missing
parser = argparse.ArgumentParser("convert")
parser.add_argument(
"--dirname",
type=str,
required=True,
help='Path of saved model files. Program file and parameter files are saved in this directory.'
)
parser.add_argument(
"--serving_server",
type=str,
default="serving_server",
help='The path of model files and configuration files for server. Default: "serving_server".'
)
parser.add_argument(
"--serving_client",
type=str,
default="serving_client",
help='The path of configuration files for client. Default: "serving_client".'
)
parser.add_argument(
"--model_filename",
type=str,
default=None,
        help='The name of file to load the inference program. If it is None, the default filename __model__ will be used.'
)
parser.add_argument(
"--params_filename",
type=str,
default=None,
help='The name of file to load all parameters. It is only used for the case that all parameters were saved in a single binary file. If parameters were saved in separate files, set it as None. Default: None.'
)
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
inference_model_to_serving(
args.dirname,
serving_server=args.serving_server,
serving_client=args.serving_client,
model_filename=args.model_filename,
params_filename=args.params_filename)
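Once converted, the generated client configuration can be loaded as usual. A minimal sketch, assuming the default output directories and a server already listening on 127.0.0.1:9393 (placeholders):
```
from paddle_serving_client import Client

client = Client()
client.load_client_config("serving_client/serving_client_conf.prototxt")
client.connect(["127.0.0.1:9393"])  # placeholder endpoint
```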
...@@ -15,3 +15,4 @@ ...@@ -15,3 +15,4 @@
serving_client_version = "0.3.2" serving_client_version = "0.3.2"
serving_server_version = "0.3.2" serving_server_version = "0.3.2"
module_proto_version = "0.3.2" module_proto_version = "0.3.2"
commit_id = ""
...@@ -161,6 +161,10 @@ class Server(object): ...@@ -161,6 +161,10 @@ class Server(object):
self.container_id = None self.container_id = None
self.model_config_paths = None # for multi-model in a workflow self.model_config_paths = None # for multi-model in a workflow
def get_fetch_list(self):
fetch_names = [var.alias_name for var in self.model_conf.fetch_var]
return fetch_names
def set_max_concurrency(self, concurrency): def set_max_concurrency(self, concurrency):
self.max_concurrency = concurrency self.max_concurrency = concurrency
......
...@@ -15,3 +15,4 @@ ...@@ -15,3 +15,4 @@
serving_client_version = "0.3.2" serving_client_version = "0.3.2"
serving_server_version = "0.3.2" serving_server_version = "0.3.2"
module_proto_version = "0.3.2" module_proto_version = "0.3.2"
commit_id = ""
...@@ -21,12 +21,36 @@ from paddle_serving_client import Client ...@@ -21,12 +21,36 @@ from paddle_serving_client import Client
from contextlib import closing from contextlib import closing
import socket import socket
from paddle_serving_server import pipeline
from paddle_serving_server.pipeline import Op
class WebService(object): class WebService(object):
def __init__(self, name="default_service"): def __init__(self, name="default_service"):
self.name = name self.name = name
# pipeline
self._server = pipeline.PipelineServer(self.name)
def get_pipeline_response(self, read_op):
return None
def prepare_pipeline_config(self, yaml_file):
# build dag
read_op = pipeline.RequestOp()
last_op = self.get_pipeline_response(read_op)
if not isinstance(last_op, Op):
raise ValueError("The return value type of `get_pipeline_response` "
"function is not Op type, please check function "
"`get_pipeline_response`.")
response_op = pipeline.ResponseOp(input_ops=[last_op])
self._server.set_response_op(response_op)
self._server.prepare_server(yaml_file)
def run_service(self):
self._server.run_server()
def load_model_config(self, model_config): def load_model_config(self, model_config):
print("This API will be deprecated later. Please do not use it")
self.model_config = model_config self.model_config = model_config
def _launch_rpc_service(self): def _launch_rpc_service(self):
...@@ -63,6 +87,7 @@ class WebService(object): ...@@ -63,6 +87,7 @@ class WebService(object):
device="cpu", device="cpu",
mem_optim=True, mem_optim=True,
ir_optim=False): ir_optim=False):
print("This API will be deprecated later. Please do not use it")
self.workdir = workdir self.workdir = workdir
self.port = port self.port = port
self.device = device self.device = device
...@@ -104,6 +129,7 @@ class WebService(object): ...@@ -104,6 +129,7 @@ class WebService(object):
return result return result
def run_rpc_service(self): def run_rpc_service(self):
print("This API will be deprecated later. Please do not use it")
import socket import socket
localIP = socket.gethostbyname(socket.gethostname()) localIP = socket.gethostbyname(socket.gethostname())
print("web service address:") print("web service address:")
...@@ -153,6 +179,7 @@ class WebService(object): ...@@ -153,6 +179,7 @@ class WebService(object):
"{}".format(self.model_config), gpu=False, profile=False) "{}".format(self.model_config), gpu=False, profile=False)
def run_web_service(self): def run_web_service(self):
print("This API will be deprecated later. Please do not use it")
self.app_instance.run(host="0.0.0.0", self.app_instance.run(host="0.0.0.0",
port=self.port, port=self.port,
threaded=False, threaded=False,
...@@ -162,9 +189,11 @@ class WebService(object): ...@@ -162,9 +189,11 @@ class WebService(object):
return self.app_instance return self.app_instance
def preprocess(self, feed=[], fetch=[]): def preprocess(self, feed=[], fetch=[]):
print("This API will be deprecated later. Please do not use it")
return feed, fetch return feed, fetch
def postprocess(self, feed=[], fetch=[], fetch_map=None): def postprocess(self, feed=[], fetch=[], fetch_map=None):
print("This API will be deprecated later. Please do not use it")
for key in fetch_map: for key in fetch_map:
fetch_map[key] = fetch_map[key].tolist() fetch_map[key] = fetch_map[key].tolist()
return fetch_map return fetch_map
...@@ -209,6 +209,10 @@ class Server(object): ...@@ -209,6 +209,10 @@ class Server(object):
self.product_name = None self.product_name = None
self.container_id = None self.container_id = None
def get_fetch_list(self):
fetch_names = [var.alias_name for var in self.model_conf.fetch_var]
return fetch_names
def set_max_concurrency(self, concurrency): def set_max_concurrency(self, concurrency):
self.max_concurrency = concurrency self.max_concurrency = concurrency
......
...@@ -16,3 +16,4 @@ serving_client_version = "0.3.2" ...@@ -16,3 +16,4 @@ serving_client_version = "0.3.2"
serving_server_version = "0.3.2" serving_server_version = "0.3.2"
module_proto_version = "0.3.2" module_proto_version = "0.3.2"
cuda_version = "9" cuda_version = "9"
commit_id = ""
...@@ -24,17 +24,43 @@ import sys ...@@ -24,17 +24,43 @@ import sys
import numpy as np import numpy as np
import paddle_serving_server_gpu as serving import paddle_serving_server_gpu as serving
from paddle_serving_server_gpu import pipeline
from paddle_serving_server_gpu.pipeline import Op
class WebService(object): class WebService(object):
def __init__(self, name="default_service"): def __init__(self, name="default_service"):
self.name = name self.name = name
self.gpus = [] # pipeline
self.rpc_service_list = [] self._server = pipeline.PipelineServer(self.name)
self.gpus = [] # deprecated
self.rpc_service_list = [] # deprecated
def get_pipeline_response(self, read_op):
return None
def prepare_pipeline_config(self, yaml_file):
# build dag
read_op = pipeline.RequestOp()
last_op = self.get_pipeline_response(read_op)
if not isinstance(last_op, Op):
raise ValueError("The return value type of `get_pipeline_response` "
"function is not Op type, please check function "
"`get_pipeline_response`.")
response_op = pipeline.ResponseOp(input_ops=[last_op])
self._server.set_response_op(response_op)
self._server.prepare_server(yaml_file)
def run_service(self):
self._server.run_server()
def load_model_config(self, model_config): def load_model_config(self, model_config):
print("This API will be deprecated later. Please do not use it")
self.model_config = model_config self.model_config = model_config
def set_gpus(self, gpus): def set_gpus(self, gpus):
print("This API will be deprecated later. Please do not use it")
self.gpus = [int(x) for x in gpus.split(",")] self.gpus = [int(x) for x in gpus.split(",")]
def default_rpc_service(self, def default_rpc_service(self,
...@@ -88,6 +114,7 @@ class WebService(object): ...@@ -88,6 +114,7 @@ class WebService(object):
gpuid=0, gpuid=0,
mem_optim=True, mem_optim=True,
ir_optim=False): ir_optim=False):
print("This API will be deprecated later. Please do not use it")
self.workdir = workdir self.workdir = workdir
self.port = port self.port = port
self.device = device self.device = device
...@@ -155,6 +182,7 @@ class WebService(object): ...@@ -155,6 +182,7 @@ class WebService(object):
return result return result
def run_rpc_service(self): def run_rpc_service(self):
print("This API will be deprecated later. Please do not use it")
import socket import socket
localIP = socket.gethostbyname(socket.gethostname()) localIP = socket.gethostbyname(socket.gethostname())
print("web service address:") print("web service address:")
...@@ -183,6 +211,7 @@ class WebService(object): ...@@ -183,6 +211,7 @@ class WebService(object):
# TODO: maybe change another API name: maybe run_local_predictor? # TODO: maybe change another API name: maybe run_local_predictor?
def run_debugger_service(self, gpu=False): def run_debugger_service(self, gpu=False):
print("This API will be deprecated later. Please do not use it")
import socket import socket
localIP = socket.gethostbyname(socket.gethostname()) localIP = socket.gethostbyname(socket.gethostname())
print("web service address:") print("web service address:")
...@@ -209,18 +238,21 @@ class WebService(object): ...@@ -209,18 +238,21 @@ class WebService(object):
"{}".format(self.model_config), gpu=gpu, profile=False) "{}".format(self.model_config), gpu=gpu, profile=False)
def run_web_service(self): def run_web_service(self):
print("This API will be deprecated later. Please do not use it")
self.app_instance.run(host="0.0.0.0", self.app_instance.run(host="0.0.0.0",
port=self.port, port=self.port,
threaded=False, threaded=False,
processes=1) processes=4)
def get_app_instance(self): def get_app_instance(self):
return self.app_instance return self.app_instance
def preprocess(self, feed=[], fetch=[]): def preprocess(self, feed=[], fetch=[]):
print("This API will be deprecated later. Please do not use it")
return feed, fetch return feed, fetch
def postprocess(self, feed=[], fetch=[], fetch_map=None): def postprocess(self, feed=[], fetch=[], fetch_map=None):
for key in fetch_map.iterkeys(): print("This API will be deprecated later. Please do not use it")
for key in fetch_map:
fetch_map[key] = fetch_map[key].tolist() fetch_map[key] = fetch_map[key].tolist()
return fetch_map return fetch_map
...@@ -11,8 +11,9 @@ ...@@ -11,8 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logger # this module must be the first to import from . import logger # this module must be the first to import
from operator import Op, RequestOp, ResponseOp from .operator import Op, RequestOp, ResponseOp
from pipeline_server import PipelineServer from .pipeline_server import PipelineServer
from pipeline_client import PipelineClient from .pipeline_client import PipelineClient
from analyse import Analyst from .local_rpc_service_handler import LocalRpcServiceHandler
from .analyse import Analyst
...@@ -40,7 +40,8 @@ class ChannelDataEcode(enum.Enum): ...@@ -40,7 +40,8 @@ class ChannelDataEcode(enum.Enum):
RPC_PACKAGE_ERROR = 4 RPC_PACKAGE_ERROR = 4
CLIENT_ERROR = 5 CLIENT_ERROR = 5
CLOSED_ERROR = 6 CLOSED_ERROR = 6
UNKNOW = 7 NO_SERVICE = 7
UNKNOW = 8
class ChannelDataType(enum.Enum): class ChannelDataType(enum.Enum):
......
...@@ -299,13 +299,12 @@ class DAGExecutor(object): ...@@ -299,13 +299,12 @@ class DAGExecutor(object):
sys.stderr.write(profile_str) sys.stderr.write(profile_str)
# add profile info into rpc_resp # add profile info into rpc_resp
profile_value = ""
if resp_channeldata.client_need_profile: if resp_channeldata.client_need_profile:
profile_set = resp_channeldata.profile_data_set profile_set = resp_channeldata.profile_data_set
profile_set.add(profile_str) profile_set.add(profile_str)
profile_value = "".join(list(profile_set)) profile_value = "".join(list(profile_set))
rpc_resp.key.append(self._client_profile_key) rpc_resp.key.append(self._client_profile_key)
rpc_resp.value.append(profile_value) rpc_resp.value.append(profile_value)
return rpc_resp return rpc_resp
...@@ -338,7 +337,8 @@ class DAG(object): ...@@ -338,7 +337,8 @@ class DAG(object):
self._manager = PipelineProcSyncManager() self._manager = PipelineProcSyncManager()
_LOGGER.info("[DAG] Succ init") _LOGGER.info("[DAG] Succ init")
def get_use_ops(self, response_op): @staticmethod
def get_use_ops(response_op):
unique_names = set() unique_names = set()
used_ops = set() used_ops = set()
succ_ops_of_use_op = {} # {op_name: succ_ops} succ_ops_of_use_op = {} # {op_name: succ_ops}
...@@ -427,11 +427,11 @@ class DAG(object): ...@@ -427,11 +427,11 @@ class DAG(object):
_LOGGER.critical("Failed to build DAG: ResponseOp" _LOGGER.critical("Failed to build DAG: ResponseOp"
" has not been set.") " has not been set.")
os._exit(-1) os._exit(-1)
used_ops, out_degree_ops = self.get_use_ops(response_op) used_ops, out_degree_ops = DAG.get_use_ops(response_op)
if not self._build_dag_each_worker: if not self._build_dag_each_worker:
_LOGGER.info("================= USED OP =================") _LOGGER.info("================= USED OP =================")
for op in used_ops: for op in used_ops:
if op.name != self._request_name: if not isinstance(op, RequestOp):
_LOGGER.info(op.name) _LOGGER.info(op.name)
_LOGGER.info("-------------------------------------------") _LOGGER.info("-------------------------------------------")
if len(used_ops) <= 1: if len(used_ops) <= 1:
......
...@@ -11,17 +11,3 @@ ...@@ -11,17 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import sys
import re
import os
new_str = ""
with open("paddle_serving_server_gpu/version.py", "r") as f:
for line in f.readlines():
if re.match("cuda_version", line):
line = re.sub(r"\d+", sys.argv[1], line)
new_str = new_str + line
with open("paddle_serving_server_gpu/version.py", "w") as f:
f.write(new_str)
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package baidu.paddle_serving.pipeline_serving;
option go_package = ".;pipeline_serving";
import "google/api/annotations.proto";
message Response {
repeated string key = 1;
repeated string value = 2;
int32 ecode = 3;
string error_info = 4;
};
message Request {
repeated string key = 1;
repeated string value = 2;
string name = 3;
}
service PipelineService {
rpc inference(Request) returns (Response) {
option (google.api.http) = {
post : "/{name=*}/prediction"
body : "*"
};
}
};
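The `google.api.http` option above is what lets the grpc-gateway proxy accept `POST /{name}/prediction` with a JSON body and forward it to `PipelineService.inference`; the `name` path segment is bound to `Request.name`, which `PipelineServicer.inference` later checks against the service name.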
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"C"
"flag"
"net/http"
"log"
"strconv"
"golang.org/x/net/context"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"
gw "./proto"
)
//export run_proxy_server
func run_proxy_server(grpc_port int, http_port int) error {
var (
pipelineEndpoint = flag.String("pipeline_endpoint", "localhost:" + strconv.Itoa(grpc_port), "endpoint of PipelineService")
)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
mux := runtime.NewServeMux()
opts := []grpc.DialOption{grpc.WithInsecure()}
err := gw.RegisterPipelineServiceHandlerFromEndpoint(ctx, mux, *pipelineEndpoint, opts)
if err != nil {
return err
}
log.Println("start proxy service")
return http.ListenAndServe(":" + strconv.Itoa(http_port), mux) // proxy port
}
func main() {}
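The `//export run_proxy_server` comment makes the function visible through cgo, so this file can be compiled with Go's c-shared build mode into `libproxy_server.so`; `PipelineServer._grpc_gateway` below loads that library via ctypes and calls `run_proxy_server` to start the HTTP proxy.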
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import logging
import multiprocessing
try:
from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
PACKAGE_VERSION = "GPU"
except ImportError:
from paddle_serving_server import OpMaker, OpSeqMaker, Server
PACKAGE_VERSION = "CPU"
from . import util
_LOGGER = logging.getLogger(__name__)
_workdir_name_gen = util.NameGenerator("workdir_")
class LocalRpcServiceHandler(object):
def __init__(self,
model_config,
workdir="",
thread_num=2,
devices="",
mem_optim=True,
ir_optim=False,
available_port_generator=None):
if available_port_generator is None:
available_port_generator = util.GetAvailablePortGenerator()
self._model_config = model_config
self._port_list = []
if devices == "":
# cpu
devices = [-1]
self._port_list.append(available_port_generator.next())
_LOGGER.info("Model({}) will be launch in cpu device. Port({})"
.format(model_config, self._port_list))
else:
# gpu
if PACKAGE_VERSION == "CPU":
raise ValueError(
"You are using the CPU version package("
"paddle-serving-server), unable to set devices")
devices = [int(x) for x in devices.split(",")]
for _ in devices:
self._port_list.append(available_port_generator.next())
_LOGGER.info("Model({}) will be launch in gpu device: {}. Port({})"
.format(model_config, devices, self._port_list))
self._workdir = workdir
self._devices = devices
self._thread_num = thread_num
self._mem_optim = mem_optim
self._ir_optim = ir_optim
self._rpc_service_list = []
self._server_pros = []
self._fetch_vars = None
def get_fetch_list(self):
return self._fetch_vars
def get_port_list(self):
return self._port_list
def get_client_config(self):
return os.path.join(self._model_config, "serving_server_conf.prototxt")
def _prepare_one_server(self, workdir, port, gpuid, thread_num, mem_optim,
ir_optim):
device = "gpu"
if gpuid == -1:
device = "cpu"
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer')
general_response_op = op_maker.create('general_response')
op_seq_maker = OpSeqMaker()
op_seq_maker.add_op(read_op)
op_seq_maker.add_op(general_infer_op)
op_seq_maker.add_op(general_response_op)
server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(thread_num)
server.set_memory_optimize(mem_optim)
server.set_ir_optimize(ir_optim)
server.load_model_config(self._model_config)
if gpuid >= 0:
server.set_gpuid(gpuid)
server.prepare_server(workdir=workdir, port=port, device=device)
if self._fetch_vars is None:
self._fetch_vars = server.get_fetch_list()
return server
def _start_one_server(self, service_idx):
self._rpc_service_list[service_idx].run_server()
def prepare_server(self):
for i, device_id in enumerate(self._devices):
if self._workdir != "":
workdir = "{}_{}".format(self._workdir, i)
else:
workdir = _workdir_name_gen.next()
self._rpc_service_list.append(
self._prepare_one_server(
workdir,
self._port_list[i],
device_id,
thread_num=self._thread_num,
mem_optim=self._mem_optim,
ir_optim=self._ir_optim))
def start_server(self):
for i, service in enumerate(self._rpc_service_list):
p = multiprocessing.Process(
target=self._start_one_server, args=(i, ))
p.daemon = True
self._server_pros.append(p)
for p in self._server_pros:
p.start()
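A minimal usage sketch for the handler above; the model directory is a placeholder:
```
handler = LocalRpcServiceHandler(
    model_config="uci_housing_model",  # placeholder model directory
    devices="")                        # "" -> CPU; e.g. "0,1" -> GPU cards 0 and 1

handler.prepare_server()         # builds one Server per device on free ports
handler.start_server()           # launches each server as a daemon process
print(handler.get_port_list())   # ports a client can connect to
print(handler.get_fetch_list())  # fetch vars read from the model config
```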
...@@ -38,6 +38,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode, ...@@ -38,6 +38,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode,
ChannelTimeoutError) ChannelTimeoutError)
from .util import NameGenerator from .util import NameGenerator
from .profiler import UnsafeTimeProfiler as TimeProfiler from .profiler import UnsafeTimeProfiler as TimeProfiler
from . import local_rpc_service_handler
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
_op_name_gen = NameGenerator("Op") _op_name_gen = NameGenerator("Op")
...@@ -47,46 +48,128 @@ class Op(object): ...@@ -47,46 +48,128 @@ class Op(object):
def __init__(self, def __init__(self,
name=None, name=None,
input_ops=[], input_ops=[],
server_endpoints=[], server_endpoints=None,
fetch_list=[], fetch_list=None,
client_config=None, client_config=None,
concurrency=1, concurrency=None,
timeout=-1, timeout=None,
retry=1, retry=None,
batch_size=1, batch_size=None,
auto_batching_timeout=None): auto_batching_timeout=None,
local_rpc_service_handler=None):
# In __init__, all the parameters are just saved and Op is not initialized
if name is None: if name is None:
name = _op_name_gen.next() name = _op_name_gen.next()
self.name = name # to identify the type of OP, it must be globally unique self.name = name # to identify the type of OP, it must be globally unique
self.concurrency = concurrency # amount of concurrency self.concurrency = concurrency # amount of concurrency
self.set_input_ops(input_ops) self.set_input_ops(input_ops)
self._local_rpc_service_handler = local_rpc_service_handler
self._server_endpoints = server_endpoints self._server_endpoints = server_endpoints
self.with_serving = False
if len(self._server_endpoints) != 0:
self.with_serving = True
self._client_config = client_config
self._fetch_names = fetch_list self._fetch_names = fetch_list
self._client_config = client_config
if timeout > 0: self._timeout = timeout
self._timeout = timeout / 1000.0
else:
self._timeout = -1
self._retry = max(1, retry) self._retry = max(1, retry)
self._batch_size = batch_size
self._auto_batching_timeout = auto_batching_timeout
self._input = None self._input = None
self._outputs = [] self._outputs = []
self._batch_size = batch_size self._server_use_profile = False
self._auto_batching_timeout = auto_batching_timeout self._tracer = None
if self._auto_batching_timeout is not None:
if self._auto_batching_timeout <= 0 or self._batch_size == 1: # only for thread op
_LOGGER.warning( self._for_init_op_lock = threading.Lock()
self._log( self._for_close_op_lock = threading.Lock()
"Because auto_batching_timeout <= 0 or batch_size == 1," self._succ_init_op = False
" set auto_batching_timeout to None.")) self._succ_close_op = False
self._auto_batching_timeout = None
def init_from_dict(self, conf):
# init op
if self.concurrency is None:
self.concurrency = conf["concurrency"]
if self._retry is None:
self._retry = conf["retry"]
if self._fetch_names is None:
self._fetch_names = conf.get("fetch_list")
if self._client_config is None:
self._client_config = conf.get("client_config")
if self._timeout is None:
self._timeout = conf["timeout"]
if self._timeout > 0:
self._timeout = self._timeout / 1000.0
else:
self._timeout = -1
if self._batch_size is None:
self._batch_size = conf["batch_size"]
if self._auto_batching_timeout is None:
self._auto_batching_timeout = conf["auto_batching_timeout"]
if self._auto_batching_timeout <= 0 or self._batch_size == 1:
_LOGGER.warning(
self._log(
"Because auto_batching_timeout <= 0 or batch_size == 1,"
" set auto_batching_timeout to None."))
self._auto_batching_timeout = None
else:
self._auto_batching_timeout = self._auto_batching_timeout / 1000.0
if self._server_endpoints is None:
server_endpoints = conf.get("server_endpoints", [])
if len(server_endpoints) != 0:
# remote service
self.with_serving = True
self._server_endpoints = server_endpoints
else: else:
self._auto_batching_timeout = self._auto_batching_timeout / 1000.0 if self._local_rpc_service_handler is None:
local_service_conf = conf.get("local_service_conf")
_LOGGER.info("local_service_conf: {}".format(
local_service_conf))
model_config = local_service_conf.get("model_config")
_LOGGER.info("model_config: {}".format(model_config))
if model_config is None:
self.with_serving = False
else:
# local rpc service
self.with_serving = True
service_handler = local_rpc_service_handler.LocalRpcServiceHandler(
model_config=model_config,
workdir=local_service_conf["workdir"],
thread_num=local_service_conf["thread_num"],
devices=local_service_conf["devices"],
mem_optim=local_service_conf["mem_optim"],
ir_optim=local_service_conf["ir_optim"])
service_handler.prepare_server() # get fetch_list
                        service_ports = service_handler.get_port_list()
                        self._server_endpoints = [
                            "127.0.0.1:{}".format(p) for p in service_ports
                        ]
if self._client_config is None:
self._client_config = service_handler.get_client_config(
)
if self._fetch_names is None:
self._fetch_names = service_handler.get_fetch_list()
self._local_rpc_service_handler = service_handler
else:
self.with_serving = True
self._local_rpc_service_handler.prepare_server(
) # get fetch_list
                    service_ports = self._local_rpc_service_handler.get_port_list(
                    )
                    self._server_endpoints = [
                        "127.0.0.1:{}".format(p) for p in service_ports
                    ]
if self._client_config is None:
self._client_config = self._local_rpc_service_handler.get_client_config(
)
if self._fetch_names is None:
self._fetch_names = self._local_rpc_service_handler.get_fetch_list(
)
else:
self.with_serving = True
if not isinstance(self, RequestOp) and not isinstance(self, ResponseOp): if not isinstance(self, RequestOp) and not isinstance(self, ResponseOp):
_LOGGER.info( _LOGGER.info(
self._log("\n\tinput_ops: {}," self._log("\n\tinput_ops: {},"
...@@ -98,20 +181,22 @@ class Op(object): ...@@ -98,20 +181,22 @@ class Op(object):
"\n\tretry: {}," "\n\tretry: {},"
"\n\tbatch_size: {}," "\n\tbatch_size: {},"
"\n\tauto_batching_timeout(s): {}".format( "\n\tauto_batching_timeout(s): {}".format(
", ".join([op.name for op in input_ops ", ".join([op.name for op in self._input_ops
]), self._server_endpoints, ]), self._server_endpoints,
self._fetch_names, self._client_config, self._fetch_names, self._client_config,
self.concurrency, self._timeout, self._retry, self.concurrency, self._timeout, self._retry,
self._batch_size, self._auto_batching_timeout))) self._batch_size, self._auto_batching_timeout)))
self._server_use_profile = False def launch_local_rpc_service(self):
self._tracer = None if self._local_rpc_service_handler is None:
_LOGGER.warning(
# only for thread op self._log("Failed to launch local rpc"
self._for_init_op_lock = threading.Lock() " service: local_rpc_service_handler is None."))
self._for_close_op_lock = threading.Lock() return
self._succ_init_op = False port = self._local_rpc_service_handler.get_port_list()
self._succ_close_op = False self._local_rpc_service_handler.start_server()
_LOGGER.info("Op({}) use local rpc service at port: {}"
.format(self.name, port))
def use_default_auto_batching_config(self): def use_default_auto_batching_config(self):
if self._batch_size != 1: if self._batch_size != 1:
...@@ -775,7 +860,9 @@ class RequestOp(Op): ...@@ -775,7 +860,9 @@ class RequestOp(Op):
for idx, key in enumerate(request.key): for idx, key in enumerate(request.key):
data = request.value[idx] data = request.value[idx]
try: try:
data = eval(data) evaled_data = eval(data)
if isinstance(evaled_data, np.ndarray):
data = evaled_data
except Exception as e: except Exception as e:
pass pass
dictdata[key] = data dictdata[key] = data
......
...@@ -42,11 +42,12 @@ class PipelineClient(object): ...@@ -42,11 +42,12 @@ class PipelineClient(object):
def _pack_request_package(self, feed_dict, profile): def _pack_request_package(self, feed_dict, profile):
req = pipeline_service_pb2.Request() req = pipeline_service_pb2.Request()
np.set_printoptions(threshold=sys.maxsize)
for key, value in feed_dict.items(): for key, value in feed_dict.items():
req.key.append(key) req.key.append(key)
if isinstance(value, np.ndarray): if isinstance(value, np.ndarray):
req.value.append(value.__repr__()) req.value.append(value.__repr__())
elif isinstance(value, str): elif isinstance(value, (str, unicode)):
req.value.append(value) req.value.append(value)
elif isinstance(value, list): elif isinstance(value, list):
req.value.append(np.array(value).__repr__()) req.value.append(np.array(value).__repr__())
...@@ -75,7 +76,9 @@ class PipelineClient(object): ...@@ -75,7 +76,9 @@ class PipelineClient(object):
continue continue
data = resp.value[idx] data = resp.value[idx]
try: try:
data = eval(data) evaled_data = eval(data)
if isinstance(evaled_data, np.ndarray):
data = evaled_data
except Exception as e: except Exception as e:
pass pass
fetch_map[key] = data fetch_map[key] = data
......
...@@ -22,22 +22,31 @@ from contextlib import closing ...@@ -22,22 +22,31 @@ from contextlib import closing
import multiprocessing import multiprocessing
import yaml import yaml
from .proto import pipeline_service_pb2_grpc from .proto import pipeline_service_pb2_grpc, pipeline_service_pb2
from .operator import ResponseOp from . import operator
from .dag import DAGExecutor from . import dag
from . import util
from . import channel
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer): class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
def __init__(self, response_op, dag_conf, worker_idx=-1): def __init__(self, name, response_op, dag_conf, worker_idx=-1):
super(PipelineServicer, self).__init__() super(PipelineServicer, self).__init__()
self._name = name
# init dag executor # init dag executor
self._dag_executor = DAGExecutor(response_op, dag_conf, worker_idx) self._dag_executor = dag.DAGExecutor(response_op, dag_conf, worker_idx)
self._dag_executor.start() self._dag_executor.start()
_LOGGER.info("[PipelineServicer] succ init") _LOGGER.info("[PipelineServicer] succ init")
def inference(self, request, context): def inference(self, request, context):
if request.name != "" and request.name != self._name:
resp = pipeline_service_pb2.Response()
resp.ecode = channel.ChannelDataEcode.NO_SERVICE.value
resp.error_info = "Failed to inference: Service name error."
return resp
resp = self._dag_executor.call(request) resp = self._dag_executor.call(request)
return resp return resp
...@@ -57,35 +66,83 @@ def _reserve_port(port): ...@@ -57,35 +66,83 @@ def _reserve_port(port):
class PipelineServer(object): class PipelineServer(object):
def __init__(self): def __init__(self, name=None):
self._port = None self._name = name # for grpc-gateway path
self._rpc_port = None
self._worker_num = None self._worker_num = None
self._response_op = None self._response_op = None
self._proxy_server = None
def _grpc_gateway(self, grpc_port, http_port):
import os
from ctypes import cdll
from . import gateway
lib_path = os.path.join(
os.path.dirname(gateway.__file__), "libproxy_server.so")
proxy_server = cdll.LoadLibrary(lib_path)
proxy_server.run_proxy_server(grpc_port, http_port)
def _run_grpc_gateway(self, grpc_port, http_port):
if http_port <= 0:
_LOGGER.info("Ignore grpc_gateway configuration.")
return
if not util.AvailablePortGenerator.port_is_available(http_port):
raise SystemExit("Failed to run grpc-gateway: prot {} "
"is already used".format(http_port))
if self._proxy_server is not None:
raise RuntimeError("Proxy server has been started.")
self._proxy_server = multiprocessing.Process(
target=self._grpc_gateway, args=(
grpc_port,
http_port, ))
self._proxy_server.daemon = True
self._proxy_server.start()
def set_response_op(self, response_op): def set_response_op(self, response_op):
if not isinstance(response_op, ResponseOp): if not isinstance(response_op, operator.ResponseOp):
raise Exception("Failed to set response_op: response_op " raise Exception("Failed to set response_op: response_op "
"must be ResponseOp type.") "must be ResponseOp type.")
if len(response_op.get_input_ops()) != 1: if len(response_op.get_input_ops()) != 1:
raise Exception("Failed to set response_op: response_op " raise Exception("Failed to set response_op: response_op "
"can only have one previous op.") "can only have one previous op.")
self._response_op = response_op self._response_op = response_op
self._used_op, _ = dag.DAG.get_use_ops(self._response_op)
def prepare_server(self, yml_file=None, yml_dict=None):
conf = ServerYamlConfChecker.load_server_yaml_conf(
yml_file=yml_file, yml_dict=yml_dict)
self._rpc_port = conf.get("rpc_port")
self._http_port = conf.get("http_port")
if self._rpc_port is None:
if self._http_port is None:
raise SystemExit("Failed to prepare_server: rpc_port or "
"http_port can not be None.")
else:
# http mode: generate rpc_port
if not util.AvailablePortGenerator.port_is_available(
self._http_port):
raise SystemExit("Failed to prepare_server: http_port({}) "
"is already used".format(self._http_port))
self._rpc_port = util.GetAvailablePortGenerator().next()
else:
if not util.AvailablePortGenerator.port_is_available(
self._rpc_port):
raise SystemExit("Failed to prepare_server: prot {} "
"is already used".format(self._rpc_port))
if self._http_port is None:
# rpc mode
pass
else:
# http mode
if not util.AvailablePortGenerator.port_is_available(
self._http_port):
raise SystemExit("Failed to prepare_server: http_port({}) "
"is already used".format(self._http_port))
def _port_is_available(self, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
return result != 0
def prepare_server(self, yml_file):
conf = ServerYamlConfChecker.load_server_yaml_conf(yml_file)
self._port = conf["port"]
if not self._port_is_available(self._port):
raise SystemExit("Failed to prepare_server: prot {} "
"is already used".format(self._port))
self._worker_num = conf["worker_num"] self._worker_num = conf["worker_num"]
self._build_dag_each_worker = conf["build_dag_each_worker"] self._build_dag_each_worker = conf["build_dag_each_worker"]
self._init_ops(conf["op"])
_LOGGER.info("============= PIPELINE SERVER =============") _LOGGER.info("============= PIPELINE SERVER =============")
_LOGGER.info("\n{}".format( _LOGGER.info("\n{}".format(
...@@ -98,10 +155,40 @@ class PipelineServer(object): ...@@ -98,10 +155,40 @@ class PipelineServer(object):
_LOGGER.info("-------------------------------------------") _LOGGER.info("-------------------------------------------")
self._conf = conf self._conf = conf
self._start_local_rpc_service()
def _init_ops(self, op_conf):
default_conf = {
"concurrency": 1,
"timeout": -1,
"retry": 1,
"batch_size": 1,
"auto_batching_timeout": -1,
"local_service_conf": {
"workdir": "",
"thread_num": 2,
"devices": "",
"mem_optim": True,
"ir_optim": False,
},
}
for op in self._used_op:
if not isinstance(op, operator.RequestOp) and not isinstance(
op, operator.ResponseOp):
conf = op_conf.get(op.name, default_conf)
op.init_from_dict(conf)
def _start_local_rpc_service(self):
# only brpc now
if self._conf["dag"]["client_type"] != "brpc":
_LOGGER.warning("Local service version must be brpc type now.")
for op in self._used_op:
if not isinstance(op, operator.RequestOp):
op.launch_local_rpc_service()
def run_server(self): def run_server(self):
if self._build_dag_each_worker: if self._build_dag_each_worker:
with _reserve_port(self._port) as port: with _reserve_port(self._rpc_port) as port:
bind_address = 'localhost:{}'.format(port) bind_address = 'localhost:{}'.format(port)
workers = [] workers = []
for i in range(self._worker_num): for i in range(self._worker_num):
...@@ -111,6 +198,9 @@ class PipelineServer(object): ...@@ -111,6 +198,9 @@ class PipelineServer(object):
args=(bind_address, self._response_op, self._conf, i)) args=(bind_address, self._response_op, self._conf, i))
worker.start() worker.start()
workers.append(worker) workers.append(worker)
self._run_grpc_gateway(
grpc_port=self._rpc_port,
http_port=self._http_port) # start grpc_gateway
for worker in workers: for worker in workers:
worker.join() worker.join()
else: else:
...@@ -120,9 +210,13 @@ class PipelineServer(object): ...@@ -120,9 +210,13 @@ class PipelineServer(object):
('grpc.max_receive_message_length', 256 * 1024 * 1024) ('grpc.max_receive_message_length', 256 * 1024 * 1024)
]) ])
pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server( pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
PipelineServicer(self._response_op, self._conf), server) PipelineServicer(self._name, self._response_op, self._conf),
server.add_insecure_port('[::]:{}'.format(self._port)) server)
server.add_insecure_port('[::]:{}'.format(self._rpc_port))
server.start() server.start()
self._run_grpc_gateway(
grpc_port=self._rpc_port,
http_port=self._http_port) # start grpc_gateway
server.wait_for_termination() server.wait_for_termination()
def _run_server_func(self, bind_address, response_op, dag_conf, worker_idx): def _run_server_func(self, bind_address, response_op, dag_conf, worker_idx):
...@@ -133,7 +227,8 @@ class PipelineServer(object): ...@@ -133,7 +227,8 @@ class PipelineServer(object):
futures.ThreadPoolExecutor( futures.ThreadPoolExecutor(
max_workers=1, ), options=options) max_workers=1, ), options=options)
pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server( pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
PipelineServicer(response_op, dag_conf, worker_idx), server) PipelineServicer(self._name, response_op, dag_conf, worker_idx),
server)
server.add_insecure_port(bind_address) server.add_insecure_port(bind_address)
server.start() server.start()
server.wait_for_termination() server.wait_for_termination()
...@@ -144,12 +239,25 @@ class ServerYamlConfChecker(object): ...@@ -144,12 +239,25 @@ class ServerYamlConfChecker(object):
pass pass
@staticmethod @staticmethod
def load_server_yaml_conf(yml_file): def load_server_yaml_conf(yml_file=None, yml_dict=None):
with open(yml_file) as f: if yml_file is not None and yml_dict is not None:
conf = yaml.load(f.read()) raise SystemExit("Failed to prepare_server: only one of yml_file"
" or yml_dict can be selected as the parameter.")
if yml_file is not None:
with open(yml_file) as f:
conf = yaml.load(f.read())
elif yml_dict is not None:
conf = yml_dict
else:
raise SystemExit("Failed to prepare_server: yml_file or yml_dict"
" can not be None.")
ServerYamlConfChecker.check_server_conf(conf) ServerYamlConfChecker.check_server_conf(conf)
ServerYamlConfChecker.check_dag_conf(conf["dag"]) ServerYamlConfChecker.check_dag_conf(conf["dag"])
ServerYamlConfChecker.check_tracer_conf(conf["dag"]["tracer"]) ServerYamlConfChecker.check_tracer_conf(conf["dag"]["tracer"])
for op_name in conf["op"]:
ServerYamlConfChecker.check_op_conf(conf["op"][op_name])
ServerYamlConfChecker.check_local_service_conf(conf["op"][op_name][
"local_service_conf"])
return conf return conf
@staticmethod @staticmethod
...@@ -161,26 +269,80 @@ class ServerYamlConfChecker(object): ...@@ -161,26 +269,80 @@ class ServerYamlConfChecker(object):
@staticmethod @staticmethod
def check_server_conf(conf): def check_server_conf(conf):
default_conf = { default_conf = {
"port": 9292, # "rpc_port": 9292,
"worker_num": 1, "worker_num": 1,
"build_dag_each_worker": False, "build_dag_each_worker": False,
#"http_port": 0,
"dag": {}, "dag": {},
"op": {},
} }
conf_type = { conf_type = {
"port": int, "rpc_port": int,
"http_port": int,
"worker_num": int, "worker_num": int,
"build_dag_each_worker": bool, "build_dag_each_worker": bool,
"grpc_gateway_port": int,
} }
conf_qualification = { conf_qualification = {
"port": [(">=", 1024), ("<=", 65535)], "rpc_port": [(">=", 1024), ("<=", 65535)],
"http_port": [(">=", 1024), ("<=", 65535)],
"worker_num": (">=", 1), "worker_num": (">=", 1),
} }
ServerYamlConfChecker.check_conf(conf, default_conf, conf_type, ServerYamlConfChecker.check_conf(conf, default_conf, conf_type,
conf_qualification) conf_qualification)
@staticmethod
def check_local_service_conf(conf):
default_conf = {
"workdir": "",
"thread_num": 2,
"devices": "",
"mem_optim": True,
"ir_optim": False,
}
conf_type = {
"model_config": str,
"workdir": str,
"thread_num": int,
"devices": str,
"mem_optim": bool,
"ir_optim": bool,
}
conf_qualification = {"thread_num": (">=", 1), }
ServerYamlConfChecker.check_conf(conf, default_conf, conf_type,
conf_qualification)
@staticmethod
def check_op_conf(conf):
default_conf = {
"concurrency": 1,
"timeout": -1,
"retry": 1,
"batch_size": 1,
"auto_batching_timeout": -1,
"local_service_conf": {},
}
conf_type = {
"server_endpoints": list,
"fetch_list": list,
"client_config": str,
"concurrency": int,
"timeout": int,
"retry": int,
"batch_size": int,
"auto_batching_timeout": int,
}
conf_qualification = {
"concurrency": (">=", 1),
"retry": (">=", 1),
"batch_size": (">=", 1),
}
ServerYamlConfChecker.check_conf(conf, default_conf, conf_type,
conf_qualification)
@staticmethod @staticmethod
def check_tracer_conf(conf): def check_tracer_conf(conf):
default_conf = {"interval_s": -1, } default_conf = {"interval_s": -1, }
...@@ -231,6 +393,8 @@ class ServerYamlConfChecker(object): ...@@ -231,6 +393,8 @@ class ServerYamlConfChecker(object):
@staticmethod @staticmethod
def check_conf_type(conf, conf_type): def check_conf_type(conf, conf_type):
for key, val in conf_type.items(): for key, val in conf_type.items():
if key not in conf:
continue
if not isinstance(conf[key], val): if not isinstance(conf[key], val):
raise SystemExit("[CONF] {} must be {} type, but get {}." raise SystemExit("[CONF] {} must be {} type, but get {}."
.format(key, val, type(conf[key]))) .format(key, val, type(conf[key])))
...@@ -238,6 +402,8 @@ class ServerYamlConfChecker(object): ...@@ -238,6 +402,8 @@ class ServerYamlConfChecker(object):
@staticmethod @staticmethod
def check_conf_qualification(conf, conf_qualification): def check_conf_qualification(conf, conf_qualification):
for key, qualification in conf_qualification.items(): for key, qualification in conf_qualification.items():
if key not in conf:
continue
if not isinstance(qualification, list): if not isinstance(qualification, list):
qualification = [qualification] qualification = [qualification]
if not ServerYamlConfChecker.qualification_check(conf[key], if not ServerYamlConfChecker.qualification_check(conf[key],
......
...@@ -18,6 +18,7 @@ package baidu.paddle_serving.pipeline_serving; ...@@ -18,6 +18,7 @@ package baidu.paddle_serving.pipeline_serving;
message Request { message Request {
repeated string key = 1; repeated string key = 1;
repeated string value = 2; repeated string value = 2;
optional string name = 3;
}; };
message Response { message Response {
......
...@@ -17,6 +17,8 @@ import logging ...@@ -17,6 +17,8 @@ import logging
import threading import threading
import multiprocessing import multiprocessing
import multiprocessing.managers import multiprocessing.managers
from contextlib import closing
import socket
if sys.version_info.major == 2: if sys.version_info.major == 2:
import Queue import Queue
from Queue import PriorityQueue from Queue import PriorityQueue
...@@ -29,6 +31,34 @@ else: ...@@ -29,6 +31,34 @@ else:
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
class AvailablePortGenerator(object):
def __init__(self, start_port=12000):
self._curr_port = start_port
@staticmethod
def port_is_available(port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
if result != 0:
return True
else:
return False
def next(self):
while not AvailablePortGenerator.port_is_available(self._curr_port):
self._curr_port += 1
self._curr_port += 1
return self._curr_port - 1
_AvailablePortGenerator = AvailablePortGenerator()
def GetAvailablePortGenerator():
return _AvailablePortGenerator
class NameGenerator(object): class NameGenerator(object):
# use unsafe-id-generator # use unsafe-id-generator
def __init__(self, prefix): def __init__(self, prefix):
......
...@@ -16,7 +16,6 @@ from __future__ import absolute_import ...@@ -16,7 +16,6 @@ from __future__ import absolute_import
from __future__ import division from __future__ import division
from __future__ import print_function from __future__ import print_function
import platform
import os import os
from setuptools import setup, Distribution, Extension from setuptools import setup, Distribution, Extension
...@@ -24,18 +23,9 @@ from setuptools import find_packages ...@@ -24,18 +23,9 @@ from setuptools import find_packages
from setuptools import setup from setuptools import setup
from paddle_serving_app.version import serving_app_version from paddle_serving_app.version import serving_app_version
from pkg_resources import DistributionNotFound, get_distribution from pkg_resources import DistributionNotFound, get_distribution
import util
def python_version(): max_version, mid_version, min_version = util.python_version()
return [int(v) for v in platform.python_version().split(".")]
def find_package(pkgname):
try:
get_distribution(pkgname)
return True
except DistributionNotFound:
return False
max_version, mid_version, min_version = python_version()
if '${PACK}' == 'ON': if '${PACK}' == 'ON':
copy_lib() copy_lib()
......
@@ -16,7 +16,6 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
-import platform
 import os
 import sys
@@ -24,20 +23,10 @@ from setuptools import setup, Distribution, Extension
 from setuptools import find_packages
 from setuptools import setup
 from paddle_serving_client.version import serving_client_version
-from pkg_resources import DistributionNotFound, get_distribution
+import util
 py_version = sys.version_info
-def python_version():
-    return [int(v) for v in platform.python_version().split(".")]
-def find_package(pkgname):
-    try:
-        get_distribution(pkgname)
-        return True
-    except DistributionNotFound:
-        return False
 def copy_lib():
     if py_version[0] == 2:
         lib_list = ['libpython2.7.so.1.0', 'libssl.so.10', 'libcrypto.so.10']
@@ -51,18 +40,20 @@ def copy_lib():
         text = r.read()
         os.popen('cp {} ./paddle_serving_client/lib'.format(text.strip().split(' ')[1]))
-max_version, mid_version, min_version = python_version()
+max_version, mid_version, min_version = util.python_version()
+# gen pipeline proto code
+util.gen_pipeline_code("paddle_serving_client")
 if '${PACK}' == 'ON':
     copy_lib()
 REQUIRED_PACKAGES = [
     'six >= 1.10.0', 'protobuf >= 3.11.0', 'numpy >= 1.12', 'grpcio >= 1.28.1',
     'grpcio-tools >= 1.28.1'
 ]
-if not find_package("paddlepaddle") and not find_package("paddlepaddle-gpu"):
+if not util.find_package("paddlepaddle") and not util.find_package("paddlepaddle-gpu"):
     REQUIRED_PACKAGES.append("paddlepaddle")
@@ -72,8 +63,10 @@ packages=['paddle_serving_client',
           'paddle_serving_client.metric',
           'paddle_serving_client.utils',
           'paddle_serving_client.pipeline',
-          'paddle_serving_client.pipeline.proto']
-package_data={'paddle_serving_client': ['serving_client.so','lib/*'],}
+          'paddle_serving_client.pipeline.proto',
+          'paddle_serving_client.pipeline.gateway',
+          'paddle_serving_client.pipeline.gateway.proto']
+package_data={'paddle_serving_client': ['serving_client.so', 'lib/*', 'pipeline/gateway/libproxy_server.so'],}
 package_dir={'paddle_serving_client':
              '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client',
              'paddle_serving_client.proto':
@@ -87,7 +80,11 @@ package_dir={'paddle_serving_client':
             'paddle_serving_client.pipeline':
             '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline',
             'paddle_serving_client.pipeline.proto':
-            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline/proto'}
+            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline/proto',
+            'paddle_serving_client.pipeline.gateway':
+            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline/gateway',
+            'paddle_serving_client.pipeline.gateway.proto':
+            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline/gateway/proto'}
 setup(
     name='paddle-serving-client',
...
@@ -16,17 +16,14 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
-import platform
 from setuptools import setup, Distribution, Extension
 from setuptools import find_packages
 from setuptools import setup
 from paddle_serving.version import serving_client_version
 from grpc_tools import protoc
+import util
-def python_version():
-    return [int(v) for v in platform.python_version().split(".")]
-max_version, mid_version, min_version = python_version()
+max_version, mid_version, min_version = util.python_version()
 REQUIRED_PACKAGES = [
     'six >= 1.10.0', 'protobuf >= 3.1.0','paddlepaddle'
...
@@ -16,25 +16,16 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
-import platform
 from setuptools import setup, Distribution, Extension
 from setuptools import find_packages
 from setuptools import setup
 from paddle_serving_server.version import serving_server_version
-from pkg_resources import DistributionNotFound, get_distribution
+import util
-def find_package(pkgname):
-    try:
-        get_distribution(pkgname)
-        return True
-    except DistributionNotFound:
-        return False
-def python_version():
-    return [int(v) for v in platform.python_version().split(".")]
-max_version, mid_version, min_version = python_version()
+max_version, mid_version, min_version = util.python_version()
+# gen pipeline proto code
+util.gen_pipeline_code("paddle_serving_server")
 REQUIRED_PACKAGES = [
     'six >= 1.10.0', 'protobuf >= 3.11.0', 'grpcio >= 1.28.1', 'grpcio-tools >= 1.28.1',
@@ -44,7 +35,9 @@ REQUIRED_PACKAGES = [
 packages=['paddle_serving_server',
           'paddle_serving_server.proto',
           'paddle_serving_server.pipeline',
-          'paddle_serving_server.pipeline.proto']
+          'paddle_serving_server.pipeline.proto',
+          'paddle_serving_server.pipeline.gateway',
+          'paddle_serving_server.pipeline.gateway.proto']
 package_dir={'paddle_serving_server':
              '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server',
@@ -53,7 +46,13 @@ package_dir={'paddle_serving_server':
             'paddle_serving_server.pipeline':
             '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline',
             'paddle_serving_server.pipeline.proto':
-            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/proto'}
+            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/proto',
+            'paddle_serving_server.pipeline.gateway':
+            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/gateway',
+            'paddle_serving_server.pipeline.gateway.proto':
+            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/gateway/proto'}
+package_data={'paddle_serving_server': ['pipeline/gateway/libproxy_server.so'],}
 setup(
     name='paddle-serving-server',
@@ -65,6 +64,7 @@ setup(
     author_email='guru4elephant@gmail.com',
     install_requires=REQUIRED_PACKAGES,
     packages=packages,
+    package_data=package_data,
     package_dir=package_dir,
     # PyPI package information.
     classifiers=[
...
@@ -16,25 +16,16 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
-import platform
 from setuptools import setup, Distribution, Extension
 from setuptools import find_packages
 from setuptools import setup
 from paddle_serving_server_gpu.version import serving_server_version
-from pkg_resources import DistributionNotFound, get_distribution
+import util
-def find_package(pkgname):
-    try:
-        get_distribution(pkgname)
-        return True
-    except DistributionNotFound:
-        return False
-def python_version():
-    return [int(v) for v in platform.python_version().split(".")]
-max_version, mid_version, min_version = python_version()
+max_version, mid_version, min_version = util.python_version()
+# gen pipeline proto code
+util.gen_pipeline_code("paddle_serving_server_gpu")
 REQUIRED_PACKAGES = [
     'six >= 1.10.0', 'protobuf >= 3.11.0', 'grpcio >= 1.28.1', 'grpcio-tools >= 1.28.1',
@@ -44,7 +35,9 @@ REQUIRED_PACKAGES = [
 packages=['paddle_serving_server_gpu',
           'paddle_serving_server_gpu.proto',
          'paddle_serving_server_gpu.pipeline',
-          'paddle_serving_server_gpu.pipeline.proto']
+          'paddle_serving_server_gpu.pipeline.proto',
+          'paddle_serving_server_gpu.pipeline.gateway',
+          'paddle_serving_server_gpu.pipeline.gateway.proto']
 package_dir={'paddle_serving_server_gpu':
              '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu',
@@ -53,7 +46,13 @@ package_dir={'paddle_serving_server_gpu':
             'paddle_serving_server_gpu.pipeline':
             '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline',
             'paddle_serving_server_gpu.pipeline.proto':
-            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline/proto'}
+            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline/proto',
+            'paddle_serving_server_gpu.pipeline.gateway':
+            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline/gateway',
+            'paddle_serving_server_gpu.pipeline.gateway.proto':
+            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline/gateway/proto'}
+package_data={'paddle_serving_server_gpu': ['pipeline/gateway/libproxy_server.so'],}
 setup(
     name='paddle-serving-server-gpu',
@@ -65,6 +64,7 @@ setup(
     author_email='guru4elephant@gmail.com',
     install_requires=REQUIRED_PACKAGES,
     packages=packages,
+    package_data=package_data,
     package_dir=package_dir,
     # PyPI package information.
     classifiers=[
...
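Both server packages now bundle `pipeline/gateway/libproxy_server.so` via `package_data`. Since the library is built with Go's `-buildmode=c-shared` (see `util.py` below), it can be loaded like any C shared object; the following is a minimal smoke-test sketch, assuming a built and installed `paddle_serving_server` wheel — the path layout is inferred from the `package_data` entry above, not taken from the repo's own tests:

```python
# Sketch: verify the bundled Go c-shared gateway library is loadable.
# Assumes paddle_serving_server is installed with the packaged .so;
# ctypes.CDLL raises OSError if the file is missing or ABI-incompatible.
import ctypes
import os

import paddle_serving_server

lib_path = os.path.join(
    os.path.dirname(paddle_serving_server.__file__),
    "pipeline", "gateway", "libproxy_server.so")
ctypes.CDLL(lib_path)
print("loaded", lib_path)
```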
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pkg_resources import DistributionNotFound, get_distribution
from grpc_tools import protoc
import os
import platform
def python_version():
    return [int(v) for v in platform.python_version().split(".")]

def find_package(pkgname):
    try:
        get_distribution(pkgname)
        return True
    except DistributionNotFound:
        return False

def gen_pipeline_code(package_name):
    # pipeline service proto
    protoc.main((
        '',
        '-I.',
        '--python_out=.',
        '--grpc_python_out=.',
        '{}/pipeline/proto/pipeline_service.proto'.format(package_name), ))

    # pipeline grpc-gateway proto
    # *.pb.go
    ret = os.system(
        "cd {}/pipeline/gateway/proto/ && "
        "../../../../../third_party/install/protobuf/bin/protoc -I. "
        "-I$GOPATH/src "
        "-I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis "
        "--go_out=plugins=grpc:. "
        "gateway.proto".format(package_name))
    if ret != 0:
        exit(1)
    # *.gw.go
    ret = os.system(
        "cd {}/pipeline/gateway/proto/ && "
        "../../../../../third_party/install/protobuf/bin/protoc -I. "
        "-I$GOPATH/src "
        "-I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis "
        "--grpc-gateway_out=logtostderr=true:. "
        "gateway.proto".format(package_name))
    if ret != 0:
        exit(1)

    # pipeline grpc-gateway shared-lib
    ret = os.system(
        "cd {}/pipeline/gateway && "
        "go build -buildmode=c-shared -o libproxy_server.so proxy_server.go".
        format(package_name))
    if ret != 0:
        exit(1)
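For reference, `grpc_tools.protoc.main` accepts the same arguments as the `protoc` CLI. Below is a minimal standalone sketch of the first step in `gen_pipeline_code` above; `example/service.proto` is a hypothetical input path used for illustration, not a file in this repo:

```python
# Sketch: compile a .proto into Python message and gRPC stub modules,
# mirroring the protoc.main((...)) call in gen_pipeline_code above.
from grpc_tools import protoc

ret = protoc.main((
    '',                     # argv[0] placeholder, ignored by protoc
    '-I.',                  # import search path
    '--python_out=.',       # emits example/service_pb2.py (messages)
    '--grpc_python_out=.',  # emits example/service_pb2_grpc.py (stubs)
    'example/service.proto',
))
if ret != 0:
    raise SystemExit(ret)
```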
@@ -19,6 +19,13 @@ function init() {
     cd Serving
     export SERVING_WORKDIR=$PWD
     $PYTHONROOT/bin/python -m pip install -r python/requirements.txt
+    export GOPATH=$HOME/go
+    export PATH=$PATH:$GOPATH/bin
+    go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
+    go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger
+    go get -u github.com/golang/protobuf/protoc-gen-go
+    go get -u google.golang.org/grpc
 }
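The four `go get` lines above install the protoc plugins into `$GOPATH/bin`, which `init()` also prepends to `PATH`. A small sketch of a sanity check (a hypothetical helper, not part of the CI script) that the plugins are reachable before the proto-generation steps run:

```python
# Sketch: confirm the protoc plugins fetched by `go get` are on PATH.
import shutil

for plugin in ("protoc-gen-grpc-gateway", "protoc-gen-swagger", "protoc-gen-go"):
    if shutil.which(plugin) is None:
        raise SystemExit("missing protoc plugin: " + plugin)
print("all protoc plugins found")
```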
 function check_cmd() {
@@ -298,7 +305,6 @@ function python_test_bert() {
     cd bert # pwd: /Serving/python/examples/bert
     case $TYPE in
         CPU)
-            pip install paddlehub
             # Because download from paddlehub may timeout,
             # download the model from bos(max_seq_len=128).
             wget https://paddle-serving.bj.bcebos.com/paddle_hub_models/text/SemanticModel/bert_chinese_L-12_H-768_A-12.tar.gz
@@ -306,14 +312,12 @@ function python_test_bert() {
             sh get_data.sh
             check_cmd "python -m paddle_serving_server.serve --model bert_chinese_L-12_H-768_A-12_model --port 9292 &"
             sleep 5
-            pip install paddle_serving_app
             check_cmd "head -n 10 data-c.txt | python bert_client.py --model bert_chinese_L-12_H-768_A-12_client/serving_client_conf.prototxt"
             kill_server_process
             echo "bert RPC inference pass"
             ;;
         GPU)
             export CUDA_VISIBLE_DEVICES=0
-            pip install paddlehub
             # Because download from paddlehub may timeout,
             # download the model from bos(max_seq_len=128).
             wget https://paddle-serving.bj.bcebos.com/paddle_hub_models/text/SemanticModel/bert_chinese_L-12_H-768_A-12.tar.gz
@@ -321,7 +325,6 @@ function python_test_bert() {
             sh get_data.sh
             check_cmd "python -m paddle_serving_server_gpu.serve --model bert_chinese_L-12_H-768_A-12_model --port 9292 --gpu_ids 0 &"
             sleep 5
-            pip install paddle_serving_app
             check_cmd "head -n 10 data-c.txt | python bert_client.py --model bert_chinese_L-12_H-768_A-12_client/serving_client_conf.prototxt"
             kill_server_process
             echo "bert RPC inference pass"
@@ -760,13 +763,14 @@ function python_test_resnet50(){
 }
 function python_test_pipeline(){
-    # pwd:/ Serving/python/examples
+    # pwd: /Serving/python/examples
     local TYPE=$1
     export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
     unsetproxy
-    cd pipeline/imdb_model_ensemble
+    cd pipeline # pwd: /Serving/python/examples/pipeline
     case $TYPE in
         CPU)
+            cd imdb_model_ensemble # pwd: /Serving/python/examples/pipeline/imdb_model_ensemble
             # start paddle serving service (brpc)
             sh get_data.sh
             python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 --workdir test9292 &> cnn.log &
@@ -775,7 +779,7 @@ function python_test_pipeline(){
             # test: thread servicer & thread op
             cat << EOF > config.yml
-port: 18080
+rpc_port: 18080
 worker_num: 4
 build_dag_each_worker: false
 dag:
@@ -792,7 +796,7 @@ EOF
             # test: thread servicer & process op
             cat << EOF > config.yml
-port: 18080
+rpc_port: 18080
 worker_num: 4
 build_dag_each_worker: false
 dag:
@@ -809,7 +813,7 @@ EOF
             # test: process servicer & process op
             cat << EOF > config.yml
-port: 18080
+rpc_port: 18080
 worker_num: 4
 build_dag_each_worker: false
 dag:
@@ -828,7 +832,7 @@ EOF
             pip uninstall grpcio -y
             pip install grpcio --no-binary=grpcio
             cat << EOF > config.yml
-port: 18080
+rpc_port: 18080
 worker_num: 4
 build_dag_each_worker: true
 dag:
@@ -852,7 +856,7 @@ EOF
             python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 --use_multilang --workdir test9393 &> bow.log &
             sleep 5
             cat << EOF > config.yml
-port: 18080
+rpc_port: 18080
 worker_num: 4
 build_dag_each_worker: false
 dag:
@@ -869,16 +873,47 @@ EOF
             kill_server_process
             kill_process_by_port 9292
             kill_process_by_port 9393
+            cd ..
+            cd simple_web_service # pwd: /Serving/python/examples/pipeline/simple_web_service
+            sh get_data.sh
+            python web_service.py >/dev/null &
+            sleep 5
+            curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
+            # check http code
+            http_code=`curl -X POST -k -d '{"key":["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}' -s -w "%{http_code}" -o /dev/null http://localhost:18080/uci/prediction`
+            if [ ${http_code} -ne 200 ]; then
+                echo "HTTP status code -ne 200"
+                exit 1
+            fi
+            ps -ef | grep "web_service" | grep -v grep | awk '{print $2}' | xargs kill
+            ps -ef | grep "pipeline" | grep -v grep | awk '{print $2}' | xargs kill
+            kill_server_process
+            cd ..
             ;;
         GPU)
-            echo "pipeline ignore GPU test"
+            cd simple_web_service # pwd: /Serving/python/examples/pipeline/simple_web_service
+            sh get_data.sh
+            python web_service.py >/dev/null &
+            sleep 5
+            curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
+            # check http code
+            http_code=`curl -X POST -k -d '{"key":["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}' -s -w "%{http_code}" -o /dev/null http://localhost:18080/uci/prediction`
+            if [ ${http_code} -ne 200 ]; then
+                echo "HTTP status code -ne 200"
+                exit 1
+            fi
+            ps -ef | grep "web_service" | grep -v grep | awk '{print $2}' | xargs kill
+            ps -ef | grep "pipeline" | grep -v grep | awk '{print $2}' | xargs kill
+            kill_server_process
+            cd .. # pwd: /Serving/python/examples/pipeline
             ;;
         *)
             echo "error type"
             exit 1
             ;;
     esac
-    cd ../../
+    cd ..
     setproxy
     unset SERVING_BIN
 }
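The curl checks above can also be expressed in Python. A minimal sketch, assuming the `simple_web_service` example is already listening on port 18080; the endpoint and payload are copied verbatim from the script, and `requests` is an extra dependency not used by the CI itself:

```python
# Sketch: the same HTTP check the script performs with curl,
# assuming web_service.py from simple_web_service listens on :18080.
import json
import requests

payload = {
    "key": ["x"],
    "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, "
              "-0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"],
}
resp = requests.post("http://localhost:18080/uci/prediction",
                     data=json.dumps(payload))
assert resp.status_code == 200, "HTTP status code -ne 200"
print(resp.text)
```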
@@ -928,118 +963,8 @@ function monitor_test() {
     mkdir _monitor_test && cd _monitor_test # pwd: /Serving/_monitor_test
     case $TYPE in
         CPU):
-            pip install pyftpdlib
-            mkdir remote_path
-            mkdir local_path
-            cd remote_path # pwd: /Serving/_monitor_test/remote_path
-            check_cmd "python -m pyftpdlib -p 8000 &>/dev/null &"
-            cd .. # pwd: /Serving/_monitor_test
-            # type: ftp
-            # remote_path: /
-            # remote_model_name: uci_housing.tar.gz
-            # local_tmp_path: ___tmp
-            # local_path: local_path
-            cd remote_path # pwd: /Serving/_monitor_test/remote_path
-            wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
-            touch donefile
-            cd .. # pwd: /Serving/_monitor_test
-            mkdir -p local_path/uci_housing_model
-            python -m paddle_serving_server.monitor \
-                --type='ftp' --ftp_host='127.0.0.1' --ftp_port='8000' \
-                --remote_path='/' --remote_model_name='uci_housing.tar.gz' \
-                --remote_donefile_name='donefile' --local_path='local_path' \
-                --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
-                --local_tmp_path='___tmp' --unpacked_filename='uci_housing_model' \
-                --interval='1' >/dev/null &
-            sleep 10
-            if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
-                echo "local_path/uci_housing_model/fluid_time_file not exist."
-                exit 1
-            fi
-            ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
-            rm -rf remote_path/*
-            rm -rf local_path/*
-            # type: ftp
-            # remote_path: /tmp_dir
-            # remote_model_name: uci_housing_model
-            # local_tmp_path: ___tmp
-            # local_path: local_path
-            mkdir -p remote_path/tmp_dir && cd remote_path/tmp_dir # pwd: /Serving/_monitor_test/remote_path/tmp_dir
-            wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
-            tar -xzf uci_housing.tar.gz
-            touch donefile
-            cd ../.. # pwd: /Serving/_monitor_test
-            mkdir -p local_path/uci_housing_model
-            python -m paddle_serving_server.monitor \
-                --type='ftp' --ftp_host='127.0.0.1' --ftp_port='8000' \
-                --remote_path='/tmp_dir' --remote_model_name='uci_housing_model' \
-                --remote_donefile_name='donefile' --local_path='local_path' \
-                --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
-                --local_tmp_path='___tmp' --interval='1' >/dev/null &
-            sleep 10
-            if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
-                echo "local_path/uci_housing_model/fluid_time_file not exist."
-                exit 1
-            fi
-            ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
-            rm -rf remote_path/*
-            rm -rf local_path/*
-            # type: general
-            # remote_path: /
-            # remote_model_name: uci_housing.tar.gz
-            # local_tmp_path: ___tmp
-            # local_path: local_path
-            cd remote_path # pwd: /Serving/_monitor_test/remote_path
-            wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
-            touch donefile
-            cd .. # pwd: /Serving/_monitor_test
-            mkdir -p local_path/uci_housing_model
-            python -m paddle_serving_server.monitor \
-                --type='general' --general_host='ftp://127.0.0.1:8000' \
-                --remote_path='/' --remote_model_name='uci_housing.tar.gz' \
-                --remote_donefile_name='donefile' --local_path='local_path' \
-                --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
-                --local_tmp_path='___tmp' --unpacked_filename='uci_housing_model' \
-                --interval='1' >/dev/null &
-            sleep 10
-            if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
-                echo "local_path/uci_housing_model/fluid_time_file not exist."
-                exit 1
-            fi
-            ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
-            rm -rf remote_path/*
-            rm -rf local_path/*
-            # type: general
-            # remote_path: /tmp_dir
-            # remote_model_name: uci_housing_model
-            # local_tmp_path: ___tmp
-            # local_path: local_path
-            mkdir -p remote_path/tmp_dir && cd remote_path/tmp_dir # pwd: /Serving/_monitor_test/remote_path/tmp_dir
-            wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
-            tar -xzf uci_housing.tar.gz
-            touch donefile
-            cd ../.. # pwd: /Serving/_monitor_test
-            mkdir -p local_path/uci_housing_model
-            python -m paddle_serving_server.monitor \
-                --type='general' --general_host='ftp://127.0.0.1:8000' \
-                --remote_path='/tmp_dir' --remote_model_name='uci_housing_model' \
-                --remote_donefile_name='donefile' --local_path='local_path' \
-                --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
-                --local_tmp_path='___tmp' --interval='1' >/dev/null &
-            sleep 10
-            if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
-                echo "local_path/uci_housing_model/fluid_time_file not exist."
-                exit 1
-            fi
-            ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
-            rm -rf remote_path/*
-            rm -rf local_path/*
-            ps -ef | grep "pyftpdlib" | grep -v grep | awk '{print $2}' | xargs kill
+            # The CPU part and GPU part are identical.
+            # In order to avoid Travis CI timeout (50 min), the CPU version is not checked
             ;;
         GPU):
             pip install pyftpdlib
...