提交 52f1d9f2 编写于 作者: M MRXLT 提交者: GitHub

Merge pull request #801 from barrierye/update-pipeline-doc

add PipelineWebService
...@@ -61,7 +61,22 @@ pip install -r python/requirements.txt ...@@ -61,7 +61,22 @@ pip install -r python/requirements.txt
If Python3 is used, replace `pip` with `pip3`. If Python3 is used, replace `pip` with `pip3`.
## GOPATH Setting
The default GOPATH is `$HOME/go`, which you can set to other values.
```shell
export GOPATH=$HOME/go
export PATH=$PATH:$GOPATH/bin
```
## Get go packages
```shell
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger
go get -u github.com/golang/protobuf/protoc-gen-go
go get -u google.golang.org/grpc
```
## Compile Server ## Compile Server
......
...@@ -61,6 +61,22 @@ pip install -r python/requirements.txt ...@@ -61,6 +61,22 @@ pip install -r python/requirements.txt
如果使用 Python3,请以 `pip3` 替换 `pip` 如果使用 Python3,请以 `pip3` 替换 `pip`
## GOPATH 设置
默认 GOPATH 设置为 `$HOME/go`,您也可以设置为其他值。
```shell
export GOPATH=$HOME/go
export PATH=$PATH:$GOPATH/bin
```
## 获取 Go packages
```shell
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger
go get -u github.com/golang/protobuf/protoc-gen-go
go get -u google.golang.org/grpc
```
## 编译Server部分 ## 编译Server部分
......
...@@ -251,9 +251,10 @@ server.run_server() ...@@ -251,9 +251,10 @@ server.run_server()
Where `response_op` is the responseop mentioned above, PipelineServer will initialize Channels according to the topology relationship of each OP and build the calculation graph. `config_yml_path` is the configuration file of PipelineServer. The example file is as follows: Where `response_op` is the responseop mentioned above, PipelineServer will initialize Channels according to the topology relationship of each OP and build the calculation graph. `config_yml_path` is the configuration file of PipelineServer. The example file is as follows:
```yaml ```yaml
port: 18080 # gRPC port rpc_port: 18080 # gRPC port
worker_num: 1 # gRPC thread pool size (the number of processes in the process version servicer). The default is 1 worker_num: 1 # gRPC thread pool size (the number of processes in the process version servicer). The default is 1
build_dag_each_worker: false # Whether to use process server or not. The default is false build_dag_each_worker: false # Whether to use process server or not. The default is false
http_port: 0 # HTTP service port. Do not start HTTP service when the value is less or equals 0. The default value is 0.
dag: dag:
is_thread_op: true # Whether to use the thread version of OP. The default is true is_thread_op: true # Whether to use the thread version of OP. The default is true
client_type: brpc # Use brpc or grpc client. The default is brpc client_type: brpc # Use brpc or grpc client. The default is brpc
...@@ -285,6 +286,8 @@ python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 &> cnn. ...@@ -285,6 +286,8 @@ python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 &> cnn.
python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.log & python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.log &
``` ```
PipelineServing also supports local automatic startup of PaddleServingService. Please refer to the example `python/examples/pipeline/ocr`.
### Start PipelineServer ### Start PipelineServer
Run the following code Run the following code
...@@ -384,7 +387,7 @@ for f in futures: ...@@ -384,7 +387,7 @@ for f in futures:
## How to optimize through the timeline tool ## How to optimize with the timeline tool
In order to better optimize the performance, PipelineServing provides a timeline tool to monitor the time of each stage of the whole service. In order to better optimize the performance, PipelineServing provides a timeline tool to monitor the time of each stage of the whole service.
......
...@@ -249,9 +249,10 @@ server.run_server() ...@@ -249,9 +249,10 @@ server.run_server()
其中,`response_op` 为上面提到的 ResponseOp,PipelineServer 将会根据各个 OP 的拓扑关系初始化 Channel 并构建计算图。`config_yml_path` 为 PipelineServer 的配置文件,示例文件如下: 其中,`response_op` 为上面提到的 ResponseOp,PipelineServer 将会根据各个 OP 的拓扑关系初始化 Channel 并构建计算图。`config_yml_path` 为 PipelineServer 的配置文件,示例文件如下:
```yaml ```yaml
port: 18080 # gRPC端口号 rpc_port: 18080 # gRPC端口号
worker_num: 1 # gRPC线程池大小(进程版 Servicer 中为进程数),默认为 1 worker_num: 1 # gRPC线程池大小(进程版 Servicer 中为进程数),默认为 1
build_dag_each_worker: false # 是否使用进程版 Servicer,默认为 false build_dag_each_worker: false # 是否使用进程版 Servicer,默认为 false
http_port: 0 # HTTP 服务的端口号,若该值小于或等于 0 则不开启 HTTP 服务,默认为 0
dag: dag:
is_thread_op: true # 是否使用线程版Op,默认为 true is_thread_op: true # 是否使用线程版Op,默认为 true
client_type: brpc # 使用 brpc 或 grpc client,默认为 brpc client_type: brpc # 使用 brpc 或 grpc client,默认为 brpc
...@@ -283,6 +284,8 @@ python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 &> cnn. ...@@ -283,6 +284,8 @@ python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 &> cnn.
python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.log & python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.log &
``` ```
PipelineServing 也支持本地自动启动 PaddleServingService,请参考 `python/examples/pipeline/ocr` 下的例子。
### 启动 PipelineServer ### 启动 PipelineServer
运行下面代码 运行下面代码
......
if (CLIENT) if (CLIENT)
file(INSTALL pipeline DESTINATION paddle_serving_client) file(INSTALL pipeline DESTINATION paddle_serving_client)
execute_process(COMMAND ${PYTHON_EXECUTABLE} run_codegen.py
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/paddle_serving_client/pipeline/proto)
file(GLOB_RECURSE SERVING_CLIENT_PY_FILES paddle_serving_client/*.py) file(GLOB_RECURSE SERVING_CLIENT_PY_FILES paddle_serving_client/*.py)
set(PY_FILES ${SERVING_CLIENT_PY_FILES}) set(PY_FILES ${SERVING_CLIENT_PY_FILES})
SET(PACKAGE_NAME "serving_client") SET(PACKAGE_NAME "serving_client")
...@@ -11,13 +9,9 @@ endif() ...@@ -11,13 +9,9 @@ endif()
if (SERVER) if (SERVER)
if (NOT WITH_GPU) if (NOT WITH_GPU)
file(INSTALL pipeline DESTINATION paddle_serving_server) file(INSTALL pipeline DESTINATION paddle_serving_server)
execute_process(COMMAND ${PYTHON_EXECUTABLE} run_codegen.py
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/paddle_serving_server/pipeline/proto)
file(GLOB_RECURSE SERVING_SERVER_PY_FILES paddle_serving_server/*.py) file(GLOB_RECURSE SERVING_SERVER_PY_FILES paddle_serving_server/*.py)
else() else()
file(INSTALL pipeline DESTINATION paddle_serving_server_gpu) file(INSTALL pipeline DESTINATION paddle_serving_server_gpu)
execute_process(COMMAND ${PYTHON_EXECUTABLE} run_codegen.py
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/paddle_serving_server_gpu/pipeline/proto)
file(GLOB_RECURSE SERVING_SERVER_PY_FILES paddle_serving_server_gpu/*.py) file(GLOB_RECURSE SERVING_SERVER_PY_FILES paddle_serving_server_gpu/*.py)
endif() endif()
set(PY_FILES ${SERVING_SERVER_PY_FILES}) set(PY_FILES ${SERVING_SERVER_PY_FILES})
...@@ -25,6 +19,8 @@ if (SERVER) ...@@ -25,6 +19,8 @@ if (SERVER)
set(SETUP_LOG_FILE "setup.py.server.log") set(SETUP_LOG_FILE "setup.py.server.log")
endif() endif()
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/util.py
${CMAKE_CURRENT_BINARY_DIR}/util.py)
if (CLIENT) if (CLIENT)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.client.in configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.client.in
${CMAKE_CURRENT_BINARY_DIR}/setup.py) ${CMAKE_CURRENT_BINARY_DIR}/setup.py)
......
# IMDB model ensemble 样例
## 获取模型
```
sh get_data.sh
```
## 启动服务
```
python -m paddle_serving_server_gpu.serve --model imdb_cnn_model --port 9292 &> cnn.log &
python -m paddle_serving_server_gpu.serve --model imdb_bow_model --port 9393 &> bow.log &
python test_pipeline_server.py &>pipeline.log &
```
## 启动客户端
```
python test_pipeline_client.py
```
## HTTP 测试
```
curl -X POST -k http://localhost:9999/prediction -d '{"key": ["words"], "value": ["i am very sad | 0"]}'
```
port: 18080 rpc_port: 18085
worker_num: 4 worker_num: 4
build_dag_each_worker: false build_dag_each_worker: false
http_port: 9999
dag: dag:
is_thread_op: false is_thread_op: false
client_type: brpc client_type: brpc
......
# OCR Pipeline WebService
(English|[简体中文](./README_CN.md))
This document will take OCR as an example to show how to use Pipeline WebService to start multi-model tandem services.
## Get Model
```
python -m paddle_serving_app.package --get_model ocr_rec
tar -xzvf ocr_rec.tar.gz
python -m paddle_serving_app.package --get_model ocr_det
tar -xzvf ocr_det.tar.gz
```
## Get Dataset (Optional)
```
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/ocr/test_imgs.tar
tar xf test_imgs.tar
```
## Start Service
```
python web_service.py &>log.txt &
```
## Test
```
python pipeline_http_client.py
```
<!--
## More (PipelineServing)
You can choose one of the following versions to start Service.
### Remote Service Version
```
python -m paddle_serving_server_gpu.serve --model ocr_det_model --port 12000 --gpu_id 0 &> det.log &
python -m paddle_serving_server_gpu.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log &
python remote_service_pipeline_server.py &>pipeline.log &
```
### Local Service Version
```
python local_service_pipeline_server.py &>pipeline.log &
```
### Hybrid Service Version
```
python -m paddle_serving_server_gpu.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log &
python hybrid_service_pipeline_server.py &>pipeline.log &
```
## Client Prediction
### RPC
```
python pipeline_rpc_client.py
```
### HTTP
```
python pipeline_http_client.py
```
-->
# OCR Pipeline WebService
([English](./README.md)|简体中文)
本文档将以 OCR 为例,介绍如何使用 Pipeline WebService 启动多模型串联的服务。
## 获取模型
```
python -m paddle_serving_app.package --get_model ocr_rec
tar -xzvf ocr_rec.tar.gz
python -m paddle_serving_app.package --get_model ocr_det
tar -xzvf ocr_det.tar.gz
```
## 获取数据集(可选)
```
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/ocr/test_imgs.tar
tar xf test_imgs.tar
```
## 启动 WebService
```
python web_service.py &>log.txt &
```
## 测试
```
python pipeline_http_client.py
```
<!--
## 其他 (PipelineServing)
你可以选择下面任意一种版本启动服务。
### 远程服务版本
```
python -m paddle_serving_server.serve --model ocr_det_model --port 12000 --gpu_id 0 &> det.log &
python -m paddle_serving_server.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log &
python remote_service_pipeline_server.py &>pipeline.log &
```
### 本地服务版本
```
python local_service_pipeline_server.py &>pipeline.log &
```
### 混合服务版本
```
python -m paddle_serving_server_gpu.serve --model ocr_rec_model --port 12001 --gpu_id 0 &> rec.log &
python hybrid_service_pipeline_server.py &>pipeline.log &
```
## 启动客户端
### RPC
```
python pipeline_rpc_client.py
```
### HTTP
```
python pipeline_http_client.py
```
-->
rpc_port: 18080
worker_num: 4
build_dag_each_worker: false
http_port: 9999
dag:
is_thread_op: false
client_type: brpc
retry: 1
use_profile: false
op:
det:
concurrency: 2
local_service_conf:
model_config: ocr_det_model
devices: "0"
rec:
concurrency: 1
timeout: -1
retry: 1
local_service_conf:
model_config: ocr_rec_model
devices: "0"
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server_gpu.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server_gpu.pipeline import PipelineServer
from paddle_serving_server_gpu.pipeline.proto import pipeline_service_pb2
from paddle_serving_server_gpu.pipeline.channel import ChannelDataEcode
from paddle_serving_server_gpu.pipeline import LocalRpcServiceHandler
import numpy as np
import cv2
import time
import base64
import json
from paddle_serving_app.reader import OCRReader
from paddle_serving_app.reader import Sequential, ResizeByFactor
from paddle_serving_app.reader import Div, Normalize, Transpose
from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes
import time
import re
import base64
import logging
_LOGGER = logging.getLogger()
class DetOp(Op):
def init_op(self):
self.det_preprocess = Sequential([
ResizeByFactor(32, 960), Div(255),
Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose(
(2, 0, 1))
])
self.filter_func = FilterBoxes(10, 10)
self.post_func = DBPostProcess({
"thresh": 0.3,
"box_thresh": 0.5,
"max_candidates": 1000,
"unclip_ratio": 1.5,
"min_size": 3
})
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
data = base64.b64decode(input_dict["image"].encode('utf8'))
data = np.fromstring(data, np.uint8)
# Note: class variables(self.var) can only be used in process op mode
self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
self.ori_h, self.ori_w, _ = self.im.shape
det_img = self.det_preprocess(self.im)
_, self.new_h, self.new_w = det_img.shape
return {"image": det_img}
def postprocess(self, input_dicts, fetch_dict):
det_out = fetch_dict["concat_1.tmp_0"]
ratio_list = [
float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
]
dt_boxes_list = self.post_func(det_out, [ratio_list])
dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
out_dict = {"dt_boxes": dt_boxes, "image": self.im}
return out_dict
class RecOp(Op):
def init_op(self):
self.ocr_reader = OCRReader()
self.get_rotate_crop_image = GetRotateCropImage()
self.sorted_boxes = SortedBoxes()
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
im = input_dict["image"]
dt_boxes = input_dict["dt_boxes"]
dt_boxes = self.sorted_boxes(dt_boxes)
feed_list = []
img_list = []
max_wh_ratio = 0
for i, dtbox in enumerate(dt_boxes):
boximg = self.get_rotate_crop_image(im, dt_boxes[i])
img_list.append(boximg)
h, w = boximg.shape[0:2]
wh_ratio = w * 1.0 / h
max_wh_ratio = max(max_wh_ratio, wh_ratio)
for img in img_list:
norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
feed = {"image": norm_img}
feed_list.append(feed)
return feed_list
def postprocess(self, input_dicts, fetch_dict):
rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
res_lst = []
for res in rec_res:
res_lst.append(res[0])
res = {"res": str(res_lst)}
return res
read_op = RequestOp()
det_op = DetOp(
name="det",
input_ops=[read_op],
local_rpc_service_handler=LocalRpcServiceHandler(
model_config="ocr_det_model",
workdir="det_workdir", # defalut: "workdir"
thread_num=2, # defalut: 2
devices="0", # gpu0. defalut: "" (cpu)
mem_optim=True, # defalut: True
ir_optim=False, # defalut: False
available_port_generator=None), # defalut: None
concurrency=1)
rec_op = RecOp(
name="rec",
input_ops=[det_op],
server_endpoints=["127.0.0.1:12001"],
fetch_list=["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"],
client_config="ocr_rec_client/serving_client_conf.prototxt",
concurrency=1)
response_op = ResponseOp(input_ops=[rec_op])
server = PipelineServer("ocr")
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.run_server()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server_gpu.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server_gpu.pipeline import PipelineServer
from paddle_serving_server_gpu.pipeline.proto import pipeline_service_pb2
from paddle_serving_server_gpu.pipeline.channel import ChannelDataEcode
from paddle_serving_server_gpu.pipeline import LocalRpcServiceHandler
import numpy as np
import cv2
import time
import base64
import json
from paddle_serving_app.reader import OCRReader
from paddle_serving_app.reader import Sequential, ResizeByFactor
from paddle_serving_app.reader import Div, Normalize, Transpose
from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes
import time
import re
import base64
import logging
_LOGGER = logging.getLogger()
class DetOp(Op):
def init_op(self):
self.det_preprocess = Sequential([
ResizeByFactor(32, 960), Div(255),
Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose(
(2, 0, 1))
])
self.filter_func = FilterBoxes(10, 10)
self.post_func = DBPostProcess({
"thresh": 0.3,
"box_thresh": 0.5,
"max_candidates": 1000,
"unclip_ratio": 1.5,
"min_size": 3
})
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
data = base64.b64decode(input_dict["image"].encode('utf8'))
data = np.fromstring(data, np.uint8)
# Note: class variables(self.var) can only be used in process op mode
self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
self.ori_h, self.ori_w, _ = self.im.shape
det_img = self.det_preprocess(self.im)
_, self.new_h, self.new_w = det_img.shape
return {"image": det_img}
def postprocess(self, input_dicts, fetch_dict):
det_out = fetch_dict["concat_1.tmp_0"]
ratio_list = [
float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
]
dt_boxes_list = self.post_func(det_out, [ratio_list])
dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
out_dict = {"dt_boxes": dt_boxes, "image": self.im}
return out_dict
class RecOp(Op):
def init_op(self):
self.ocr_reader = OCRReader()
self.get_rotate_crop_image = GetRotateCropImage()
self.sorted_boxes = SortedBoxes()
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
im = input_dict["image"]
dt_boxes = input_dict["dt_boxes"]
dt_boxes = self.sorted_boxes(dt_boxes)
feed_list = []
img_list = []
max_wh_ratio = 0
for i, dtbox in enumerate(dt_boxes):
boximg = self.get_rotate_crop_image(im, dt_boxes[i])
img_list.append(boximg)
h, w = boximg.shape[0:2]
wh_ratio = w * 1.0 / h
max_wh_ratio = max(max_wh_ratio, wh_ratio)
for img in img_list:
norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
feed = {"image": norm_img}
feed_list.append(feed)
return feed_list
def postprocess(self, input_dicts, fetch_dict):
rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
res_lst = []
for res in rec_res:
res_lst.append(res[0])
res = {"res": str(res_lst)}
return res
read_op = RequestOp()
det_op = DetOp(
name="det",
input_ops=[read_op],
local_rpc_service_handler=LocalRpcServiceHandler(
model_config="ocr_det_model",
workdir="det_workdir", # defalut: "workdir"
thread_num=2, # defalut: 2
devices="0", # gpu0. defalut: "" (cpu)
mem_optim=True, # defalut: True
ir_optim=False, # defalut: False
available_port_generator=None), # defalut: None
concurrency=1)
rec_op = RecOp(
name="rec",
input_ops=[det_op],
local_rpc_service_handler=LocalRpcServiceHandler(
model_config="ocr_rec_model"),
concurrency=1)
response_op = ResponseOp(input_ops=[rec_op])
server = PipelineServer("ocr")
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.run_server()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_server_gpu.pipeline import PipelineClient
import numpy as np
import requests
import json
import cv2
import base64
import os
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
url = "http://127.0.0.1:9999/ocr/prediction"
test_img_dir = "imgs/"
for img_file in os.listdir(test_img_dir):
with open(os.path.join(test_img_dir, img_file), 'rb') as file:
image_data1 = file.read()
image = cv2_to_base64(image_data1)
for i in range(4):
data = {"key": ["image"], "value": [image]}
r = requests.post(url=url, data=json.dumps(data))
print(r.json())
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_server_gpu.pipeline import PipelineClient
import numpy as np
import requests
import json
import cv2
import base64
import os
client = PipelineClient()
client.connect(['127.0.0.1:18080'])
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
test_img_dir = "imgs/"
for img_file in os.listdir(test_img_dir):
with open(os.path.join(test_img_dir, img_file), 'rb') as file:
image_data = file.read()
image = cv2_to_base64(image_data)
for i in range(4):
ret = client.predict(feed_dict={"image": image}, fetch=["res"])
print(ret)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server_gpu.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server_gpu.pipeline import PipelineServer
from paddle_serving_server_gpu.pipeline.proto import pipeline_service_pb2
from paddle_serving_server_gpu.pipeline.channel import ChannelDataEcode
import numpy as np
import cv2
import time
import base64
import json
from paddle_serving_app.reader import OCRReader
from paddle_serving_app.reader import Sequential, ResizeByFactor
from paddle_serving_app.reader import Div, Normalize, Transpose
from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes
import time
import re
import base64
import logging
_LOGGER = logging.getLogger()
class DetOp(Op):
def init_op(self):
self.det_preprocess = Sequential([
ResizeByFactor(32, 960), Div(255),
Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose(
(2, 0, 1))
])
self.filter_func = FilterBoxes(10, 10)
self.post_func = DBPostProcess({
"thresh": 0.3,
"box_thresh": 0.5,
"max_candidates": 1000,
"unclip_ratio": 1.5,
"min_size": 3
})
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
data = base64.b64decode(input_dict["image"].encode('utf8'))
data = np.fromstring(data, np.uint8)
# Note: class variables(self.var) can only be used in process op mode
self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
self.ori_h, self.ori_w, _ = self.im.shape
det_img = self.det_preprocess(self.im)
_, self.new_h, self.new_w = det_img.shape
return {"image": det_img}
def postprocess(self, input_dicts, fetch_dict):
det_out = fetch_dict["concat_1.tmp_0"]
ratio_list = [
float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
]
dt_boxes_list = self.post_func(det_out, [ratio_list])
dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
out_dict = {"dt_boxes": dt_boxes, "image": self.im}
return out_dict
class RecOp(Op):
def init_op(self):
self.ocr_reader = OCRReader()
self.get_rotate_crop_image = GetRotateCropImage()
self.sorted_boxes = SortedBoxes()
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
im = input_dict["image"]
dt_boxes = input_dict["dt_boxes"]
dt_boxes = self.sorted_boxes(dt_boxes)
feed_list = []
img_list = []
max_wh_ratio = 0
for i, dtbox in enumerate(dt_boxes):
boximg = self.get_rotate_crop_image(im, dt_boxes[i])
img_list.append(boximg)
h, w = boximg.shape[0:2]
wh_ratio = w * 1.0 / h
max_wh_ratio = max(max_wh_ratio, wh_ratio)
for img in img_list:
norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
feed = {"image": norm_img}
feed_list.append(feed)
return feed_list
def postprocess(self, input_dicts, fetch_dict):
rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
res_lst = []
for res in rec_res:
res_lst.append(res[0])
res = {"res": str(res_lst)}
return res
read_op = RequestOp()
det_op = DetOp(
name="det",
input_ops=[read_op],
server_endpoints=["127.0.0.1:12000"],
fetch_list=["concat_1.tmp_0"],
client_config="ocr_det_client/serving_client_conf.prototxt",
concurrency=1)
rec_op = RecOp(
name="rec",
input_ops=[det_op],
server_endpoints=["127.0.0.1:12001"],
fetch_list=["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"],
client_config="ocr_rec_client/serving_client_conf.prototxt",
concurrency=1)
response_op = ResponseOp(input_ops=[rec_op])
server = PipelineServer("ocr")
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.run_server()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
from paddle_serving_server_gpu.web_service import WebService, Op
except ImportError:
from paddle_serving_server.web_service import WebService, Op
import logging
import numpy as np
import cv2
import base64
from paddle_serving_app.reader import OCRReader
from paddle_serving_app.reader import Sequential, ResizeByFactor
from paddle_serving_app.reader import Div, Normalize, Transpose
from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes
_LOGGER = logging.getLogger()
class DetOp(Op):
def init_op(self):
self.det_preprocess = Sequential([
ResizeByFactor(32, 960), Div(255),
Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose(
(2, 0, 1))
])
self.filter_func = FilterBoxes(10, 10)
self.post_func = DBPostProcess({
"thresh": 0.3,
"box_thresh": 0.5,
"max_candidates": 1000,
"unclip_ratio": 1.5,
"min_size": 3
})
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
data = base64.b64decode(input_dict["image"].encode('utf8'))
data = np.fromstring(data, np.uint8)
# Note: class variables(self.var) can only be used in process op mode
self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
self.ori_h, self.ori_w, _ = self.im.shape
det_img = self.det_preprocess(self.im)
_, self.new_h, self.new_w = det_img.shape
return {"image": det_img}
def postprocess(self, input_dicts, fetch_dict):
det_out = fetch_dict["concat_1.tmp_0"]
ratio_list = [
float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
]
dt_boxes_list = self.post_func(det_out, [ratio_list])
dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
out_dict = {"dt_boxes": dt_boxes, "image": self.im}
return out_dict
class RecOp(Op):
def init_op(self):
self.ocr_reader = OCRReader()
self.get_rotate_crop_image = GetRotateCropImage()
self.sorted_boxes = SortedBoxes()
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
im = input_dict["image"]
dt_boxes = input_dict["dt_boxes"]
dt_boxes = self.sorted_boxes(dt_boxes)
feed_list = []
img_list = []
max_wh_ratio = 0
for i, dtbox in enumerate(dt_boxes):
boximg = self.get_rotate_crop_image(im, dt_boxes[i])
img_list.append(boximg)
h, w = boximg.shape[0:2]
wh_ratio = w * 1.0 / h
max_wh_ratio = max(max_wh_ratio, wh_ratio)
for img in img_list:
norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
feed = {"image": norm_img}
feed_list.append(feed)
return feed_list
def postprocess(self, input_dicts, fetch_dict):
rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
res_lst = []
for res in rec_res:
res_lst.append(res[0])
res = {"res": str(res_lst)}
return res
class OcrService(WebService):
def get_pipeline_response(self, read_op):
det_op = DetOp(name="det", input_ops=[read_op])
rec_op = RecOp(name="rec", input_ops=[det_op])
return rec_op
uci_service = OcrService(name="ocr")
uci_service.prepare_pipeline_config("config.yml")
uci_service.run_service()
# Simple Pipeline WebService
This document will takes UCI service as an example to introduce how to use Pipeline WebService.
## Get model
```
sh get_data.sh
```
## Start server
```
python web_service.py &>log.txt &
```
## Http test
```
curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
```
# Simple Pipeline WebService
这里以 Uci 服务为例来介绍 Pipeline WebService 的使用。
## 获取模型
```
sh get_data.sh
```
## 启动服务
```
python web_service.py &>log.txt &
```
## 测试
```
curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
```
worker_num: 4
http_port: 18080
dag:
is_thread_op: false
op:
uci:
local_service_conf:
model_config: uci_housing_model
devices: "" # "0,1"
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
tar -xzf uci_housing.tar.gz
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
from paddle_serving_server_gpu.web_service import WebService, Op
except ImportError:
from paddle_serving_server.web_service import WebService, Op
import logging
import numpy as np
_LOGGER = logging.getLogger()
class UciOp(Op):
def init_op(self):
self.separator = ","
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
_LOGGER.info(input_dict)
x_value = input_dict["x"]
if isinstance(x_value, (str, unicode)):
input_dict["x"] = np.array(
[float(x.strip()) for x in x_value.split(self.separator)])
return input_dict
def postprocess(self, input_dicts, fetch_dict):
# _LOGGER.info(fetch_dict)
fetch_dict["price"] = str(fetch_dict["price"][0][0])
return fetch_dict
class UciService(WebService):
def get_pipeline_response(self, read_op):
uci_op = UciOp(name="uci", input_ops=[read_op])
return uci_op
uci_service = UciService(name="uci")
uci_service.prepare_pipeline_config("config.yml")
uci_service.run_service()
...@@ -161,6 +161,10 @@ class Server(object): ...@@ -161,6 +161,10 @@ class Server(object):
self.container_id = None self.container_id = None
self.model_config_paths = None # for multi-model in a workflow self.model_config_paths = None # for multi-model in a workflow
def get_fetch_list(self):
fetch_names = [var.alias_name for var in self.model_conf.fetch_var]
return fetch_names
def set_max_concurrency(self, concurrency): def set_max_concurrency(self, concurrency):
self.max_concurrency = concurrency self.max_concurrency = concurrency
......
...@@ -21,12 +21,36 @@ from paddle_serving_client import Client ...@@ -21,12 +21,36 @@ from paddle_serving_client import Client
from contextlib import closing from contextlib import closing
import socket import socket
from paddle_serving_server import pipeline
from paddle_serving_server.pipeline import Op
class WebService(object): class WebService(object):
def __init__(self, name="default_service"): def __init__(self, name="default_service"):
self.name = name self.name = name
# pipeline
self._server = pipeline.PipelineServer(self.name)
def get_pipeline_response(self, read_op):
return None
def prepare_pipeline_config(self, yaml_file):
# build dag
read_op = pipeline.RequestOp()
last_op = self.get_pipeline_response(read_op)
if not isinstance(last_op, Op):
raise ValueError("The return value type of `get_pipeline_response` "
"function is not Op type, please check function "
"`get_pipeline_response`.")
response_op = pipeline.ResponseOp(input_ops=[last_op])
self._server.set_response_op(response_op)
self._server.prepare_server(yaml_file)
def run_service(self):
self._server.run_server()
def load_model_config(self, model_config): def load_model_config(self, model_config):
print("This API will be deprecated later. Please do not use it")
self.model_config = model_config self.model_config = model_config
def _launch_rpc_service(self): def _launch_rpc_service(self):
...@@ -63,6 +87,7 @@ class WebService(object): ...@@ -63,6 +87,7 @@ class WebService(object):
device="cpu", device="cpu",
mem_optim=True, mem_optim=True,
ir_optim=False): ir_optim=False):
print("This API will be deprecated later. Please do not use it")
self.workdir = workdir self.workdir = workdir
self.port = port self.port = port
self.device = device self.device = device
...@@ -102,6 +127,7 @@ class WebService(object): ...@@ -102,6 +127,7 @@ class WebService(object):
return result return result
def run_rpc_service(self): def run_rpc_service(self):
print("This API will be deprecated later. Please do not use it")
import socket import socket
localIP = socket.gethostbyname(socket.gethostname()) localIP = socket.gethostbyname(socket.gethostname())
print("web service address:") print("web service address:")
...@@ -151,6 +177,7 @@ class WebService(object): ...@@ -151,6 +177,7 @@ class WebService(object):
"{}".format(self.model_config), gpu=False, profile=False) "{}".format(self.model_config), gpu=False, profile=False)
def run_web_service(self): def run_web_service(self):
print("This API will be deprecated later. Please do not use it")
self.app_instance.run(host="0.0.0.0", self.app_instance.run(host="0.0.0.0",
port=self.port, port=self.port,
threaded=False, threaded=False,
...@@ -160,9 +187,11 @@ class WebService(object): ...@@ -160,9 +187,11 @@ class WebService(object):
return self.app_instance return self.app_instance
def preprocess(self, feed=[], fetch=[]): def preprocess(self, feed=[], fetch=[]):
print("This API will be deprecated later. Please do not use it")
return feed, fetch return feed, fetch
def postprocess(self, feed=[], fetch=[], fetch_map=None): def postprocess(self, feed=[], fetch=[], fetch_map=None):
print("This API will be deprecated later. Please do not use it")
for key in fetch_map: for key in fetch_map:
fetch_map[key] = fetch_map[key].tolist() fetch_map[key] = fetch_map[key].tolist()
return fetch_map return fetch_map
...@@ -209,6 +209,10 @@ class Server(object): ...@@ -209,6 +209,10 @@ class Server(object):
self.product_name = None self.product_name = None
self.container_id = None self.container_id = None
def get_fetch_list(self):
fetch_names = [var.alias_name for var in self.model_conf.fetch_var]
return fetch_names
def set_max_concurrency(self, concurrency): def set_max_concurrency(self, concurrency):
self.max_concurrency = concurrency self.max_concurrency = concurrency
......
...@@ -24,17 +24,43 @@ import sys ...@@ -24,17 +24,43 @@ import sys
import numpy as np import numpy as np
import paddle_serving_server_gpu as serving import paddle_serving_server_gpu as serving
from paddle_serving_server_gpu import pipeline
from paddle_serving_server_gpu.pipeline import Op
class WebService(object): class WebService(object):
def __init__(self, name="default_service"): def __init__(self, name="default_service"):
self.name = name self.name = name
self.gpus = [] # pipeline
self.rpc_service_list = [] self._server = pipeline.PipelineServer(self.name)
self.gpus = [] # deprecated
self.rpc_service_list = [] # deprecated
def get_pipeline_response(self, read_op):
return None
def prepare_pipeline_config(self, yaml_file):
# build dag
read_op = pipeline.RequestOp()
last_op = self.get_pipeline_response(read_op)
if not isinstance(last_op, Op):
raise ValueError("The return value type of `get_pipeline_response` "
"function is not Op type, please check function "
"`get_pipeline_response`.")
response_op = pipeline.ResponseOp(input_ops=[last_op])
self._server.set_response_op(response_op)
self._server.prepare_server(yaml_file)
def run_service(self):
self._server.run_server()
def load_model_config(self, model_config): def load_model_config(self, model_config):
print("This API will be deprecated later. Please do not use it")
self.model_config = model_config self.model_config = model_config
def set_gpus(self, gpus): def set_gpus(self, gpus):
print("This API will be deprecated later. Please do not use it")
self.gpus = [int(x) for x in gpus.split(",")] self.gpus = [int(x) for x in gpus.split(",")]
def default_rpc_service(self, def default_rpc_service(self,
...@@ -88,6 +114,7 @@ class WebService(object): ...@@ -88,6 +114,7 @@ class WebService(object):
gpuid=0, gpuid=0,
mem_optim=True, mem_optim=True,
ir_optim=False): ir_optim=False):
print("This API will be deprecated later. Please do not use it")
self.workdir = workdir self.workdir = workdir
self.port = port self.port = port
self.device = device self.device = device
...@@ -155,6 +182,7 @@ class WebService(object): ...@@ -155,6 +182,7 @@ class WebService(object):
return result return result
def run_rpc_service(self): def run_rpc_service(self):
print("This API will be deprecated later. Please do not use it")
import socket import socket
localIP = socket.gethostbyname(socket.gethostname()) localIP = socket.gethostbyname(socket.gethostname())
print("web service address:") print("web service address:")
...@@ -183,6 +211,7 @@ class WebService(object): ...@@ -183,6 +211,7 @@ class WebService(object):
# TODO: maybe change another API name: maybe run_local_predictor? # TODO: maybe change another API name: maybe run_local_predictor?
def run_debugger_service(self, gpu=False): def run_debugger_service(self, gpu=False):
print("This API will be deprecated later. Please do not use it")
import socket import socket
localIP = socket.gethostbyname(socket.gethostname()) localIP = socket.gethostbyname(socket.gethostname())
print("web service address:") print("web service address:")
...@@ -209,18 +238,21 @@ class WebService(object): ...@@ -209,18 +238,21 @@ class WebService(object):
"{}".format(self.model_config), gpu=gpu, profile=False) "{}".format(self.model_config), gpu=gpu, profile=False)
def run_web_service(self): def run_web_service(self):
print("This API will be deprecated later. Please do not use it")
self.app_instance.run(host="0.0.0.0", self.app_instance.run(host="0.0.0.0",
port=self.port, port=self.port,
threaded=False, threaded=False,
processes=1) processes=4)
def get_app_instance(self): def get_app_instance(self):
return self.app_instance return self.app_instance
def preprocess(self, feed=[], fetch=[]): def preprocess(self, feed=[], fetch=[]):
print("This API will be deprecated later. Please do not use it")
return feed, fetch return feed, fetch
def postprocess(self, feed=[], fetch=[], fetch_map=None): def postprocess(self, feed=[], fetch=[], fetch_map=None):
for key in fetch_map.iterkeys(): print("This API will be deprecated later. Please do not use it")
for key in fetch_map:
fetch_map[key] = fetch_map[key].tolist() fetch_map[key] = fetch_map[key].tolist()
return fetch_map return fetch_map
...@@ -11,8 +11,9 @@ ...@@ -11,8 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logger # this module must be the first to import from . import logger # this module must be the first to import
from operator import Op, RequestOp, ResponseOp from .operator import Op, RequestOp, ResponseOp
from pipeline_server import PipelineServer from .pipeline_server import PipelineServer
from pipeline_client import PipelineClient from .pipeline_client import PipelineClient
from analyse import Analyst from .local_rpc_service_handler import LocalRpcServiceHandler
from .analyse import Analyst
...@@ -40,7 +40,8 @@ class ChannelDataEcode(enum.Enum): ...@@ -40,7 +40,8 @@ class ChannelDataEcode(enum.Enum):
RPC_PACKAGE_ERROR = 4 RPC_PACKAGE_ERROR = 4
CLIENT_ERROR = 5 CLIENT_ERROR = 5
CLOSED_ERROR = 6 CLOSED_ERROR = 6
UNKNOW = 7 NO_SERVICE = 7
UNKNOW = 8
class ChannelDataType(enum.Enum): class ChannelDataType(enum.Enum):
......
...@@ -299,13 +299,12 @@ class DAGExecutor(object): ...@@ -299,13 +299,12 @@ class DAGExecutor(object):
sys.stderr.write(profile_str) sys.stderr.write(profile_str)
# add profile info into rpc_resp # add profile info into rpc_resp
profile_value = ""
if resp_channeldata.client_need_profile: if resp_channeldata.client_need_profile:
profile_set = resp_channeldata.profile_data_set profile_set = resp_channeldata.profile_data_set
profile_set.add(profile_str) profile_set.add(profile_str)
profile_value = "".join(list(profile_set)) profile_value = "".join(list(profile_set))
rpc_resp.key.append(self._client_profile_key) rpc_resp.key.append(self._client_profile_key)
rpc_resp.value.append(profile_value) rpc_resp.value.append(profile_value)
return rpc_resp return rpc_resp
...@@ -338,7 +337,8 @@ class DAG(object): ...@@ -338,7 +337,8 @@ class DAG(object):
self._manager = PipelineProcSyncManager() self._manager = PipelineProcSyncManager()
_LOGGER.info("[DAG] Succ init") _LOGGER.info("[DAG] Succ init")
def get_use_ops(self, response_op): @staticmethod
def get_use_ops(response_op):
unique_names = set() unique_names = set()
used_ops = set() used_ops = set()
succ_ops_of_use_op = {} # {op_name: succ_ops} succ_ops_of_use_op = {} # {op_name: succ_ops}
...@@ -427,11 +427,11 @@ class DAG(object): ...@@ -427,11 +427,11 @@ class DAG(object):
_LOGGER.critical("Failed to build DAG: ResponseOp" _LOGGER.critical("Failed to build DAG: ResponseOp"
" has not been set.") " has not been set.")
os._exit(-1) os._exit(-1)
used_ops, out_degree_ops = self.get_use_ops(response_op) used_ops, out_degree_ops = DAG.get_use_ops(response_op)
if not self._build_dag_each_worker: if not self._build_dag_each_worker:
_LOGGER.info("================= USED OP =================") _LOGGER.info("================= USED OP =================")
for op in used_ops: for op in used_ops:
if op.name != self._request_name: if not isinstance(op, RequestOp):
_LOGGER.info(op.name) _LOGGER.info(op.name)
_LOGGER.info("-------------------------------------------") _LOGGER.info("-------------------------------------------")
if len(used_ops) <= 1: if len(used_ops) <= 1:
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package baidu.paddle_serving.pipeline_serving;
option go_package = ".;pipeline_serving";
import "google/api/annotations.proto";
message Response {
repeated string key = 1;
repeated string value = 2;
int32 ecode = 3;
string error_info = 4;
};
message Request {
repeated string key = 1;
repeated string value = 2;
string name = 3;
}
service PipelineService {
rpc inference(Request) returns (Response) {
option (google.api.http) = {
post : "/{name=*}/prediction"
body : "*"
};
}
};
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"C"
"flag"
"net/http"
"log"
"strconv"
"golang.org/x/net/context"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"
gw "./proto"
)
//export run_proxy_server
func run_proxy_server(grpc_port int, http_port int) error {
var (
pipelineEndpoint = flag.String("pipeline_endpoint", "localhost:" + strconv.Itoa(grpc_port), "endpoint of PipelineService")
)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
mux := runtime.NewServeMux()
opts := []grpc.DialOption{grpc.WithInsecure()}
err := gw.RegisterPipelineServiceHandlerFromEndpoint(ctx, mux, *pipelineEndpoint, opts)
if err != nil {
return err
}
log.Println("start proxy service")
return http.ListenAndServe(":" + strconv.Itoa(http_port), mux) // proxy port
}
func main() {}
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import logging
import multiprocessing
try:
from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
PACKAGE_VERSION = "GPU"
except ImportError:
from paddle_serving_server import OpMaker, OpSeqMaker, Server
PACKAGE_VERSION = "CPU"
from . import util
_LOGGER = logging.getLogger(__name__)
_workdir_name_gen = util.NameGenerator("workdir_")
class LocalRpcServiceHandler(object):
def __init__(self,
model_config,
workdir="",
thread_num=2,
devices="",
mem_optim=True,
ir_optim=False,
available_port_generator=None):
if available_port_generator is None:
available_port_generator = util.GetAvailablePortGenerator()
self._model_config = model_config
self._port_list = []
if devices == "":
# cpu
devices = [-1]
self._port_list.append(available_port_generator.next())
_LOGGER.info("Model({}) will be launch in cpu device. Port({})"
.format(model_config, self._port_list))
else:
# gpu
if PACKAGE_VERSION == "CPU":
raise ValueError(
"You are using the CPU version package("
"paddle-serving-server), unable to set devices")
devices = [int(x) for x in devices.split(",")]
for _ in devices:
self._port_list.append(available_port_generator.next())
_LOGGER.info("Model({}) will be launch in gpu device: {}. Port({})"
.format(model_config, devices, self._port_list))
self._workdir = workdir
self._devices = devices
self._thread_num = thread_num
self._mem_optim = mem_optim
self._ir_optim = ir_optim
self._rpc_service_list = []
self._server_pros = []
self._fetch_vars = None
def get_fetch_list(self):
return self._fetch_vars
def get_port_list(self):
return self._port_list
def get_client_config(self):
return os.path.join(self._model_config, "serving_server_conf.prototxt")
def _prepare_one_server(self, workdir, port, gpuid, thread_num, mem_optim,
ir_optim):
device = "gpu"
if gpuid == -1:
device = "cpu"
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer')
general_response_op = op_maker.create('general_response')
op_seq_maker = OpSeqMaker()
op_seq_maker.add_op(read_op)
op_seq_maker.add_op(general_infer_op)
op_seq_maker.add_op(general_response_op)
server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(thread_num)
server.set_memory_optimize(mem_optim)
server.set_ir_optimize(ir_optim)
server.load_model_config(self._model_config)
if gpuid >= 0:
server.set_gpuid(gpuid)
server.prepare_server(workdir=workdir, port=port, device=device)
if self._fetch_vars is None:
self._fetch_vars = server.get_fetch_list()
return server
def _start_one_server(self, service_idx):
self._rpc_service_list[service_idx].run_server()
def prepare_server(self):
for i, device_id in enumerate(self._devices):
if self._workdir != "":
workdir = "{}_{}".format(self._workdir, i)
else:
workdir = _workdir_name_gen.next()
self._rpc_service_list.append(
self._prepare_one_server(
workdir,
self._port_list[i],
device_id,
thread_num=self._thread_num,
mem_optim=self._mem_optim,
ir_optim=self._ir_optim))
def start_server(self):
for i, service in enumerate(self._rpc_service_list):
p = multiprocessing.Process(
target=self._start_one_server, args=(i, ))
p.daemon = True
self._server_pros.append(p)
for p in self._server_pros:
p.start()
...@@ -38,6 +38,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode, ...@@ -38,6 +38,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode,
ChannelTimeoutError) ChannelTimeoutError)
from .util import NameGenerator from .util import NameGenerator
from .profiler import UnsafeTimeProfiler as TimeProfiler from .profiler import UnsafeTimeProfiler as TimeProfiler
from . import local_rpc_service_handler
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
_op_name_gen = NameGenerator("Op") _op_name_gen = NameGenerator("Op")
...@@ -47,46 +48,128 @@ class Op(object): ...@@ -47,46 +48,128 @@ class Op(object):
def __init__(self, def __init__(self,
name=None, name=None,
input_ops=[], input_ops=[],
server_endpoints=[], server_endpoints=None,
fetch_list=[], fetch_list=None,
client_config=None, client_config=None,
concurrency=1, concurrency=None,
timeout=-1, timeout=None,
retry=1, retry=None,
batch_size=1, batch_size=None,
auto_batching_timeout=None): auto_batching_timeout=None,
local_rpc_service_handler=None):
# In __init__, all the parameters are just saved and Op is not initialized
if name is None: if name is None:
name = _op_name_gen.next() name = _op_name_gen.next()
self.name = name # to identify the type of OP, it must be globally unique self.name = name # to identify the type of OP, it must be globally unique
self.concurrency = concurrency # amount of concurrency self.concurrency = concurrency # amount of concurrency
self.set_input_ops(input_ops) self.set_input_ops(input_ops)
self._local_rpc_service_handler = local_rpc_service_handler
self._server_endpoints = server_endpoints self._server_endpoints = server_endpoints
self.with_serving = False
if len(self._server_endpoints) != 0:
self.with_serving = True
self._client_config = client_config
self._fetch_names = fetch_list self._fetch_names = fetch_list
self._client_config = client_config
if timeout > 0: self._timeout = timeout
self._timeout = timeout / 1000.0
else:
self._timeout = -1
self._retry = max(1, retry) self._retry = max(1, retry)
self._batch_size = batch_size
self._auto_batching_timeout = auto_batching_timeout
self._input = None self._input = None
self._outputs = [] self._outputs = []
self._batch_size = batch_size self._server_use_profile = False
self._auto_batching_timeout = auto_batching_timeout self._tracer = None
if self._auto_batching_timeout is not None:
if self._auto_batching_timeout <= 0 or self._batch_size == 1: # only for thread op
_LOGGER.warning( self._for_init_op_lock = threading.Lock()
self._log( self._for_close_op_lock = threading.Lock()
"Because auto_batching_timeout <= 0 or batch_size == 1," self._succ_init_op = False
" set auto_batching_timeout to None.")) self._succ_close_op = False
self._auto_batching_timeout = None
def init_from_dict(self, conf):
# init op
if self.concurrency is None:
self.concurrency = conf["concurrency"]
if self._retry is None:
self._retry = conf["retry"]
if self._fetch_names is None:
self._fetch_names = conf.get("fetch_list")
if self._client_config is None:
self._client_config = conf.get("client_config")
if self._timeout is None:
self._timeout = conf["timeout"]
if self._timeout > 0:
self._timeout = self._timeout / 1000.0
else:
self._timeout = -1
if self._batch_size is None:
self._batch_size = conf["batch_size"]
if self._auto_batching_timeout is None:
self._auto_batching_timeout = conf["auto_batching_timeout"]
if self._auto_batching_timeout <= 0 or self._batch_size == 1:
_LOGGER.warning(
self._log(
"Because auto_batching_timeout <= 0 or batch_size == 1,"
" set auto_batching_timeout to None."))
self._auto_batching_timeout = None
else:
self._auto_batching_timeout = self._auto_batching_timeout / 1000.0
if self._server_endpoints is None:
server_endpoints = conf.get("server_endpoints", [])
if len(server_endpoints) != 0:
# remote service
self.with_serving = True
self._server_endpoints = server_endpoints
else: else:
self._auto_batching_timeout = self._auto_batching_timeout / 1000.0 if self._local_rpc_service_handler is None:
local_service_conf = conf.get("local_service_conf")
_LOGGER.info("local_service_conf: {}".format(
local_service_conf))
model_config = local_service_conf.get("model_config")
_LOGGER.info("model_config: {}".format(model_config))
if model_config is None:
self.with_serving = False
else:
# local rpc service
self.with_serving = True
service_handler = local_rpc_service_handler.LocalRpcServiceHandler(
model_config=model_config,
workdir=local_service_conf["workdir"],
thread_num=local_service_conf["thread_num"],
devices=local_service_conf["devices"],
mem_optim=local_service_conf["mem_optim"],
ir_optim=local_service_conf["ir_optim"])
service_handler.prepare_server() # get fetch_list
serivce_ports = service_handler.get_port_list()
self._server_endpoints = [
"127.0.0.1:{}".format(p) for p in serivce_ports
]
if self._client_config is None:
self._client_config = service_handler.get_client_config(
)
if self._fetch_names is None:
self._fetch_names = service_handler.get_fetch_list()
self._local_rpc_service_handler = service_handler
else:
self.with_serving = True
self._local_rpc_service_handler.prepare_server(
) # get fetch_list
serivce_ports = self._local_rpc_service_handler.get_port_list(
)
self._server_endpoints = [
"127.0.0.1:{}".format(p) for p in serivce_ports
]
if self._client_config is None:
self._client_config = self._local_rpc_service_handler.get_client_config(
)
if self._fetch_names is None:
self._fetch_names = self._local_rpc_service_handler.get_fetch_list(
)
else:
self.with_serving = True
if not isinstance(self, RequestOp) and not isinstance(self, ResponseOp): if not isinstance(self, RequestOp) and not isinstance(self, ResponseOp):
_LOGGER.info( _LOGGER.info(
self._log("\n\tinput_ops: {}," self._log("\n\tinput_ops: {},"
...@@ -98,20 +181,22 @@ class Op(object): ...@@ -98,20 +181,22 @@ class Op(object):
"\n\tretry: {}," "\n\tretry: {},"
"\n\tbatch_size: {}," "\n\tbatch_size: {},"
"\n\tauto_batching_timeout(s): {}".format( "\n\tauto_batching_timeout(s): {}".format(
", ".join([op.name for op in input_ops ", ".join([op.name for op in self._input_ops
]), self._server_endpoints, ]), self._server_endpoints,
self._fetch_names, self._client_config, self._fetch_names, self._client_config,
self.concurrency, self._timeout, self._retry, self.concurrency, self._timeout, self._retry,
self._batch_size, self._auto_batching_timeout))) self._batch_size, self._auto_batching_timeout)))
self._server_use_profile = False def launch_local_rpc_service(self):
self._tracer = None if self._local_rpc_service_handler is None:
_LOGGER.warning(
# only for thread op self._log("Failed to launch local rpc"
self._for_init_op_lock = threading.Lock() " service: local_rpc_service_handler is None."))
self._for_close_op_lock = threading.Lock() return
self._succ_init_op = False port = self._local_rpc_service_handler.get_port_list()
self._succ_close_op = False self._local_rpc_service_handler.start_server()
_LOGGER.info("Op({}) use local rpc service at port: {}"
.format(self.name, port))
def use_default_auto_batching_config(self): def use_default_auto_batching_config(self):
if self._batch_size != 1: if self._batch_size != 1:
...@@ -775,7 +860,9 @@ class RequestOp(Op): ...@@ -775,7 +860,9 @@ class RequestOp(Op):
for idx, key in enumerate(request.key): for idx, key in enumerate(request.key):
data = request.value[idx] data = request.value[idx]
try: try:
data = eval(data) evaled_data = eval(data)
if isinstance(evaled_data, np.ndarray):
data = evaled_data
except Exception as e: except Exception as e:
pass pass
dictdata[key] = data dictdata[key] = data
......
...@@ -42,11 +42,12 @@ class PipelineClient(object): ...@@ -42,11 +42,12 @@ class PipelineClient(object):
def _pack_request_package(self, feed_dict, profile): def _pack_request_package(self, feed_dict, profile):
req = pipeline_service_pb2.Request() req = pipeline_service_pb2.Request()
np.set_printoptions(threshold=sys.maxsize)
for key, value in feed_dict.items(): for key, value in feed_dict.items():
req.key.append(key) req.key.append(key)
if isinstance(value, np.ndarray): if isinstance(value, np.ndarray):
req.value.append(value.__repr__()) req.value.append(value.__repr__())
elif isinstance(value, str): elif isinstance(value, (str, unicode)):
req.value.append(value) req.value.append(value)
elif isinstance(value, list): elif isinstance(value, list):
req.value.append(np.array(value).__repr__()) req.value.append(np.array(value).__repr__())
...@@ -75,7 +76,9 @@ class PipelineClient(object): ...@@ -75,7 +76,9 @@ class PipelineClient(object):
continue continue
data = resp.value[idx] data = resp.value[idx]
try: try:
data = eval(data) evaled_data = eval(data)
if isinstance(evaled_data, np.ndarray):
data = evaled_data
except Exception as e: except Exception as e:
pass pass
fetch_map[key] = data fetch_map[key] = data
......
...@@ -22,22 +22,31 @@ from contextlib import closing ...@@ -22,22 +22,31 @@ from contextlib import closing
import multiprocessing import multiprocessing
import yaml import yaml
from .proto import pipeline_service_pb2_grpc from .proto import pipeline_service_pb2_grpc, pipeline_service_pb2
from .operator import ResponseOp from . import operator
from .dag import DAGExecutor from . import dag
from . import util
from . import channel
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer): class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
def __init__(self, response_op, dag_conf, worker_idx=-1): def __init__(self, name, response_op, dag_conf, worker_idx=-1):
super(PipelineServicer, self).__init__() super(PipelineServicer, self).__init__()
self._name = name
# init dag executor # init dag executor
self._dag_executor = DAGExecutor(response_op, dag_conf, worker_idx) self._dag_executor = dag.DAGExecutor(response_op, dag_conf, worker_idx)
self._dag_executor.start() self._dag_executor.start()
_LOGGER.info("[PipelineServicer] succ init") _LOGGER.info("[PipelineServicer] succ init")
def inference(self, request, context): def inference(self, request, context):
if request.name != "" and request.name != self._name:
resp = pipeline_service_pb2.Response()
resp.ecode = channel.ChannelDataEcode.NO_SERVICE.value
resp.error_info = "Failed to inference: Service name error."
return resp
resp = self._dag_executor.call(request) resp = self._dag_executor.call(request)
return resp return resp
...@@ -57,35 +66,83 @@ def _reserve_port(port): ...@@ -57,35 +66,83 @@ def _reserve_port(port):
class PipelineServer(object): class PipelineServer(object):
def __init__(self): def __init__(self, name=None):
self._port = None self._name = name # for grpc-gateway path
self._rpc_port = None
self._worker_num = None self._worker_num = None
self._response_op = None self._response_op = None
self._proxy_server = None
def _grpc_gateway(self, grpc_port, http_port):
import os
from ctypes import cdll
from . import gateway
lib_path = os.path.join(
os.path.dirname(gateway.__file__), "libproxy_server.so")
proxy_server = cdll.LoadLibrary(lib_path)
proxy_server.run_proxy_server(grpc_port, http_port)
def _run_grpc_gateway(self, grpc_port, http_port):
if http_port <= 0:
_LOGGER.info("Ignore grpc_gateway configuration.")
return
if not util.AvailablePortGenerator.port_is_available(http_port):
raise SystemExit("Failed to run grpc-gateway: prot {} "
"is already used".format(http_port))
if self._proxy_server is not None:
raise RuntimeError("Proxy server has been started.")
self._proxy_server = multiprocessing.Process(
target=self._grpc_gateway, args=(
grpc_port,
http_port, ))
self._proxy_server.daemon = True
self._proxy_server.start()
def set_response_op(self, response_op): def set_response_op(self, response_op):
if not isinstance(response_op, ResponseOp): if not isinstance(response_op, operator.ResponseOp):
raise Exception("Failed to set response_op: response_op " raise Exception("Failed to set response_op: response_op "
"must be ResponseOp type.") "must be ResponseOp type.")
if len(response_op.get_input_ops()) != 1: if len(response_op.get_input_ops()) != 1:
raise Exception("Failed to set response_op: response_op " raise Exception("Failed to set response_op: response_op "
"can only have one previous op.") "can only have one previous op.")
self._response_op = response_op self._response_op = response_op
self._used_op, _ = dag.DAG.get_use_ops(self._response_op)
def prepare_server(self, yml_file=None, yml_dict=None):
conf = ServerYamlConfChecker.load_server_yaml_conf(
yml_file=yml_file, yml_dict=yml_dict)
self._rpc_port = conf.get("rpc_port")
self._http_port = conf.get("http_port")
if self._rpc_port is None:
if self._http_port is None:
raise SystemExit("Failed to prepare_server: rpc_port or "
"http_port can not be None.")
else:
# http mode: generate rpc_port
if not util.AvailablePortGenerator.port_is_available(
self._http_port):
raise SystemExit("Failed to prepare_server: http_port({}) "
"is already used".format(self._http_port))
self._rpc_port = util.GetAvailablePortGenerator().next()
else:
if not util.AvailablePortGenerator.port_is_available(
self._rpc_port):
raise SystemExit("Failed to prepare_server: prot {} "
"is already used".format(self._rpc_port))
if self._http_port is None:
# rpc mode
pass
else:
# http mode
if not util.AvailablePortGenerator.port_is_available(
self._http_port):
raise SystemExit("Failed to prepare_server: http_port({}) "
"is already used".format(self._http_port))
def _port_is_available(self, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
return result != 0
def prepare_server(self, yml_file):
conf = ServerYamlConfChecker.load_server_yaml_conf(yml_file)
self._port = conf["port"]
if not self._port_is_available(self._port):
raise SystemExit("Failed to prepare_server: prot {} "
"is already used".format(self._port))
self._worker_num = conf["worker_num"] self._worker_num = conf["worker_num"]
self._build_dag_each_worker = conf["build_dag_each_worker"] self._build_dag_each_worker = conf["build_dag_each_worker"]
self._init_ops(conf["op"])
_LOGGER.info("============= PIPELINE SERVER =============") _LOGGER.info("============= PIPELINE SERVER =============")
_LOGGER.info("\n{}".format( _LOGGER.info("\n{}".format(
...@@ -98,10 +155,40 @@ class PipelineServer(object): ...@@ -98,10 +155,40 @@ class PipelineServer(object):
_LOGGER.info("-------------------------------------------") _LOGGER.info("-------------------------------------------")
self._conf = conf self._conf = conf
self._start_local_rpc_service()
def _init_ops(self, op_conf):
default_conf = {
"concurrency": 1,
"timeout": -1,
"retry": 1,
"batch_size": 1,
"auto_batching_timeout": -1,
"local_service_conf": {
"workdir": "",
"thread_num": 2,
"devices": "",
"mem_optim": True,
"ir_optim": False,
},
}
for op in self._used_op:
if not isinstance(op, operator.RequestOp) and not isinstance(
op, operator.ResponseOp):
conf = op_conf.get(op.name, default_conf)
op.init_from_dict(conf)
def _start_local_rpc_service(self):
# only brpc now
if self._conf["dag"]["client_type"] != "brpc":
_LOGGER.warning("Local service version must be brpc type now.")
for op in self._used_op:
if not isinstance(op, operator.RequestOp):
op.launch_local_rpc_service()
def run_server(self): def run_server(self):
if self._build_dag_each_worker: if self._build_dag_each_worker:
with _reserve_port(self._port) as port: with _reserve_port(self._rpc_port) as port:
bind_address = 'localhost:{}'.format(port) bind_address = 'localhost:{}'.format(port)
workers = [] workers = []
for i in range(self._worker_num): for i in range(self._worker_num):
...@@ -111,6 +198,9 @@ class PipelineServer(object): ...@@ -111,6 +198,9 @@ class PipelineServer(object):
args=(bind_address, self._response_op, self._conf, i)) args=(bind_address, self._response_op, self._conf, i))
worker.start() worker.start()
workers.append(worker) workers.append(worker)
self._run_grpc_gateway(
grpc_port=self._rpc_port,
http_port=self._http_port) # start grpc_gateway
for worker in workers: for worker in workers:
worker.join() worker.join()
else: else:
...@@ -120,9 +210,13 @@ class PipelineServer(object): ...@@ -120,9 +210,13 @@ class PipelineServer(object):
('grpc.max_receive_message_length', 256 * 1024 * 1024) ('grpc.max_receive_message_length', 256 * 1024 * 1024)
]) ])
pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server( pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
PipelineServicer(self._response_op, self._conf), server) PipelineServicer(self._name, self._response_op, self._conf),
server.add_insecure_port('[::]:{}'.format(self._port)) server)
server.add_insecure_port('[::]:{}'.format(self._rpc_port))
server.start() server.start()
self._run_grpc_gateway(
grpc_port=self._rpc_port,
http_port=self._http_port) # start grpc_gateway
server.wait_for_termination() server.wait_for_termination()
def _run_server_func(self, bind_address, response_op, dag_conf, worker_idx): def _run_server_func(self, bind_address, response_op, dag_conf, worker_idx):
...@@ -133,7 +227,8 @@ class PipelineServer(object): ...@@ -133,7 +227,8 @@ class PipelineServer(object):
futures.ThreadPoolExecutor( futures.ThreadPoolExecutor(
max_workers=1, ), options=options) max_workers=1, ), options=options)
pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server( pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
PipelineServicer(response_op, dag_conf, worker_idx), server) PipelineServicer(self._name, response_op, dag_conf, worker_idx),
server)
server.add_insecure_port(bind_address) server.add_insecure_port(bind_address)
server.start() server.start()
server.wait_for_termination() server.wait_for_termination()
...@@ -144,12 +239,25 @@ class ServerYamlConfChecker(object): ...@@ -144,12 +239,25 @@ class ServerYamlConfChecker(object):
pass pass
@staticmethod @staticmethod
def load_server_yaml_conf(yml_file): def load_server_yaml_conf(yml_file=None, yml_dict=None):
with open(yml_file) as f: if yml_file is not None and yml_dict is not None:
conf = yaml.load(f.read()) raise SystemExit("Failed to prepare_server: only one of yml_file"
" or yml_dict can be selected as the parameter.")
if yml_file is not None:
with open(yml_file) as f:
conf = yaml.load(f.read())
elif yml_dict is not None:
conf = yml_dict
else:
raise SystemExit("Failed to prepare_server: yml_file or yml_dict"
" can not be None.")
ServerYamlConfChecker.check_server_conf(conf) ServerYamlConfChecker.check_server_conf(conf)
ServerYamlConfChecker.check_dag_conf(conf["dag"]) ServerYamlConfChecker.check_dag_conf(conf["dag"])
ServerYamlConfChecker.check_tracer_conf(conf["dag"]["tracer"]) ServerYamlConfChecker.check_tracer_conf(conf["dag"]["tracer"])
for op_name in conf["op"]:
ServerYamlConfChecker.check_op_conf(conf["op"][op_name])
ServerYamlConfChecker.check_local_service_conf(conf["op"][op_name][
"local_service_conf"])
return conf return conf
@staticmethod @staticmethod
...@@ -161,26 +269,80 @@ class ServerYamlConfChecker(object): ...@@ -161,26 +269,80 @@ class ServerYamlConfChecker(object):
@staticmethod @staticmethod
def check_server_conf(conf): def check_server_conf(conf):
default_conf = { default_conf = {
"port": 9292, # "rpc_port": 9292,
"worker_num": 1, "worker_num": 1,
"build_dag_each_worker": False, "build_dag_each_worker": False,
#"http_port": 0,
"dag": {}, "dag": {},
"op": {},
} }
conf_type = { conf_type = {
"port": int, "rpc_port": int,
"http_port": int,
"worker_num": int, "worker_num": int,
"build_dag_each_worker": bool, "build_dag_each_worker": bool,
"grpc_gateway_port": int,
} }
conf_qualification = { conf_qualification = {
"port": [(">=", 1024), ("<=", 65535)], "rpc_port": [(">=", 1024), ("<=", 65535)],
"http_port": [(">=", 1024), ("<=", 65535)],
"worker_num": (">=", 1), "worker_num": (">=", 1),
} }
ServerYamlConfChecker.check_conf(conf, default_conf, conf_type, ServerYamlConfChecker.check_conf(conf, default_conf, conf_type,
conf_qualification) conf_qualification)
@staticmethod
def check_local_service_conf(conf):
default_conf = {
"workdir": "",
"thread_num": 2,
"devices": "",
"mem_optim": True,
"ir_optim": False,
}
conf_type = {
"model_config": str,
"workdir": str,
"thread_num": int,
"devices": str,
"mem_optim": bool,
"ir_optim": bool,
}
conf_qualification = {"thread_num": (">=", 1), }
ServerYamlConfChecker.check_conf(conf, default_conf, conf_type,
conf_qualification)
@staticmethod
def check_op_conf(conf):
default_conf = {
"concurrency": 1,
"timeout": -1,
"retry": 1,
"batch_size": 1,
"auto_batching_timeout": -1,
"local_service_conf": {},
}
conf_type = {
"server_endpoints": list,
"fetch_list": list,
"client_config": str,
"concurrency": int,
"timeout": int,
"retry": int,
"batch_size": int,
"auto_batching_timeout": int,
}
conf_qualification = {
"concurrency": (">=", 1),
"retry": (">=", 1),
"batch_size": (">=", 1),
}
ServerYamlConfChecker.check_conf(conf, default_conf, conf_type,
conf_qualification)
@staticmethod @staticmethod
def check_tracer_conf(conf): def check_tracer_conf(conf):
default_conf = {"interval_s": -1, } default_conf = {"interval_s": -1, }
...@@ -231,6 +393,8 @@ class ServerYamlConfChecker(object): ...@@ -231,6 +393,8 @@ class ServerYamlConfChecker(object):
@staticmethod @staticmethod
def check_conf_type(conf, conf_type): def check_conf_type(conf, conf_type):
for key, val in conf_type.items(): for key, val in conf_type.items():
if key not in conf:
continue
if not isinstance(conf[key], val): if not isinstance(conf[key], val):
raise SystemExit("[CONF] {} must be {} type, but get {}." raise SystemExit("[CONF] {} must be {} type, but get {}."
.format(key, val, type(conf[key]))) .format(key, val, type(conf[key])))
...@@ -238,6 +402,8 @@ class ServerYamlConfChecker(object): ...@@ -238,6 +402,8 @@ class ServerYamlConfChecker(object):
@staticmethod @staticmethod
def check_conf_qualification(conf, conf_qualification): def check_conf_qualification(conf, conf_qualification):
for key, qualification in conf_qualification.items(): for key, qualification in conf_qualification.items():
if key not in conf:
continue
if not isinstance(qualification, list): if not isinstance(qualification, list):
qualification = [qualification] qualification = [qualification]
if not ServerYamlConfChecker.qualification_check(conf[key], if not ServerYamlConfChecker.qualification_check(conf[key],
......
...@@ -18,6 +18,7 @@ package baidu.paddle_serving.pipeline_serving; ...@@ -18,6 +18,7 @@ package baidu.paddle_serving.pipeline_serving;
message Request { message Request {
repeated string key = 1; repeated string key = 1;
repeated string value = 2; repeated string value = 2;
optional string name = 3;
}; };
message Response { message Response {
......
...@@ -17,6 +17,8 @@ import logging ...@@ -17,6 +17,8 @@ import logging
import threading import threading
import multiprocessing import multiprocessing
import multiprocessing.managers import multiprocessing.managers
from contextlib import closing
import socket
if sys.version_info.major == 2: if sys.version_info.major == 2:
import Queue import Queue
from Queue import PriorityQueue from Queue import PriorityQueue
...@@ -29,6 +31,34 @@ else: ...@@ -29,6 +31,34 @@ else:
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
class AvailablePortGenerator(object):
def __init__(self, start_port=12000):
self._curr_port = start_port
@staticmethod
def port_is_available(port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
if result != 0:
return True
else:
return False
def next(self):
while not AvailablePortGenerator.port_is_available(self._curr_port):
self._curr_port += 1
self._curr_port += 1
return self._curr_port - 1
_AvailablePortGenerator = AvailablePortGenerator()
def GetAvailablePortGenerator():
return _AvailablePortGenerator
class NameGenerator(object): class NameGenerator(object):
# use unsafe-id-generator # use unsafe-id-generator
def __init__(self, prefix): def __init__(self, prefix):
......
...@@ -16,7 +16,6 @@ from __future__ import absolute_import ...@@ -16,7 +16,6 @@ from __future__ import absolute_import
from __future__ import division from __future__ import division
from __future__ import print_function from __future__ import print_function
import platform
import os import os
from setuptools import setup, Distribution, Extension from setuptools import setup, Distribution, Extension
...@@ -24,18 +23,9 @@ from setuptools import find_packages ...@@ -24,18 +23,9 @@ from setuptools import find_packages
from setuptools import setup from setuptools import setup
from paddle_serving_app.version import serving_app_version from paddle_serving_app.version import serving_app_version
from pkg_resources import DistributionNotFound, get_distribution from pkg_resources import DistributionNotFound, get_distribution
import util
def python_version(): max_version, mid_version, min_version = util.python_version()
return [int(v) for v in platform.python_version().split(".")]
def find_package(pkgname):
try:
get_distribution(pkgname)
return True
except DistributionNotFound:
return False
max_version, mid_version, min_version = python_version()
if '${PACK}' == 'ON': if '${PACK}' == 'ON':
copy_lib() copy_lib()
......
...@@ -16,7 +16,6 @@ from __future__ import absolute_import ...@@ -16,7 +16,6 @@ from __future__ import absolute_import
from __future__ import division from __future__ import division
from __future__ import print_function from __future__ import print_function
import platform
import os import os
import sys import sys
...@@ -24,20 +23,10 @@ from setuptools import setup, Distribution, Extension ...@@ -24,20 +23,10 @@ from setuptools import setup, Distribution, Extension
from setuptools import find_packages from setuptools import find_packages
from setuptools import setup from setuptools import setup
from paddle_serving_client.version import serving_client_version from paddle_serving_client.version import serving_client_version
from pkg_resources import DistributionNotFound, get_distribution import util
py_version = sys.version_info py_version = sys.version_info
def python_version():
return [int(v) for v in platform.python_version().split(".")]
def find_package(pkgname):
try:
get_distribution(pkgname)
return True
except DistributionNotFound:
return False
def copy_lib(): def copy_lib():
if py_version[0] == 2: if py_version[0] == 2:
lib_list = ['libpython2.7.so.1.0', 'libssl.so.10', 'libcrypto.so.10'] lib_list = ['libpython2.7.so.1.0', 'libssl.so.10', 'libcrypto.so.10']
...@@ -51,18 +40,20 @@ def copy_lib(): ...@@ -51,18 +40,20 @@ def copy_lib():
text = r.read() text = r.read()
os.popen('cp {} ./paddle_serving_client/lib'.format(text.strip().split(' ')[1])) os.popen('cp {} ./paddle_serving_client/lib'.format(text.strip().split(' ')[1]))
max_version, mid_version, min_version = python_version() max_version, mid_version, min_version = util.python_version()
# gen pipeline proto code
util.gen_pipeline_code("paddle_serving_client")
if '${PACK}' == 'ON': if '${PACK}' == 'ON':
copy_lib() copy_lib()
REQUIRED_PACKAGES = [ REQUIRED_PACKAGES = [
'six >= 1.10.0', 'protobuf >= 3.11.0', 'numpy >= 1.12', 'grpcio >= 1.28.1', 'six >= 1.10.0', 'protobuf >= 3.11.0', 'numpy >= 1.12', 'grpcio >= 1.28.1',
'grpcio-tools >= 1.28.1' 'grpcio-tools >= 1.28.1'
] ]
if not find_package("paddlepaddle") and not find_package("paddlepaddle-gpu"): if not util.find_package("paddlepaddle") and not util.find_package("paddlepaddle-gpu"):
REQUIRED_PACKAGES.append("paddlepaddle") REQUIRED_PACKAGES.append("paddlepaddle")
...@@ -72,8 +63,10 @@ packages=['paddle_serving_client', ...@@ -72,8 +63,10 @@ packages=['paddle_serving_client',
'paddle_serving_client.metric', 'paddle_serving_client.metric',
'paddle_serving_client.utils', 'paddle_serving_client.utils',
'paddle_serving_client.pipeline', 'paddle_serving_client.pipeline',
'paddle_serving_client.pipeline.proto'] 'paddle_serving_client.pipeline.proto',
package_data={'paddle_serving_client': ['serving_client.so','lib/*'],} 'paddle_serving_client.pipeline.gateway',
'paddle_serving_client.pipeline.gateway.proto']
package_data={'paddle_serving_client': ['serving_client.so', 'lib/*', 'pipeline/gateway/libproxy_server.so'],}
package_dir={'paddle_serving_client': package_dir={'paddle_serving_client':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client', '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client',
'paddle_serving_client.proto': 'paddle_serving_client.proto':
...@@ -87,7 +80,11 @@ package_dir={'paddle_serving_client': ...@@ -87,7 +80,11 @@ package_dir={'paddle_serving_client':
'paddle_serving_client.pipeline': 'paddle_serving_client.pipeline':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline', '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline',
'paddle_serving_client.pipeline.proto': 'paddle_serving_client.pipeline.proto':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline/proto'} '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline/proto',
'paddle_serving_client.pipeline.gateway':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline/gateway',
'paddle_serving_client.pipeline.gateway.proto':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline/gateway/proto'}
setup( setup(
name='paddle-serving-client', name='paddle-serving-client',
......
...@@ -16,17 +16,14 @@ from __future__ import absolute_import ...@@ -16,17 +16,14 @@ from __future__ import absolute_import
from __future__ import division from __future__ import division
from __future__ import print_function from __future__ import print_function
import platform
from setuptools import setup, Distribution, Extension from setuptools import setup, Distribution, Extension
from setuptools import find_packages from setuptools import find_packages
from setuptools import setup from setuptools import setup
from paddle_serving.version import serving_client_version from paddle_serving.version import serving_client_version
from grpc_tools import protoc
import util
def python_version(): max_version, mid_version, min_version = util.python_version()
return [int(v) for v in platform.python_version().split(".")]
max_version, mid_version, min_version = python_version()
REQUIRED_PACKAGES = [ REQUIRED_PACKAGES = [
'six >= 1.10.0', 'protobuf >= 3.1.0','paddlepaddle' 'six >= 1.10.0', 'protobuf >= 3.1.0','paddlepaddle'
......
...@@ -16,25 +16,16 @@ from __future__ import absolute_import ...@@ -16,25 +16,16 @@ from __future__ import absolute_import
from __future__ import division from __future__ import division
from __future__ import print_function from __future__ import print_function
import platform
from setuptools import setup, Distribution, Extension from setuptools import setup, Distribution, Extension
from setuptools import find_packages from setuptools import find_packages
from setuptools import setup from setuptools import setup
from paddle_serving_server.version import serving_server_version from paddle_serving_server.version import serving_server_version
from pkg_resources import DistributionNotFound, get_distribution import util
def find_package(pkgname):
try:
get_distribution(pkgname)
return True
except DistributionNotFound:
return False
def python_version(): max_version, mid_version, min_version = util.python_version()
return [int(v) for v in platform.python_version().split(".")]
max_version, mid_version, min_version = python_version() # gen pipeline proto code
util.gen_pipeline_code("paddle_serving_server")
REQUIRED_PACKAGES = [ REQUIRED_PACKAGES = [
'six >= 1.10.0', 'protobuf >= 3.11.0', 'grpcio >= 1.28.1', 'grpcio-tools >= 1.28.1', 'six >= 1.10.0', 'protobuf >= 3.11.0', 'grpcio >= 1.28.1', 'grpcio-tools >= 1.28.1',
...@@ -44,7 +35,9 @@ REQUIRED_PACKAGES = [ ...@@ -44,7 +35,9 @@ REQUIRED_PACKAGES = [
packages=['paddle_serving_server', packages=['paddle_serving_server',
'paddle_serving_server.proto', 'paddle_serving_server.proto',
'paddle_serving_server.pipeline', 'paddle_serving_server.pipeline',
'paddle_serving_server.pipeline.proto'] 'paddle_serving_server.pipeline.proto',
'paddle_serving_server.pipeline.gateway',
'paddle_serving_server.pipeline.gateway.proto']
package_dir={'paddle_serving_server': package_dir={'paddle_serving_server':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server', '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server',
...@@ -53,7 +46,13 @@ package_dir={'paddle_serving_server': ...@@ -53,7 +46,13 @@ package_dir={'paddle_serving_server':
'paddle_serving_server.pipeline': 'paddle_serving_server.pipeline':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline', '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline',
'paddle_serving_server.pipeline.proto': 'paddle_serving_server.pipeline.proto':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/proto'} '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/proto',
'paddle_serving_server.pipeline.gateway':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/gateway',
'paddle_serving_server.pipeline.gateway.proto':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/gateway/proto'}
package_data={'paddle_serving_server': ['pipeline/gateway/libproxy_server.so'],}
setup( setup(
name='paddle-serving-server', name='paddle-serving-server',
...@@ -65,6 +64,7 @@ setup( ...@@ -65,6 +64,7 @@ setup(
author_email='guru4elephant@gmail.com', author_email='guru4elephant@gmail.com',
install_requires=REQUIRED_PACKAGES, install_requires=REQUIRED_PACKAGES,
packages=packages, packages=packages,
package_data=package_data,
package_dir=package_dir, package_dir=package_dir,
# PyPI package information. # PyPI package information.
classifiers=[ classifiers=[
......
...@@ -16,25 +16,16 @@ from __future__ import absolute_import ...@@ -16,25 +16,16 @@ from __future__ import absolute_import
from __future__ import division from __future__ import division
from __future__ import print_function from __future__ import print_function
import platform
from setuptools import setup, Distribution, Extension from setuptools import setup, Distribution, Extension
from setuptools import find_packages from setuptools import find_packages
from setuptools import setup from setuptools import setup
from paddle_serving_server_gpu.version import serving_server_version from paddle_serving_server_gpu.version import serving_server_version
from pkg_resources import DistributionNotFound, get_distribution import util
def find_package(pkgname):
try:
get_distribution(pkgname)
return True
except DistributionNotFound:
return False
def python_version(): max_version, mid_version, min_version = util.python_version()
return [int(v) for v in platform.python_version().split(".")]
max_version, mid_version, min_version = python_version() # gen pipeline proto code
util.gen_pipeline_code("paddle_serving_server_gpu")
REQUIRED_PACKAGES = [ REQUIRED_PACKAGES = [
'six >= 1.10.0', 'protobuf >= 3.11.0', 'grpcio >= 1.28.1', 'grpcio-tools >= 1.28.1', 'six >= 1.10.0', 'protobuf >= 3.11.0', 'grpcio >= 1.28.1', 'grpcio-tools >= 1.28.1',
...@@ -44,7 +35,9 @@ REQUIRED_PACKAGES = [ ...@@ -44,7 +35,9 @@ REQUIRED_PACKAGES = [
packages=['paddle_serving_server_gpu', packages=['paddle_serving_server_gpu',
'paddle_serving_server_gpu.proto', 'paddle_serving_server_gpu.proto',
'paddle_serving_server_gpu.pipeline', 'paddle_serving_server_gpu.pipeline',
'paddle_serving_server_gpu.pipeline.proto'] 'paddle_serving_server_gpu.pipeline.proto',
'paddle_serving_server_gpu.pipeline.gateway',
'paddle_serving_server_gpu.pipeline.gateway.proto']
package_dir={'paddle_serving_server_gpu': package_dir={'paddle_serving_server_gpu':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu', '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu',
...@@ -53,7 +46,13 @@ package_dir={'paddle_serving_server_gpu': ...@@ -53,7 +46,13 @@ package_dir={'paddle_serving_server_gpu':
'paddle_serving_server_gpu.pipeline': 'paddle_serving_server_gpu.pipeline':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline', '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline',
'paddle_serving_server_gpu.pipeline.proto': 'paddle_serving_server_gpu.pipeline.proto':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline/proto'} '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline/proto',
'paddle_serving_server_gpu.pipeline.gateway':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline/gateway',
'paddle_serving_server_gpu.pipeline.gateway.proto':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline/gateway/proto'}
package_data={'paddle_serving_server_gpu': ['pipeline/gateway/libproxy_server.so'],}
setup( setup(
name='paddle-serving-server-gpu', name='paddle-serving-server-gpu',
...@@ -65,6 +64,7 @@ setup( ...@@ -65,6 +64,7 @@ setup(
author_email='guru4elephant@gmail.com', author_email='guru4elephant@gmail.com',
install_requires=REQUIRED_PACKAGES, install_requires=REQUIRED_PACKAGES,
packages=packages, packages=packages,
package_data=package_data,
package_dir=package_dir, package_dir=package_dir,
# PyPI package information. # PyPI package information.
classifiers=[ classifiers=[
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pkg_resources import DistributionNotFound, get_distribution
from grpc_tools import protoc
import os
import platform
def python_version():
return [int(v) for v in platform.python_version().split(".")]
def find_package(pkgname):
try:
get_distribution(pkgname)
return True
except DistributionNotFound:
return False
def gen_pipeline_code(package_name):
# pipeline service proto
protoc.main((
'',
'-I.',
'--python_out=.',
'--grpc_python_out=.',
'{}/pipeline/proto/pipeline_service.proto'.format(package_name), ))
# pipeline grpc-gateway proto
# *.pb.go
ret = os.system(
"cd {}/pipeline/gateway/proto/ && "
"../../../../../third_party/install/protobuf/bin/protoc -I. "
"-I$GOPATH/src "
"-I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis "
"--go_out=plugins=grpc:. "
"gateway.proto".format(package_name))
if ret != 0:
exit(1)
# *.gw.go
ret = os.system(
"cd {}/pipeline/gateway/proto/ && "
"../../../../../third_party/install/protobuf/bin/protoc -I. "
"-I$GOPATH/src "
"-I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis "
"--grpc-gateway_out=logtostderr=true:. "
"gateway.proto".format(package_name))
if ret != 0:
exit(1)
# pipeline grpc-gateway shared-lib
ret = os.system(
"cd {}/pipeline/gateway && "
"go build -buildmode=c-shared -o libproxy_server.so proxy_server.go".
format(package_name))
if ret != 0:
exit(1)
...@@ -19,6 +19,13 @@ function init() { ...@@ -19,6 +19,13 @@ function init() {
cd Serving cd Serving
export SERVING_WORKDIR=$PWD export SERVING_WORKDIR=$PWD
$PYTHONROOT/bin/python -m pip install -r python/requirements.txt $PYTHONROOT/bin/python -m pip install -r python/requirements.txt
export GOPATH=$HOME/go
export PATH=$PATH:$GOPATH/bin
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger
go get -u github.com/golang/protobuf/protoc-gen-go
go get -u google.golang.org/grpc
} }
function check_cmd() { function check_cmd() {
...@@ -298,7 +305,6 @@ function python_test_bert() { ...@@ -298,7 +305,6 @@ function python_test_bert() {
cd bert # pwd: /Serving/python/examples/bert cd bert # pwd: /Serving/python/examples/bert
case $TYPE in case $TYPE in
CPU) CPU)
pip install paddlehub
# Because download from paddlehub may timeout, # Because download from paddlehub may timeout,
# download the model from bos(max_seq_len=128). # download the model from bos(max_seq_len=128).
wget https://paddle-serving.bj.bcebos.com/paddle_hub_models/text/SemanticModel/bert_chinese_L-12_H-768_A-12.tar.gz wget https://paddle-serving.bj.bcebos.com/paddle_hub_models/text/SemanticModel/bert_chinese_L-12_H-768_A-12.tar.gz
...@@ -306,14 +312,12 @@ function python_test_bert() { ...@@ -306,14 +312,12 @@ function python_test_bert() {
sh get_data.sh sh get_data.sh
check_cmd "python -m paddle_serving_server.serve --model bert_chinese_L-12_H-768_A-12_model --port 9292 &" check_cmd "python -m paddle_serving_server.serve --model bert_chinese_L-12_H-768_A-12_model --port 9292 &"
sleep 5 sleep 5
pip install paddle_serving_app
check_cmd "head -n 10 data-c.txt | python bert_client.py --model bert_chinese_L-12_H-768_A-12_client/serving_client_conf.prototxt" check_cmd "head -n 10 data-c.txt | python bert_client.py --model bert_chinese_L-12_H-768_A-12_client/serving_client_conf.prototxt"
kill_server_process kill_server_process
echo "bert RPC inference pass" echo "bert RPC inference pass"
;; ;;
GPU) GPU)
export CUDA_VISIBLE_DEVICES=0 export CUDA_VISIBLE_DEVICES=0
pip install paddlehub
# Because download from paddlehub may timeout, # Because download from paddlehub may timeout,
# download the model from bos(max_seq_len=128). # download the model from bos(max_seq_len=128).
wget https://paddle-serving.bj.bcebos.com/paddle_hub_models/text/SemanticModel/bert_chinese_L-12_H-768_A-12.tar.gz wget https://paddle-serving.bj.bcebos.com/paddle_hub_models/text/SemanticModel/bert_chinese_L-12_H-768_A-12.tar.gz
...@@ -321,7 +325,6 @@ function python_test_bert() { ...@@ -321,7 +325,6 @@ function python_test_bert() {
sh get_data.sh sh get_data.sh
check_cmd "python -m paddle_serving_server_gpu.serve --model bert_chinese_L-12_H-768_A-12_model --port 9292 --gpu_ids 0 &" check_cmd "python -m paddle_serving_server_gpu.serve --model bert_chinese_L-12_H-768_A-12_model --port 9292 --gpu_ids 0 &"
sleep 5 sleep 5
pip install paddle_serving_app
check_cmd "head -n 10 data-c.txt | python bert_client.py --model bert_chinese_L-12_H-768_A-12_client/serving_client_conf.prototxt" check_cmd "head -n 10 data-c.txt | python bert_client.py --model bert_chinese_L-12_H-768_A-12_client/serving_client_conf.prototxt"
kill_server_process kill_server_process
echo "bert RPC inference pass" echo "bert RPC inference pass"
...@@ -760,13 +763,14 @@ function python_test_resnet50(){ ...@@ -760,13 +763,14 @@ function python_test_resnet50(){
} }
function python_test_pipeline(){ function python_test_pipeline(){
# pwd:/ Serving/python/examples # pwd: /Serving/python/examples
local TYPE=$1 local TYPE=$1
export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
unsetproxy unsetproxy
cd pipeline/imdb_model_ensemble cd pipeline # pwd: /Serving/python/examples/pipeline
case $TYPE in case $TYPE in
CPU) CPU)
cd imdb_model_ensemble # pwd: /Serving/python/examples/pipeline/imdb_model_ensemble
# start paddle serving service (brpc) # start paddle serving service (brpc)
sh get_data.sh sh get_data.sh
python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 --workdir test9292 &> cnn.log & python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 --workdir test9292 &> cnn.log &
...@@ -775,7 +779,7 @@ function python_test_pipeline(){ ...@@ -775,7 +779,7 @@ function python_test_pipeline(){
# test: thread servicer & thread op # test: thread servicer & thread op
cat << EOF > config.yml cat << EOF > config.yml
port: 18080 rpc_port: 18080
worker_num: 4 worker_num: 4
build_dag_each_worker: false build_dag_each_worker: false
dag: dag:
...@@ -792,7 +796,7 @@ EOF ...@@ -792,7 +796,7 @@ EOF
# test: thread servicer & process op # test: thread servicer & process op
cat << EOF > config.yml cat << EOF > config.yml
port: 18080 rpc_port: 18080
worker_num: 4 worker_num: 4
build_dag_each_worker: false build_dag_each_worker: false
dag: dag:
...@@ -809,7 +813,7 @@ EOF ...@@ -809,7 +813,7 @@ EOF
# test: process servicer & process op # test: process servicer & process op
cat << EOF > config.yml cat << EOF > config.yml
port: 18080 rpc_port: 18080
worker_num: 4 worker_num: 4
build_dag_each_worker: false build_dag_each_worker: false
dag: dag:
...@@ -828,7 +832,7 @@ EOF ...@@ -828,7 +832,7 @@ EOF
pip uninstall grpcio -y pip uninstall grpcio -y
pip install grpcio --no-binary=grpcio pip install grpcio --no-binary=grpcio
cat << EOF > config.yml cat << EOF > config.yml
port: 18080 rpc_port: 18080
worker_num: 4 worker_num: 4
build_dag_each_worker: true build_dag_each_worker: true
dag: dag:
...@@ -852,7 +856,7 @@ EOF ...@@ -852,7 +856,7 @@ EOF
python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 --use_multilang --workdir test9393 &> bow.log & python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 --use_multilang --workdir test9393 &> bow.log &
sleep 5 sleep 5
cat << EOF > config.yml cat << EOF > config.yml
port: 18080 rpc_port: 18080
worker_num: 4 worker_num: 4
build_dag_each_worker: false build_dag_each_worker: false
dag: dag:
...@@ -869,16 +873,47 @@ EOF ...@@ -869,16 +873,47 @@ EOF
kill_server_process kill_server_process
kill_process_by_port 9292 kill_process_by_port 9292
kill_process_by_port 9393 kill_process_by_port 9393
cd ..
cd simple_web_service # pwd: /Serving/python/examples/pipeline/simple_web_service
sh get_data.sh
python web_service.py >/dev/null &
sleep 5
curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
check http code
http_code=`curl -X POST -k -d '{"key":["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}' -s -w "%{http_code}" -o /dev/null http://localhost:18080/uci/prediction`
if [ ${http_code} -ne 200 ]; then
echo "HTTP status code -ne 200"
exit 1
fi
ps -ef | grep "web_service" | grep -v grep | awk '{print $2}' | xargs kill
ps -ef | grep "pipeline" | grep -v grep | awk '{print $2}' | xargs kill
kill_server_process
cd ..
;; ;;
GPU) GPU)
echo "pipeline ignore GPU test" cd simple_web_service # pwd: /Serving/python/examples/pipeline/simple_web_service
sh get_data.sh
python web_service.py >/dev/null &
sleep 5
curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
# check http code
http_code=`curl -X POST -k -d '{"key":["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}' -s -w "%{http_code}" -o /dev/null http://localhost:18080/uci/prediction`
if [ ${http_code} -ne 200 ]; then
echo "HTTP status code -ne 200"
exit 1
fi
ps -ef | grep "web_service" | grep -v grep | awk '{print $2}' | xargs kill
ps -ef | grep "pipeline" | grep -v grep | awk '{print $2}' | xargs kill
kill_server_process
cd .. # pwd: /Serving/python/examples/pipeline
;; ;;
*) *)
echo "error type" echo "error type"
exit 1 exit 1
;; ;;
esac esac
cd ../../ cd ..
setproxy setproxy
unset SERVING_BIN unset SERVING_BIN
} }
...@@ -928,118 +963,8 @@ function monitor_test() { ...@@ -928,118 +963,8 @@ function monitor_test() {
mkdir _monitor_test && cd _monitor_test # pwd: /Serving/_monitor_test mkdir _monitor_test && cd _monitor_test # pwd: /Serving/_monitor_test
case $TYPE in case $TYPE in
CPU): CPU):
pip install pyftpdlib # The CPU part and GPU part are identical.
mkdir remote_path # In order to avoid Travis CI timeout (50 min), the CPU version is not checked
mkdir local_path
cd remote_path # pwd: /Serving/_monitor_test/remote_path
check_cmd "python -m pyftpdlib -p 8000 &>/dev/null &"
cd .. # pwd: /Serving/_monitor_test
# type: ftp
# remote_path: /
# remote_model_name: uci_housing.tar.gz
# local_tmp_path: ___tmp
# local_path: local_path
cd remote_path # pwd: /Serving/_monitor_test/remote_path
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
touch donefile
cd .. # pwd: /Serving/_monitor_test
mkdir -p local_path/uci_housing_model
python -m paddle_serving_server.monitor \
--type='ftp' --ftp_host='127.0.0.1' --ftp_port='8000' \
--remote_path='/' --remote_model_name='uci_housing.tar.gz' \
--remote_donefile_name='donefile' --local_path='local_path' \
--local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
--local_tmp_path='___tmp' --unpacked_filename='uci_housing_model' \
--interval='1' >/dev/null &
sleep 10
if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
echo "local_path/uci_housing_model/fluid_time_file not exist."
exit 1
fi
ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
rm -rf remote_path/*
rm -rf local_path/*
# type: ftp
# remote_path: /tmp_dir
# remote_model_name: uci_housing_model
# local_tmp_path: ___tmp
# local_path: local_path
mkdir -p remote_path/tmp_dir && cd remote_path/tmp_dir # pwd: /Serving/_monitor_test/remote_path/tmp_dir
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
tar -xzf uci_housing.tar.gz
touch donefile
cd ../.. # pwd: /Serving/_monitor_test
mkdir -p local_path/uci_housing_model
python -m paddle_serving_server.monitor \
--type='ftp' --ftp_host='127.0.0.1' --ftp_port='8000' \
--remote_path='/tmp_dir' --remote_model_name='uci_housing_model' \
--remote_donefile_name='donefile' --local_path='local_path' \
--local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
--local_tmp_path='___tmp' --interval='1' >/dev/null &
sleep 10
if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
echo "local_path/uci_housing_model/fluid_time_file not exist."
exit 1
fi
ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
rm -rf remote_path/*
rm -rf local_path/*
# type: general
# remote_path: /
# remote_model_name: uci_housing.tar.gz
# local_tmp_path: ___tmp
# local_path: local_path
cd remote_path # pwd: /Serving/_monitor_test/remote_path
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
touch donefile
cd .. # pwd: /Serving/_monitor_test
mkdir -p local_path/uci_housing_model
python -m paddle_serving_server.monitor \
--type='general' --general_host='ftp://127.0.0.1:8000' \
--remote_path='/' --remote_model_name='uci_housing.tar.gz' \
--remote_donefile_name='donefile' --local_path='local_path' \
--local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
--local_tmp_path='___tmp' --unpacked_filename='uci_housing_model' \
--interval='1' >/dev/null &
sleep 10
if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
echo "local_path/uci_housing_model/fluid_time_file not exist."
exit 1
fi
ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
rm -rf remote_path/*
rm -rf local_path/*
# type: general
# remote_path: /tmp_dir
# remote_model_name: uci_housing_model
# local_tmp_path: ___tmp
# local_path: local_path
mkdir -p remote_path/tmp_dir && cd remote_path/tmp_dir # pwd: /Serving/_monitor_test/remote_path/tmp_dir
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
tar -xzf uci_housing.tar.gz
touch donefile
cd ../.. # pwd: /Serving/_monitor_test
mkdir -p local_path/uci_housing_model
python -m paddle_serving_server.monitor \
--type='general' --general_host='ftp://127.0.0.1:8000' \
--remote_path='/tmp_dir' --remote_model_name='uci_housing_model' \
--remote_donefile_name='donefile' --local_path='local_path' \
--local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
--local_tmp_path='___tmp' --interval='1' >/dev/null &
sleep 10
if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
echo "local_path/uci_housing_model/fluid_time_file not exist."
exit 1
fi
ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
rm -rf remote_path/*
rm -rf local_path/*
ps -ef | grep "pyftpdlib" | grep -v grep | awk '{print $2}' | xargs kill
;; ;;
GPU): GPU):
pip install pyftpdlib pip install pyftpdlib
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册