diff --git a/doc/COMPILE.md b/doc/COMPILE.md index 63a4d67c40af77e616e018d1a6dbc289615b4971..466cef73a5f217cd2322fa5548c518a9004800c2 100644 --- a/doc/COMPILE.md +++ b/doc/COMPILE.md @@ -61,7 +61,22 @@ pip install -r python/requirements.txt If Python3 is used, replace `pip` with `pip3`. +## GOPATH Setting +The default GOPATH is `$HOME/go`; you can also set it to another value. +```shell +export GOPATH=$HOME/go +export PATH=$PATH:$GOPATH/bin +``` + +## Get Go packages + +```shell +go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway +go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger +go get -u github.com/golang/protobuf/protoc-gen-go +go get -u google.golang.org/grpc +``` ## Compile Server diff --git a/doc/COMPILE_CN.md b/doc/COMPILE_CN.md index 29b0645cc4ea90c56cd5d691f4766a3e3ad39ba1..2ddaaf71f23b0199c7458d068139a6b7169c25d8 100644 --- a/doc/COMPILE_CN.md +++ b/doc/COMPILE_CN.md @@ -61,6 +61,22 @@ pip install -r python/requirements.txt 如果使用 Python3,请以 `pip3` 替换 `pip`。 +## GOPATH 设置 + +默认 GOPATH 设置为 `$HOME/go`,您也可以设置为其他值。 +```shell +export GOPATH=$HOME/go +export PATH=$PATH:$GOPATH/bin +``` + +## 获取 Go packages + +```shell +go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway +go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger +go get -u github.com/golang/protobuf/protoc-gen-go +go get -u google.golang.org/grpc +``` ## 编译Server部分 diff --git a/doc/PIPELINE_SERVING.md b/doc/PIPELINE_SERVING.md index 7270cc134558906f6989a2c315a1dd4e2a640c59..4205aa15723d3625c0fea43eb9d0fd67f32f4a3f 100644 --- a/doc/PIPELINE_SERVING.md +++ b/doc/PIPELINE_SERVING.md @@ -251,9 +251,10 @@ server.run_server() Where `response_op` is the ResponseOp mentioned above. PipelineServer will initialize Channels according to the topology relationship of each OP and build the calculation graph. `config_yml_path` is the configuration file of PipelineServer. The example file is as follows: ```yaml -port: 18080 # gRPC port +rpc_port: 18080 # gRPC port worker_num: 1 # gRPC thread pool size (the number of processes in the process version servicer). The default is 1 build_dag_each_worker: false # Whether to use the process version servicer or not. The default is false +http_port: 0 # HTTP service port. The HTTP service is not started when the value is less than or equal to 0. The default value is 0. dag: is_thread_op: true # Whether to use the thread version of OP. The default is true client_type: brpc # Use brpc or grpc client. The default is brpc @@ -285,6 +286,8 @@ python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 &> cnn. python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.log & ``` +PipelineServing also supports starting PaddleServingService automatically on the local machine. Please refer to the example in `python/examples/pipeline/ocr`. + ### Start PipelineServer Run the following code @@ -384,7 +387,7 @@ for f in futures: -## How to optimize through the timeline tool +## How to optimize with the timeline tool In order to better optimize performance, PipelineServing provides a timeline tool to monitor the time spent in each stage of the whole service.
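For context on the new `http_port` option: the HTTP proxy added in this patch (see `python/pipeline/gateway/proto/gateway.proto` below) maps `POST /{name}/prediction` onto the gRPC `Request` message, whose `key` and `value` fields are parallel string lists. Below is a minimal client sketch; the port `18080`, the service name `imdb`, and the `words` input key are assumptions borrowed from the IMDB ensemble example in this patch, not fixed values.

```python
# Hedged sketch: exercise the HTTP gateway enabled by `http_port`.
# Assumes a PipelineServer named "imdb" listening with http_port 18080,
# whose DAG consumes a "words" input; adjust all three to your config.
import json

import requests

url = "http://127.0.0.1:18080/imdb/prediction"
data = {"key": ["words"], "value": ["i am very sad | 0"]}
resp = requests.post(url=url, data=json.dumps(data))
# The JSON body mirrors the proto Response message: repeated string
# key/value fields plus an integer ecode and an error_info string.
print(resp.json())
```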
diff --git a/doc/PIPELINE_SERVING_CN.md b/doc/PIPELINE_SERVING_CN.md index 3214487c31bcc47ec67d2ad28d987bff845fa13b..7cab409b2b8ca5d80eac05827f2e3fb774000998 100644 --- a/doc/PIPELINE_SERVING_CN.md +++ b/doc/PIPELINE_SERVING_CN.md @@ -249,9 +249,10 @@ server.run_server() 其中,`response_op` 为上面提到的 ResponseOp,PipelineServer 将会根据各个 OP 的拓扑关系初始化 Channel 并构建计算图。`config_yml_path` 为 PipelineServer 的配置文件,示例文件如下: ```yaml -port: 18080 # gRPC端口号 +rpc_port: 18080 # gRPC端口号 worker_num: 1 # gRPC线程池大小(进程版 Servicer 中为进程数),默认为 1 build_dag_each_worker: false # 是否使用进程版 Servicer,默认为 false +http_port: 0 # HTTP 服务的端口号,若该值小于或等于 0 则不开启 HTTP 服务,默认为 0 dag: is_thread_op: true # 是否使用线程版Op,默认为 true client_type: brpc # 使用 brpc 或 grpc client,默认为 brpc @@ -283,6 +284,8 @@ python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 &> cnn. python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.log & ``` +PipelineServing 也支持本地自动启动 PaddleServingService,请参考 `python/examples/pipeline/ocr` 下的例子。 + ### 启动 PipelineServer 运行下面代码 diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index edec41573b67f50feca52ee017bae2d7fa2b28ac..4d6b3ce35aac3bc288b869b23498a19269de3169 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -1,7 +1,5 @@ if (CLIENT) file(INSTALL pipeline DESTINATION paddle_serving_client) - execute_process(COMMAND ${PYTHON_EXECUTABLE} run_codegen.py - WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/paddle_serving_client/pipeline/proto) file(GLOB_RECURSE SERVING_CLIENT_PY_FILES paddle_serving_client/*.py) set(PY_FILES ${SERVING_CLIENT_PY_FILES}) SET(PACKAGE_NAME "serving_client") @@ -11,13 +9,9 @@ endif() if (SERVER) if (NOT WITH_GPU) file(INSTALL pipeline DESTINATION paddle_serving_server) - execute_process(COMMAND ${PYTHON_EXECUTABLE} run_codegen.py - WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/paddle_serving_server/pipeline/proto) file(GLOB_RECURSE SERVING_SERVER_PY_FILES paddle_serving_server/*.py) else() file(INSTALL pipeline DESTINATION paddle_serving_server_gpu) - execute_process(COMMAND ${PYTHON_EXECUTABLE} run_codegen.py - WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/paddle_serving_server_gpu/pipeline/proto) file(GLOB_RECURSE SERVING_SERVER_PY_FILES paddle_serving_server_gpu/*.py) endif() set(PY_FILES ${SERVING_SERVER_PY_FILES}) @@ -25,6 +19,8 @@ if (SERVER) set(SETUP_LOG_FILE "setup.py.server.log") endif() +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/util.py + ${CMAKE_CURRENT_BINARY_DIR}/util.py) if (CLIENT) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.client.in ${CMAKE_CURRENT_BINARY_DIR}/setup.py) diff --git a/python/examples/pipeline/imdb_model_ensemble/README_CN.md b/python/examples/pipeline/imdb_model_ensemble/README_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..88eeab70c470268775ad22fd65a6d1b999a6b167 --- /dev/null +++ b/python/examples/pipeline/imdb_model_ensemble/README_CN.md @@ -0,0 +1,24 @@ +# IMDB model ensemble 样例 + +## 获取模型 +``` +sh get_data.sh +``` + +## 启动服务 + +``` +python -m paddle_serving_server_gpu.serve --model imdb_cnn_model --port 9292 &> cnn.log & +python -m paddle_serving_server_gpu.serve --model imdb_bow_model --port 9393 &> bow.log & +python test_pipeline_server.py &>pipeline.log & +``` + +## 启动客户端 +``` +python test_pipeline_client.py +``` + +## HTTP 测试 +``` +curl -X POST -k http://localhost:9999/prediction -d '{"key": ["words"], "value": ["i am very sad | 0"]}' +``` diff --git a/python/examples/pipeline/imdb_model_ensemble/config.yml b/python/examples/pipeline/imdb_model_ensemble/config.yml index 
3f0b1bb8d4eedb073fa5014eb20e1a170f0d811b..3447ffd449de59ea76450e95c7f355413d1a12ac 100644 --- a/python/examples/pipeline/imdb_model_ensemble/config.yml +++ b/python/examples/pipeline/imdb_model_ensemble/config.yml @@ -1,6 +1,7 @@ -port: 18080 +rpc_port: 18085 worker_num: 4 build_dag_each_worker: false +http_port: 9999 dag: is_thread_op: false client_type: brpc diff --git a/python/examples/pipeline/ocr/README.md b/python/examples/pipeline/ocr/README.md new file mode 100644 index 0000000000000000000000000000000000000000..f51789fc5e419d715141ba59dc49011d4f306e56 --- /dev/null +++ b/python/examples/pipeline/ocr/README.md @@ -0,0 +1,67 @@ +# OCR Pipeline WebService + +(English|[简体中文](./README_CN.md)) + +This document takes OCR as an example to show how to use Pipeline WebService to start a service that chains multiple models in series. + +## Get Model +``` +python -m paddle_serving_app.package --get_model ocr_rec +tar -xzvf ocr_rec.tar.gz +python -m paddle_serving_app.package --get_model ocr_det +tar -xzvf ocr_det.tar.gz +``` + +## Get Dataset (Optional) +``` +wget --no-check-certificate https://paddle-serving.bj.bcebos.com/ocr/test_imgs.tar +tar xf test_imgs.tar +``` + +## Start Service +``` +python web_service.py &>log.txt & +``` + +## Test +``` +python pipeline_http_client.py +``` + + + + diff --git a/python/examples/pipeline/ocr/README_CN.md b/python/examples/pipeline/ocr/README_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..ba1150d32e16298d0c1267d46f7d6e804b53d041 --- /dev/null +++ b/python/examples/pipeline/ocr/README_CN.md @@ -0,0 +1,67 @@ +# OCR Pipeline WebService + +([English](./README.md)|简体中文) + +本文档将以 OCR 为例,介绍如何使用 Pipeline WebService 启动多模型串联的服务。 + +## 获取模型 +``` +python -m paddle_serving_app.package --get_model ocr_rec +tar -xzvf ocr_rec.tar.gz +python -m paddle_serving_app.package --get_model ocr_det +tar -xzvf ocr_det.tar.gz +``` + +## 获取数据集(可选) +``` +wget --no-check-certificate https://paddle-serving.bj.bcebos.com/ocr/test_imgs.tar +tar xf test_imgs.tar +``` + +## 启动 WebService +``` +python web_service.py &>log.txt & +``` + +## 测试 +``` +python pipeline_http_client.py +``` + + diff --git a/python/examples/pipeline/ocr/config.yml b/python/examples/pipeline/ocr/config.yml new file mode 100644 index 0000000000000000000000000000000000000000..48addccfd0e543e04adf6587c5532b2a18bb2810 --- /dev/null +++ b/python/examples/pipeline/ocr/config.yml @@ -0,0 +1,22 @@ +rpc_port: 18080 +worker_num: 4 +build_dag_each_worker: false +http_port: 9999 +dag: + is_thread_op: false + client_type: brpc + retry: 1 + use_profile: false +op: + det: + concurrency: 2 + local_service_conf: + model_config: ocr_det_model + devices: "0" + rec: + concurrency: 1 + timeout: -1 + retry: 1 + local_service_conf: + model_config: ocr_rec_model + devices: "0" diff --git a/python/examples/pipeline/ocr/hybrid_service_pipeline_server.py b/python/examples/pipeline/ocr/hybrid_service_pipeline_server.py new file mode 100644 index 0000000000000000000000000000000000000000..1eea9c3b36f74d04c74618a2012810a1a58d411e --- /dev/null +++ b/python/examples/pipeline/ocr/hybrid_service_pipeline_server.py @@ -0,0 +1,135 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing +from paddle_serving_server_gpu.pipeline import Op, RequestOp, ResponseOp +from paddle_serving_server_gpu.pipeline import PipelineServer +from paddle_serving_server_gpu.pipeline.proto import pipeline_service_pb2 +from paddle_serving_server_gpu.pipeline.channel import ChannelDataEcode +from paddle_serving_server_gpu.pipeline import LocalRpcServiceHandler +import numpy as np +import cv2 +import time +import base64 +import json +from paddle_serving_app.reader import OCRReader +from paddle_serving_app.reader import Sequential, ResizeByFactor +from paddle_serving_app.reader import Div, Normalize, Transpose +from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes +import time +import re +import base64 +import logging + +_LOGGER = logging.getLogger() + + +class DetOp(Op): + def init_op(self): + self.det_preprocess = Sequential([ + ResizeByFactor(32, 960), Div(255), + Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose( + (2, 0, 1)) + ]) + self.filter_func = FilterBoxes(10, 10) + self.post_func = DBPostProcess({ + "thresh": 0.3, + "box_thresh": 0.5, + "max_candidates": 1000, + "unclip_ratio": 1.5, + "min_size": 3 + }) + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + data = base64.b64decode(input_dict["image"].encode('utf8')) + data = np.fromstring(data, np.uint8) + # Note: class variables(self.var) can only be used in process op mode + self.im = cv2.imdecode(data, cv2.IMREAD_COLOR) + self.ori_h, self.ori_w, _ = self.im.shape + det_img = self.det_preprocess(self.im) + _, self.new_h, self.new_w = det_img.shape + return {"image": det_img} + + def postprocess(self, input_dicts, fetch_dict): + det_out = fetch_dict["concat_1.tmp_0"] + ratio_list = [ + float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w + ] + dt_boxes_list = self.post_func(det_out, [ratio_list]) + dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w]) + out_dict = {"dt_boxes": dt_boxes, "image": self.im} + return out_dict + + +class RecOp(Op): + def init_op(self): + self.ocr_reader = OCRReader() + self.get_rotate_crop_image = GetRotateCropImage() + self.sorted_boxes = SortedBoxes() + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + im = input_dict["image"] + dt_boxes = input_dict["dt_boxes"] + dt_boxes = self.sorted_boxes(dt_boxes) + feed_list = [] + img_list = [] + max_wh_ratio = 0 + for i, dtbox in enumerate(dt_boxes): + boximg = self.get_rotate_crop_image(im, dt_boxes[i]) + img_list.append(boximg) + h, w = boximg.shape[0:2] + wh_ratio = w * 1.0 / h + max_wh_ratio = max(max_wh_ratio, wh_ratio) + for img in img_list: + norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio) + feed = {"image": norm_img} + feed_list.append(feed) + return feed_list + + def postprocess(self, input_dicts, fetch_dict): + rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True) + res_lst = [] + for res in rec_res: + res_lst.append(res[0]) + res = {"res": str(res_lst)} + return res + + +read_op = RequestOp() +det_op = DetOp( + name="det", + 
input_ops=[read_op], + local_rpc_service_handler=LocalRpcServiceHandler( + model_config="ocr_det_model", + workdir="det_workdir", # default: "workdir" + thread_num=2, # default: 2 + devices="0", # gpu0. default: "" (cpu) + mem_optim=True, # default: True + ir_optim=False, # default: False + available_port_generator=None), # default: None + concurrency=1) +rec_op = RecOp( + name="rec", + input_ops=[det_op], + server_endpoints=["127.0.0.1:12001"], + fetch_list=["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"], + client_config="ocr_rec_client/serving_client_conf.prototxt", + concurrency=1) +response_op = ResponseOp(input_ops=[rec_op]) + +server = PipelineServer("ocr") +server.set_response_op(response_op) +server.prepare_server('config.yml') +server.run_server() diff --git a/python/examples/pipeline/ocr/imgs/1.jpg b/python/examples/pipeline/ocr/imgs/1.jpg new file mode 100644 index 0000000000000000000000000000000000000000..08010177fed2ee8c3709912c06c0b161ba546313 Binary files /dev/null and b/python/examples/pipeline/ocr/imgs/1.jpg differ diff --git a/python/examples/pipeline/ocr/local_service_pipeline_server.py b/python/examples/pipeline/ocr/local_service_pipeline_server.py new file mode 100644 index 0000000000000000000000000000000000000000..ccbd3b1b07a30422583812b659e1c249b37bcb9e --- /dev/null +++ b/python/examples/pipeline/ocr/local_service_pipeline_server.py @@ -0,0 +1,134 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# pylint: disable=doc-string-missing +from paddle_serving_server_gpu.pipeline import Op, RequestOp, ResponseOp +from paddle_serving_server_gpu.pipeline import PipelineServer +from paddle_serving_server_gpu.pipeline.proto import pipeline_service_pb2 +from paddle_serving_server_gpu.pipeline.channel import ChannelDataEcode +from paddle_serving_server_gpu.pipeline import LocalRpcServiceHandler +import numpy as np +import cv2 +import time +import base64 +import json +from paddle_serving_app.reader import OCRReader +from paddle_serving_app.reader import Sequential, ResizeByFactor +from paddle_serving_app.reader import Div, Normalize, Transpose +from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes +import time +import re +import base64 +import logging + +_LOGGER = logging.getLogger() + + +class DetOp(Op): + def init_op(self): + self.det_preprocess = Sequential([ + ResizeByFactor(32, 960), Div(255), + Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose( + (2, 0, 1)) + ]) + self.filter_func = FilterBoxes(10, 10) + self.post_func = DBPostProcess({ + "thresh": 0.3, + "box_thresh": 0.5, + "max_candidates": 1000, + "unclip_ratio": 1.5, + "min_size": 3 + }) + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + data = base64.b64decode(input_dict["image"].encode('utf8')) + data = np.fromstring(data, np.uint8) + # Note: class variables(self.var) can only be used in process op mode + self.im = cv2.imdecode(data, cv2.IMREAD_COLOR) + self.ori_h, self.ori_w, _ = self.im.shape + det_img = self.det_preprocess(self.im) + _, self.new_h, self.new_w = det_img.shape + return {"image": det_img} + + def postprocess(self, input_dicts, fetch_dict): + det_out = fetch_dict["concat_1.tmp_0"] + ratio_list = [ + float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w + ] + dt_boxes_list = self.post_func(det_out, [ratio_list]) + dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w]) + out_dict = {"dt_boxes": dt_boxes, "image": self.im} + return out_dict + + +class RecOp(Op): + def init_op(self): + self.ocr_reader = OCRReader() + self.get_rotate_crop_image = GetRotateCropImage() + self.sorted_boxes = SortedBoxes() + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + im = input_dict["image"] + dt_boxes = input_dict["dt_boxes"] + dt_boxes = self.sorted_boxes(dt_boxes) + feed_list = [] + img_list = [] + max_wh_ratio = 0 + for i, dtbox in enumerate(dt_boxes): + boximg = self.get_rotate_crop_image(im, dt_boxes[i]) + img_list.append(boximg) + h, w = boximg.shape[0:2] + wh_ratio = w * 1.0 / h + max_wh_ratio = max(max_wh_ratio, wh_ratio) + for img in img_list: + norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio) + feed = {"image": norm_img} + feed_list.append(feed) + return feed_list + + def postprocess(self, input_dicts, fetch_dict): + rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True) + res_lst = [] + for res in rec_res: + res_lst.append(res[0]) + res = {"res": str(res_lst)} + return res + + +read_op = RequestOp() +det_op = DetOp( + name="det", + input_ops=[read_op], + local_rpc_service_handler=LocalRpcServiceHandler( + model_config="ocr_det_model", + workdir="det_workdir", # default: "workdir" + thread_num=2, # default: 2 + devices="0", # gpu0. default: "" (cpu) + mem_optim=True, # default: True + ir_optim=False, # default: False + available_port_generator=None), # default: None + concurrency=1) +rec_op = RecOp( + name="rec", + input_ops=[det_op], + local_rpc_service_handler=LocalRpcServiceHandler( + model_config="ocr_rec_model"), + concurrency=1) +response_op = ResponseOp(input_ops=[rec_op]) + +server = PipelineServer("ocr") +server.set_response_op(response_op) +server.prepare_server('config.yml') +server.run_server() diff --git a/python/examples/pipeline/ocr/pipeline_http_client.py b/python/examples/pipeline/ocr/pipeline_http_client.py new file mode 100644 index 0000000000000000000000000000000000000000..6d40e6474d6e0e32ac36835de3b69f4f90b6171d --- /dev/null +++ b/python/examples/pipeline/ocr/pipeline_http_client.py @@ -0,0 +1,37 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from paddle_serving_server_gpu.pipeline import PipelineClient +import numpy as np +import requests +import json +import cv2 +import base64 +import os + + +def cv2_to_base64(image): + return base64.b64encode(image).decode('utf8') + + +url = "http://127.0.0.1:9999/ocr/prediction" +test_img_dir = "imgs/" +for img_file in os.listdir(test_img_dir): + with open(os.path.join(test_img_dir, img_file), 'rb') as file: + image_data1 = file.read() + image = cv2_to_base64(image_data1) + +for i in range(4): + data = {"key": ["image"], "value": [image]} + r = requests.post(url=url, data=json.dumps(data)) + print(r.json()) diff --git a/python/examples/pipeline/ocr/pipeline_rpc_client.py b/python/examples/pipeline/ocr/pipeline_rpc_client.py new file mode 100644 index 0000000000000000000000000000000000000000..93524c36cb300e71bcde57f930cebc62e3d86cba --- /dev/null +++ b/python/examples/pipeline/ocr/pipeline_rpc_client.py @@ -0,0 +1,38 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+from paddle_serving_server_gpu.pipeline import PipelineClient +import numpy as np +import requests +import json +import cv2 +import base64 +import os + +client = PipelineClient() +client.connect(['127.0.0.1:18080']) + + +def cv2_to_base64(image): + return base64.b64encode(image).decode('utf8') + + +test_img_dir = "imgs/" +for img_file in os.listdir(test_img_dir): + with open(os.path.join(test_img_dir, img_file), 'rb') as file: + image_data = file.read() + image = cv2_to_base64(image_data) + +for i in range(4): + ret = client.predict(feed_dict={"image": image}, fetch=["res"]) + print(ret) diff --git a/python/examples/pipeline/ocr/remote_service_pipeline_server.py b/python/examples/pipeline/ocr/remote_service_pipeline_server.py new file mode 100644 index 0000000000000000000000000000000000000000..170e6dd9c4687e10bb4af6278f2f5b0c9ac09878 --- /dev/null +++ b/python/examples/pipeline/ocr/remote_service_pipeline_server.py @@ -0,0 +1,129 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing +from paddle_serving_server_gpu.pipeline import Op, RequestOp, ResponseOp +from paddle_serving_server_gpu.pipeline import PipelineServer +from paddle_serving_server_gpu.pipeline.proto import pipeline_service_pb2 +from paddle_serving_server_gpu.pipeline.channel import ChannelDataEcode +import numpy as np +import cv2 +import time +import base64 +import json +from paddle_serving_app.reader import OCRReader +from paddle_serving_app.reader import Sequential, ResizeByFactor +from paddle_serving_app.reader import Div, Normalize, Transpose +from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes +import time +import re +import base64 +import logging + +_LOGGER = logging.getLogger() + + +class DetOp(Op): + def init_op(self): + self.det_preprocess = Sequential([ + ResizeByFactor(32, 960), Div(255), + Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose( + (2, 0, 1)) + ]) + self.filter_func = FilterBoxes(10, 10) + self.post_func = DBPostProcess({ + "thresh": 0.3, + "box_thresh": 0.5, + "max_candidates": 1000, + "unclip_ratio": 1.5, + "min_size": 3 + }) + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + data = base64.b64decode(input_dict["image"].encode('utf8')) + data = np.fromstring(data, np.uint8) + # Note: class variables(self.var) can only be used in process op mode + self.im = cv2.imdecode(data, cv2.IMREAD_COLOR) + self.ori_h, self.ori_w, _ = self.im.shape + det_img = self.det_preprocess(self.im) + _, self.new_h, self.new_w = det_img.shape + return {"image": det_img} + + def postprocess(self, input_dicts, fetch_dict): + det_out = fetch_dict["concat_1.tmp_0"] + ratio_list = [ + float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w + ] + dt_boxes_list = self.post_func(det_out, [ratio_list]) + dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w]) + out_dict = {"dt_boxes": dt_boxes, "image": self.im} + return 
out_dict + + +class RecOp(Op): + def init_op(self): + self.ocr_reader = OCRReader() + self.get_rotate_crop_image = GetRotateCropImage() + self.sorted_boxes = SortedBoxes() + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + im = input_dict["image"] + dt_boxes = input_dict["dt_boxes"] + dt_boxes = self.sorted_boxes(dt_boxes) + feed_list = [] + img_list = [] + max_wh_ratio = 0 + for i, dtbox in enumerate(dt_boxes): + boximg = self.get_rotate_crop_image(im, dt_boxes[i]) + img_list.append(boximg) + h, w = boximg.shape[0:2] + wh_ratio = w * 1.0 / h + max_wh_ratio = max(max_wh_ratio, wh_ratio) + for img in img_list: + norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio) + feed = {"image": norm_img} + feed_list.append(feed) + return feed_list + + def postprocess(self, input_dicts, fetch_dict): + rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True) + res_lst = [] + for res in rec_res: + res_lst.append(res[0]) + res = {"res": str(res_lst)} + return res + + +read_op = RequestOp() +det_op = DetOp( + name="det", + input_ops=[read_op], + server_endpoints=["127.0.0.1:12000"], + fetch_list=["concat_1.tmp_0"], + client_config="ocr_det_client/serving_client_conf.prototxt", + concurrency=1) +rec_op = RecOp( + name="rec", + input_ops=[det_op], + server_endpoints=["127.0.0.1:12001"], + fetch_list=["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"], + client_config="ocr_rec_client/serving_client_conf.prototxt", + concurrency=1) +response_op = ResponseOp(input_ops=[rec_op]) + +server = PipelineServer("ocr") +server.set_response_op(response_op) +server.prepare_server('config.yml') +server.run_server() diff --git a/python/examples/pipeline/ocr/web_service.py b/python/examples/pipeline/ocr/web_service.py new file mode 100644 index 0000000000000000000000000000000000000000..d1e6ec808343d62cc7c85b2d78ac1caa57c8cf28 --- /dev/null +++ b/python/examples/pipeline/ocr/web_service.py @@ -0,0 +1,112 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+try: + from paddle_serving_server_gpu.web_service import WebService, Op +except ImportError: + from paddle_serving_server.web_service import WebService, Op +import logging +import numpy as np +import cv2 +import base64 +from paddle_serving_app.reader import OCRReader +from paddle_serving_app.reader import Sequential, ResizeByFactor +from paddle_serving_app.reader import Div, Normalize, Transpose +from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes + +_LOGGER = logging.getLogger() + + +class DetOp(Op): + def init_op(self): + self.det_preprocess = Sequential([ + ResizeByFactor(32, 960), Div(255), + Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose( + (2, 0, 1)) + ]) + self.filter_func = FilterBoxes(10, 10) + self.post_func = DBPostProcess({ + "thresh": 0.3, + "box_thresh": 0.5, + "max_candidates": 1000, + "unclip_ratio": 1.5, + "min_size": 3 + }) + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + data = base64.b64decode(input_dict["image"].encode('utf8')) + data = np.fromstring(data, np.uint8) + # Note: class variables(self.var) can only be used in process op mode + self.im = cv2.imdecode(data, cv2.IMREAD_COLOR) + self.ori_h, self.ori_w, _ = self.im.shape + det_img = self.det_preprocess(self.im) + _, self.new_h, self.new_w = det_img.shape + return {"image": det_img} + + def postprocess(self, input_dicts, fetch_dict): + det_out = fetch_dict["concat_1.tmp_0"] + ratio_list = [ + float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w + ] + dt_boxes_list = self.post_func(det_out, [ratio_list]) + dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w]) + out_dict = {"dt_boxes": dt_boxes, "image": self.im} + return out_dict + + +class RecOp(Op): + def init_op(self): + self.ocr_reader = OCRReader() + self.get_rotate_crop_image = GetRotateCropImage() + self.sorted_boxes = SortedBoxes() + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + im = input_dict["image"] + dt_boxes = input_dict["dt_boxes"] + dt_boxes = self.sorted_boxes(dt_boxes) + feed_list = [] + img_list = [] + max_wh_ratio = 0 + for i, dtbox in enumerate(dt_boxes): + boximg = self.get_rotate_crop_image(im, dt_boxes[i]) + img_list.append(boximg) + h, w = boximg.shape[0:2] + wh_ratio = w * 1.0 / h + max_wh_ratio = max(max_wh_ratio, wh_ratio) + for img in img_list: + norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio) + feed = {"image": norm_img} + feed_list.append(feed) + return feed_list + + def postprocess(self, input_dicts, fetch_dict): + rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True) + res_lst = [] + for res in rec_res: + res_lst.append(res[0]) + res = {"res": str(res_lst)} + return res + + +class OcrService(WebService): + def get_pipeline_response(self, read_op): + det_op = DetOp(name="det", input_ops=[read_op]) + rec_op = RecOp(name="rec", input_ops=[det_op]) + return rec_op + + +ocr_service = OcrService(name="ocr") +ocr_service.prepare_pipeline_config("config.yml") +ocr_service.run_service() diff --git a/python/examples/pipeline/simple_web_service/README.md b/python/examples/pipeline/simple_web_service/README.md new file mode 100644 index 0000000000000000000000000000000000000000..049fbf2ec69bb83062f396e59344e29b0094372a --- /dev/null +++ b/python/examples/pipeline/simple_web_service/README.md @@ -0,0 +1,19 @@ +# Simple Pipeline WebService + +This document takes the UCI service as an example to introduce how to use Pipeline WebService.
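For convenience, the HTTP test at the end of this README can also be driven from Python. A minimal sketch follows; it assumes the service has been started as described in the steps below (`http_port: 18080` from this example's config.yml) and that `requests` is installed:

```python
# Python equivalent of the curl-based HTTP test in this README.
# Assumes web_service.py is already running on http_port 18080.
import json

import requests

url = "http://127.0.0.1:18080/uci/prediction"
x = ("0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, "
     "-0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332")
data = {"key": ["x"], "value": [x]}
resp = requests.post(url=url, data=json.dumps(data))
print(resp.json())  # UciOp.postprocess returns a "price" result
```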
+ +## Get model +``` +sh get_data.sh +``` + +## Start server + +``` +python web_service.py &>log.txt & +``` + +## HTTP test +``` +curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}' +``` diff --git a/python/examples/pipeline/simple_web_service/README_CN.md b/python/examples/pipeline/simple_web_service/README_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..c08d642f7c8034e9d326a24636728bff36f8638b --- /dev/null +++ b/python/examples/pipeline/simple_web_service/README_CN.md @@ -0,0 +1,19 @@ +# Simple Pipeline WebService + +这里以 Uci 服务为例来介绍 Pipeline WebService 的使用。 + +## 获取模型 +``` +sh get_data.sh +``` + +## 启动服务 + +``` +python web_service.py &>log.txt & +``` + +## 测试 +``` +curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}' +``` diff --git a/python/examples/pipeline/simple_web_service/config.yml b/python/examples/pipeline/simple_web_service/config.yml new file mode 100644 index 0000000000000000000000000000000000000000..72e473e320e792b8fafc46768c8ef38e7a00436c --- /dev/null +++ b/python/examples/pipeline/simple_web_service/config.yml @@ -0,0 +1,9 @@ +worker_num: 4 +http_port: 18080 +dag: + is_thread_op: false +op: + uci: + local_service_conf: + model_config: uci_housing_model + devices: "" # "0,1" diff --git a/python/examples/pipeline/simple_web_service/get_data.sh b/python/examples/pipeline/simple_web_service/get_data.sh new file mode 100644 index 0000000000000000000000000000000000000000..84a3966a0ef323cef4b146d8e9489c70a7a8ae35 --- /dev/null +++ b/python/examples/pipeline/simple_web_service/get_data.sh @@ -0,0 +1,2 @@ +wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz +tar -xzf uci_housing.tar.gz diff --git a/python/examples/pipeline/simple_web_service/web_service.py b/python/examples/pipeline/simple_web_service/web_service.py new file mode 100644 index 0000000000000000000000000000000000000000..28197e804ffc08d094d0e33d3d2654ace3093ded --- /dev/null +++ b/python/examples/pipeline/simple_web_service/web_service.py @@ -0,0 +1,51 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+try: + from paddle_serving_server_gpu.web_service import WebService, Op +except ImportError: + from paddle_serving_server.web_service import WebService, Op +import logging +import numpy as np + +_LOGGER = logging.getLogger() + + +class UciOp(Op): + def init_op(self): + self.separator = "," + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + _LOGGER.info(input_dict) + x_value = input_dict["x"] + if isinstance(x_value, (str, unicode)): + input_dict["x"] = np.array( + [float(x.strip()) for x in x_value.split(self.separator)]) + return input_dict + + def postprocess(self, input_dicts, fetch_dict): + # _LOGGER.info(fetch_dict) + fetch_dict["price"] = str(fetch_dict["price"][0][0]) + return fetch_dict + + +class UciService(WebService): + def get_pipeline_response(self, read_op): + uci_op = UciOp(name="uci", input_ops=[read_op]) + return uci_op + + +uci_service = UciService(name="uci") +uci_service.prepare_pipeline_config("config.yml") +uci_service.run_service() diff --git a/python/paddle_serving_server/__init__.py b/python/paddle_serving_server/__init__.py index 7427da7386b272c69dd082919c2b2c9175f78713..5cb8b0f95cc38869dea3f724ba89c3a8c994517c 100644 --- a/python/paddle_serving_server/__init__.py +++ b/python/paddle_serving_server/__init__.py @@ -162,6 +162,10 @@ class Server(object): self.container_id = None self.model_config_paths = None # for multi-model in a workflow + def get_fetch_list(self): + fetch_names = [var.alias_name for var in self.model_conf.fetch_var] + return fetch_names + def set_max_concurrency(self, concurrency): self.max_concurrency = concurrency diff --git a/python/paddle_serving_server/web_service.py b/python/paddle_serving_server/web_service.py old mode 100755 new mode 100644 index 8deedb691e338eea382215b3b40b420ae20297c1..48b88200cdbf0135caf94f3c5dfcd99dc0d94209 --- a/python/paddle_serving_server/web_service.py +++ b/python/paddle_serving_server/web_service.py @@ -21,6 +21,9 @@ from paddle_serving_client import Client from contextlib import closing import socket +from paddle_serving_server import pipeline +from paddle_serving_server.pipeline import Op + def port_is_available(port): with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: @@ -35,8 +38,29 @@ def port_is_available(port): class WebService(object): def __init__(self, name="default_service"): self.name = name + # pipeline + self._server = pipeline.PipelineServer(self.name) + + def get_pipeline_response(self, read_op): + return None + + def prepare_pipeline_config(self, yaml_file): + # build dag + read_op = pipeline.RequestOp() + last_op = self.get_pipeline_response(read_op) + if not isinstance(last_op, Op): + raise ValueError("The return value type of `get_pipeline_response` " + "function is not Op type, please check function " + "`get_pipeline_response`.") + response_op = pipeline.ResponseOp(input_ops=[last_op]) + self._server.set_response_op(response_op) + self._server.prepare_server(yaml_file) + + def run_service(self): + self._server.run_server() def load_model_config(self, model_config): + print("This API will be deprecated later. Please do not use it") self.model_config = model_config def _launch_rpc_service(self): @@ -73,6 +97,7 @@ class WebService(object): device="cpu", mem_optim=True, ir_optim=False): + print("This API will be deprecated later. Please do not use it") self.workdir = workdir self.port = port self.device = device @@ -112,6 +137,8 @@ class WebService(object): return result def run_rpc_service(self): + print("This API will be deprecated later. 
Please do not use it") + import socket localIP = socket.gethostbyname(socket.gethostname()) print("web service address:") print("http://{}:{}/{}/prediction".format(localIP, self.port, @@ -160,6 +187,7 @@ class WebService(object): "{}".format(self.model_config), gpu=False, profile=False) def run_web_service(self): + print("This API will be deprecated later. Please do not use it") self.app_instance.run(host="0.0.0.0", port=self.port, threaded=False, @@ -169,9 +197,11 @@ class WebService(object): return self.app_instance def preprocess(self, feed=[], fetch=[]): + print("This API will be deprecated later. Please do not use it") return feed, fetch def postprocess(self, feed=[], fetch=[], fetch_map=None): + print("This API will be deprecated later. Please do not use it") for key in fetch_map: fetch_map[key] = fetch_map[key].tolist() return fetch_map diff --git a/python/paddle_serving_server_gpu/__init__.py b/python/paddle_serving_server_gpu/__init__.py index 50929dd6aa5e5c3e614e14024e0b5e1d734e11c1..39429e7825e1d32505f3156a813ebfa57547eb8f 100644 --- a/python/paddle_serving_server_gpu/__init__.py +++ b/python/paddle_serving_server_gpu/__init__.py @@ -214,6 +214,10 @@ class Server(object): self.product_name = None self.container_id = None + def get_fetch_list(self): + fetch_names = [var.alias_name for var in self.model_conf.fetch_var] + return fetch_names + def set_max_concurrency(self, concurrency): self.max_concurrency = concurrency diff --git a/python/paddle_serving_server_gpu/web_service.py b/python/paddle_serving_server_gpu/web_service.py index eafe7c4c2879e84a3e81dda57ba573dc58cc0095..96d34f4b4608fd488ff745b2aa9e8efa111901b1 100644 --- a/python/paddle_serving_server_gpu/web_service.py +++ b/python/paddle_serving_server_gpu/web_service.py @@ -24,6 +24,9 @@ import sys import numpy as np import paddle_serving_server_gpu as serving +from paddle_serving_server_gpu import pipeline +from paddle_serving_server_gpu.pipeline import Op + def port_is_available(port): with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: @@ -38,13 +41,36 @@ def port_is_available(port): class WebService(object): def __init__(self, name="default_service"): self.name = name - self.gpus = [] - self.rpc_service_list = [] + # pipeline + self._server = pipeline.PipelineServer(self.name) + + self.gpus = [] # deprecated + self.rpc_service_list = [] # deprecated + + def get_pipeline_response(self, read_op): + return None + + def prepare_pipeline_config(self, yaml_file): + # build dag + read_op = pipeline.RequestOp() + last_op = self.get_pipeline_response(read_op) + if not isinstance(last_op, Op): + raise ValueError("The return value type of `get_pipeline_response` " + "function is not Op type, please check function " + "`get_pipeline_response`.") + response_op = pipeline.ResponseOp(input_ops=[last_op]) + self._server.set_response_op(response_op) + self._server.prepare_server(yaml_file) + + def run_service(self): + self._server.run_server() def load_model_config(self, model_config): + print("This API will be deprecated later. Please do not use it") self.model_config = model_config def set_gpus(self, gpus): + print("This API will be deprecated later. Please do not use it") self.gpus = [int(x) for x in gpus.split(",")] def default_rpc_service(self, @@ -98,6 +124,7 @@ class WebService(object): gpuid=0, mem_optim=True, ir_optim=False): + print("This API will be deprecated later. 
Please do not use it") self.workdir = workdir self.port = port self.device = device @@ -165,6 +192,7 @@ class WebService(object): return result def run_rpc_service(self): + print("This API will be deprecated later. Please do not use it") import socket localIP = socket.gethostbyname(socket.gethostname()) print("web service address:") @@ -193,6 +221,7 @@ class WebService(object): # TODO: maybe change another API name: maybe run_local_predictor? def run_debugger_service(self, gpu=False): + print("This API will be deprecated later. Please do not use it") import socket localIP = socket.gethostbyname(socket.gethostname()) print("web service address:") @@ -219,18 +248,21 @@ class WebService(object): "{}".format(self.model_config), gpu=gpu, profile=False) def run_web_service(self): + print("This API will be deprecated later. Please do not use it") self.app_instance.run(host="0.0.0.0", port=self.port, threaded=False, - processes=1) + processes=4) def get_app_instance(self): return self.app_instance def preprocess(self, feed=[], fetch=[]): + print("This API will be deprecated later. Please do not use it") return feed, fetch def postprocess(self, feed=[], fetch=[], fetch_map=None): - for key in fetch_map.iterkeys(): + print("This API will be deprecated later. Please do not use it") + for key in fetch_map: fetch_map[key] = fetch_map[key].tolist() return fetch_map diff --git a/python/pipeline/__init__.py b/python/pipeline/__init__.py index 9f3056708c4394637ea6898fa50911af9871cd9d..7718016c9989a3b7348c3389c86495537786abb8 100644 --- a/python/pipeline/__init__.py +++ b/python/pipeline/__init__.py @@ -11,8 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logger # this module must be the first to import -from operator import Op, RequestOp, ResponseOp -from pipeline_server import PipelineServer -from pipeline_client import PipelineClient -from analyse import Analyst +from . 
import logger # this module must be the first to import +from .operator import Op, RequestOp, ResponseOp +from .pipeline_server import PipelineServer +from .pipeline_client import PipelineClient +from .local_rpc_service_handler import LocalRpcServiceHandler +from .analyse import Analyst diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py index 6f86658b262af79016c80172cac45d0dba15fe81..51aa0d4b4c33947d85a18f613f897129f85061fd 100644 --- a/python/pipeline/channel.py +++ b/python/pipeline/channel.py @@ -40,7 +40,8 @@ class ChannelDataEcode(enum.Enum): RPC_PACKAGE_ERROR = 4 CLIENT_ERROR = 5 CLOSED_ERROR = 6 - UNKNOW = 7 + NO_SERVICE = 7 + UNKNOW = 8 class ChannelDataType(enum.Enum): diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py index 0e058dbeab8be4741268cadad0ab887f03a7d8a7..272071f3211ed6029e5ba757da5ee2c780681ac2 100644 --- a/python/pipeline/dag.py +++ b/python/pipeline/dag.py @@ -299,13 +299,12 @@ class DAGExecutor(object): sys.stderr.write(profile_str) # add profile info into rpc_resp - profile_value = "" if resp_channeldata.client_need_profile: profile_set = resp_channeldata.profile_data_set profile_set.add(profile_str) profile_value = "".join(list(profile_set)) - rpc_resp.key.append(self._client_profile_key) - rpc_resp.value.append(profile_value) + rpc_resp.key.append(self._client_profile_key) + rpc_resp.value.append(profile_value) return rpc_resp @@ -338,7 +337,8 @@ class DAG(object): self._manager = PipelineProcSyncManager() _LOGGER.info("[DAG] Succ init") - def get_use_ops(self, response_op): + @staticmethod + def get_use_ops(response_op): unique_names = set() used_ops = set() succ_ops_of_use_op = {} # {op_name: succ_ops} @@ -427,11 +427,11 @@ class DAG(object): _LOGGER.critical("Failed to build DAG: ResponseOp" " has not been set.") os._exit(-1) - used_ops, out_degree_ops = self.get_use_ops(response_op) + used_ops, out_degree_ops = DAG.get_use_ops(response_op) if not self._build_dag_each_worker: _LOGGER.info("================= USED OP =================") for op in used_ops: - if op.name != self._request_name: + if not isinstance(op, RequestOp): _LOGGER.info(op.name) _LOGGER.info("-------------------------------------------") if len(used_ops) <= 1: diff --git a/python/pipeline/gateway/__init__.py b/python/pipeline/gateway/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..abf198b97e6e818e1fbe59006f98492640bcee54 --- /dev/null +++ b/python/pipeline/gateway/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/python/pipeline/gateway/proto/gateway.proto b/python/pipeline/gateway/proto/gateway.proto new file mode 100644 index 0000000000000000000000000000000000000000..9d3d501d06acf731231504a0ba97e89c72519ae4 --- /dev/null +++ b/python/pipeline/gateway/proto/gateway.proto @@ -0,0 +1,41 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; +package baidu.paddle_serving.pipeline_serving; +option go_package = ".;pipeline_serving"; + +import "google/api/annotations.proto"; + +message Response { + repeated string key = 1; + repeated string value = 2; + int32 ecode = 3; + string error_info = 4; +}; + +message Request { + repeated string key = 1; + repeated string value = 2; + string name = 3; +} + +service PipelineService { + rpc inference(Request) returns (Response) { + option (google.api.http) = { + post : "/{name=*}/prediction" + body : "*" + }; + } +}; diff --git a/python/pipeline/gateway/proxy_server.go b/python/pipeline/gateway/proxy_server.go new file mode 100644 index 0000000000000000000000000000000000000000..a74e798463b58efe26ab027c649a07131d4bbf32 --- /dev/null +++ b/python/pipeline/gateway/proxy_server.go @@ -0,0 +1,52 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "C" + "flag" + "net/http" + "log" + "strconv" + + "golang.org/x/net/context" + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "google.golang.org/grpc" + + gw "./proto" +) + +//export run_proxy_server +func run_proxy_server(grpc_port int, http_port int) error { + var ( + pipelineEndpoint = flag.String("pipeline_endpoint", "localhost:" + strconv.Itoa(grpc_port), "endpoint of PipelineService") + ) + + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + mux := runtime.NewServeMux() + opts := []grpc.DialOption{grpc.WithInsecure()} + err := gw.RegisterPipelineServiceHandlerFromEndpoint(ctx, mux, *pipelineEndpoint, opts) + if err != nil { + return err + } + + log.Println("start proxy service") + return http.ListenAndServe(":" + strconv.Itoa(http_port), mux) // proxy port +} + +func main() {} diff --git a/python/pipeline/local_rpc_service_handler.py b/python/pipeline/local_rpc_service_handler.py new file mode 100644 index 0000000000000000000000000000000000000000..376fcaf13af4e5a51ccf3ee6a1bd06a474a33bbd --- /dev/null +++ b/python/pipeline/local_rpc_service_handler.py @@ -0,0 +1,134 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import logging +import multiprocessing +try: + from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server + PACKAGE_VERSION = "GPU" +except ImportError: + from paddle_serving_server import OpMaker, OpSeqMaker, Server + PACKAGE_VERSION = "CPU" +from . import util + +_LOGGER = logging.getLogger(__name__) +_workdir_name_gen = util.NameGenerator("workdir_") + + +class LocalRpcServiceHandler(object): + def __init__(self, + model_config, + workdir="", + thread_num=2, + devices="", + mem_optim=True, + ir_optim=False, + available_port_generator=None): + if available_port_generator is None: + available_port_generator = util.GetAvailablePortGenerator() + + self._model_config = model_config + self._port_list = [] + if devices == "": + # cpu + devices = [-1] + self._port_list.append(available_port_generator.next()) + _LOGGER.info("Model({}) will be launched on cpu device. Port({})" + .format(model_config, self._port_list)) + else: + # gpu + if PACKAGE_VERSION == "CPU": + raise ValueError( + "You are using the CPU version package(" + "paddle-serving-server), unable to set devices") + devices = [int(x) for x in devices.split(",")] + for _ in devices: + self._port_list.append(available_port_generator.next()) + _LOGGER.info("Model({}) will be launched on gpu device: {}. Port({})" + .format(model_config, devices, self._port_list)) + self._workdir = workdir + self._devices = devices + self._thread_num = thread_num + self._mem_optim = mem_optim + self._ir_optim = ir_optim + + self._rpc_service_list = [] + self._server_pros = [] + self._fetch_vars = None + + def get_fetch_list(self): + return self._fetch_vars + + def get_port_list(self): + return self._port_list + + def get_client_config(self): + return os.path.join(self._model_config, "serving_server_conf.prototxt") + + def _prepare_one_server(self, workdir, port, gpuid, thread_num, mem_optim, + ir_optim): + device = "gpu" + if gpuid == -1: + device = "cpu" + op_maker = OpMaker() + read_op = op_maker.create('general_reader') + general_infer_op = op_maker.create('general_infer') + general_response_op = op_maker.create('general_response') + + op_seq_maker = OpSeqMaker() + op_seq_maker.add_op(read_op) + op_seq_maker.add_op(general_infer_op) + op_seq_maker.add_op(general_response_op) + + server = Server() + server.set_op_sequence(op_seq_maker.get_op_sequence()) + server.set_num_threads(thread_num) + server.set_memory_optimize(mem_optim) + server.set_ir_optimize(ir_optim) + + server.load_model_config(self._model_config) + if gpuid >= 0: + server.set_gpuid(gpuid) + server.prepare_server(workdir=workdir, port=port, device=device) + if self._fetch_vars is None: + self._fetch_vars = server.get_fetch_list() + return server + + def _start_one_server(self, service_idx): + self._rpc_service_list[service_idx].run_server() + + def prepare_server(self): + for i, device_id in enumerate(self._devices): + if self._workdir != "": + workdir = "{}_{}".format(self._workdir, i) + else: + workdir = _workdir_name_gen.next() + self._rpc_service_list.append( + self._prepare_one_server( + workdir, + self._port_list[i], + device_id, + thread_num=self._thread_num,
+ mem_optim=self._mem_optim, + ir_optim=self._ir_optim)) + + def start_server(self): + for i, service in enumerate(self._rpc_service_list): + p = multiprocessing.Process( + target=self._start_one_server, args=(i, )) + p.daemon = True + self._server_pros.append(p) + for p in self._server_pros: + p.start() diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index b18b5ed8c43312481384913109be2830ad1eeb0f..3b928b9cbab28904e6225d88e229e9a0d2da4f56 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -38,6 +38,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode, ChannelTimeoutError) from .util import NameGenerator from .profiler import UnsafeTimeProfiler as TimeProfiler +from . import local_rpc_service_handler _LOGGER = logging.getLogger(__name__) _op_name_gen = NameGenerator("Op") @@ -47,46 +48,128 @@ class Op(object): def __init__(self, name=None, input_ops=[], - server_endpoints=[], - fetch_list=[], + server_endpoints=None, + fetch_list=None, client_config=None, - concurrency=1, - timeout=-1, - retry=1, - batch_size=1, - auto_batching_timeout=None): + concurrency=None, + timeout=None, + retry=None, + batch_size=None, + auto_batching_timeout=None, + local_rpc_service_handler=None): + # In __init__, all the parameters are just saved and Op is not initialized if name is None: name = _op_name_gen.next() self.name = name # to identify the type of OP, it must be globally unique self.concurrency = concurrency # amount of concurrency self.set_input_ops(input_ops) + self._local_rpc_service_handler = local_rpc_service_handler self._server_endpoints = server_endpoints - self.with_serving = False - if len(self._server_endpoints) != 0: - self.with_serving = True - self._client_config = client_config self._fetch_names = fetch_list - - if timeout > 0: - self._timeout = timeout / 1000.0 - else: - self._timeout = -1 + self._client_config = client_config + self._timeout = timeout self._retry = max(1, retry) + self._batch_size = batch_size + self._auto_batching_timeout = auto_batching_timeout + self._input = None self._outputs = [] - self._batch_size = batch_size - self._auto_batching_timeout = auto_batching_timeout - if self._auto_batching_timeout is not None: - if self._auto_batching_timeout <= 0 or self._batch_size == 1: - _LOGGER.warning( - self._log( - "Because auto_batching_timeout <= 0 or batch_size == 1," - " set auto_batching_timeout to None.")) - self._auto_batching_timeout = None + self._server_use_profile = False + self._tracer = None + + # only for thread op + self._for_init_op_lock = threading.Lock() + self._for_close_op_lock = threading.Lock() + self._succ_init_op = False + self._succ_close_op = False + + def init_from_dict(self, conf): + # init op + if self.concurrency is None: + self.concurrency = conf["concurrency"] + if self._retry is None: + self._retry = conf["retry"] + if self._fetch_names is None: + self._fetch_names = conf.get("fetch_list") + if self._client_config is None: + self._client_config = conf.get("client_config") + + if self._timeout is None: + self._timeout = conf["timeout"] + if self._timeout > 0: + self._timeout = self._timeout / 1000.0 + else: + self._timeout = -1 + + if self._batch_size is None: + self._batch_size = conf["batch_size"] + if self._auto_batching_timeout is None: + self._auto_batching_timeout = conf["auto_batching_timeout"] + if self._auto_batching_timeout <= 0 or self._batch_size == 1: + _LOGGER.warning( + self._log( + "Because auto_batching_timeout <= 0 or batch_size == 1," + " set 
+
+    def init_from_dict(self, conf):
+        # init op
+        if self.concurrency is None:
+            self.concurrency = conf["concurrency"]
+        if self._retry is None:
+            self._retry = conf["retry"]
+        self._retry = max(1, self._retry)
+        if self._fetch_names is None:
+            self._fetch_names = conf.get("fetch_list")
+        if self._client_config is None:
+            self._client_config = conf.get("client_config")
+
+        if self._timeout is None:
+            self._timeout = conf["timeout"]
+        if self._timeout > 0:
+            self._timeout = self._timeout / 1000.0
+        else:
+            self._timeout = -1
+
+        if self._batch_size is None:
+            self._batch_size = conf["batch_size"]
+        if self._auto_batching_timeout is None:
+            self._auto_batching_timeout = conf["auto_batching_timeout"]
+        if self._auto_batching_timeout <= 0 or self._batch_size == 1:
+            _LOGGER.warning(
+                self._log(
+                    "Because auto_batching_timeout <= 0 or batch_size == 1,"
+                    " set auto_batching_timeout to None."))
+            self._auto_batching_timeout = None
+        else:
+            self._auto_batching_timeout = self._auto_batching_timeout / 1000.0
+
+        if self._server_endpoints is None:
+            server_endpoints = conf.get("server_endpoints", [])
+            if len(server_endpoints) != 0:
+                # remote service
+                self.with_serving = True
+                self._server_endpoints = server_endpoints
+            else:
+                if self._local_rpc_service_handler is None:
+                    local_service_conf = conf.get("local_service_conf")
+                    _LOGGER.info("local_service_conf: {}".format(
+                        local_service_conf))
+                    model_config = local_service_conf.get("model_config")
+                    _LOGGER.info("model_config: {}".format(model_config))
+                    if model_config is None:
+                        self.with_serving = False
+                    else:
+                        # local rpc service
+                        self.with_serving = True
+                        service_handler = local_rpc_service_handler.LocalRpcServiceHandler(
+                            model_config=model_config,
+                            workdir=local_service_conf["workdir"],
+                            thread_num=local_service_conf["thread_num"],
+                            devices=local_service_conf["devices"],
+                            mem_optim=local_service_conf["mem_optim"],
+                            ir_optim=local_service_conf["ir_optim"])
+                        service_handler.prepare_server()  # get fetch_list
+                        service_ports = service_handler.get_port_list()
+                        self._server_endpoints = [
+                            "127.0.0.1:{}".format(p) for p in service_ports
+                        ]
+                        if self._client_config is None:
+                            self._client_config = service_handler.get_client_config(
+                            )
+                        if self._fetch_names is None:
+                            self._fetch_names = service_handler.get_fetch_list()
+                        self._local_rpc_service_handler = service_handler
+                else:
+                    self.with_serving = True
+                    self._local_rpc_service_handler.prepare_server(
+                    )  # get fetch_list
+                    service_ports = self._local_rpc_service_handler.get_port_list(
+                    )
+                    self._server_endpoints = [
+                        "127.0.0.1:{}".format(p) for p in service_ports
+                    ]
+                    if self._client_config is None:
+                        self._client_config = self._local_rpc_service_handler.get_client_config(
+                        )
+                    if self._fetch_names is None:
+                        self._fetch_names = self._local_rpc_service_handler.get_fetch_list(
+                        )
+        else:
+            self.with_serving = True
+
         if not isinstance(self, RequestOp) and not isinstance(self, ResponseOp):
             _LOGGER.info(
                 self._log("\n\tinput_ops: {},"
@@ -98,20 +181,22 @@ class Op(object):
                           "\n\tretry: {},"
                           "\n\tbatch_size: {},"
                           "\n\tauto_batching_timeout(s): {}".format(
-                              ", ".join([op.name for op in input_ops
+                              ", ".join([op.name for op in self._input_ops
                                          ]), self._server_endpoints,
                               self._fetch_names, self._client_config,
                               self.concurrency, self._timeout, self._retry,
                               self._batch_size, self._auto_batching_timeout)))
 
-        self._server_use_profile = False
-        self._tracer = None
-
-        # only for thread op
-        self._for_init_op_lock = threading.Lock()
-        self._for_close_op_lock = threading.Lock()
-        self._succ_init_op = False
-        self._succ_close_op = False
+    def launch_local_rpc_service(self):
+        if self._local_rpc_service_handler is None:
+            _LOGGER.warning(
+                self._log("Failed to launch local rpc"
+                          " service: local_rpc_service_handler is None."))
+            return
+        ports = self._local_rpc_service_handler.get_port_list()
+        self._local_rpc_service_handler.start_server()
+        _LOGGER.info("Op({}) uses local rpc service at ports: {}"
+                     .format(self.name, ports))
 
     def use_default_auto_batching_config(self):
         if self._batch_size != 1:
@@ -775,7 +860,9 @@ class RequestOp(Op):
         for idx, key in enumerate(request.key):
             data = request.value[idx]
             try:
-                data = eval(data)
+                evaled_data = eval(data)
+                if isinstance(evaled_data, np.ndarray):
+                    data = evaled_data
             except Exception as e:
                 pass
             dictdata[key] = data
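Worth spelling out, since both `RequestOp` here and `PipelineClient` in the next file depend on it: ndarrays cross the wire as `repr()` strings and are rebuilt with `eval()`, which is also why only results that evaluate to a real `np.ndarray` replace the original string. A small self-contained sketch of that round-trip:

```python
import sys
import numpy as np
from numpy import array  # repr() of an ndarray starts with "array(", so eval() needs the name

# Without this, repr() elides long arrays with "...", which would break eval() on the peer.
np.set_printoptions(threshold=sys.maxsize)

a = np.arange(2000.0)   # default float64: repr() carries no dtype suffix
wire = repr(a)          # what _pack_request_package appends to req.value
restored = eval(wire)   # what the receiving side attempts on every value
assert isinstance(restored, np.ndarray) and np.allclose(a, restored)
```

Because `eval()` runs on whatever string arrives, this convention assumes trusted clients; the `isinstance` guard only filters the result, it does not sandbox the evaluation.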
diff --git a/python/pipeline/pipeline_client.py b/python/pipeline/pipeline_client.py
index ad78c6d39002b206082de8eab238be1abf543fee..48368dd81459de98f21af4048a2b694a54e80b75 100644
--- a/python/pipeline/pipeline_client.py
+++ b/python/pipeline/pipeline_client.py
@@ -42,11 +42,12 @@ class PipelineClient(object):
 
     def _pack_request_package(self, feed_dict, profile):
         req = pipeline_service_pb2.Request()
+        np.set_printoptions(threshold=sys.maxsize)
         for key, value in feed_dict.items():
             req.key.append(key)
             if isinstance(value, np.ndarray):
                 req.value.append(value.__repr__())
-            elif isinstance(value, str):
+            # accept py2 unicode as well, without breaking py3 (no `unicode` name there)
+            elif isinstance(value, str) or (sys.version_info.major == 2 and
+                                            isinstance(value, unicode)):
                 req.value.append(value)
             elif isinstance(value, list):
                 req.value.append(np.array(value).__repr__())
@@ -75,7 +76,9 @@ class PipelineClient(object):
                 continue
             data = resp.value[idx]
             try:
-                data = eval(data)
+                evaled_data = eval(data)
+                if isinstance(evaled_data, np.ndarray):
+                    data = evaled_data
             except Exception as e:
                 pass
             fetch_map[key] = data
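For completeness, a hedged client-side usage sketch; the import path and the `connect`/`predict` signatures follow the pipeline examples elsewhere in the repository rather than this diff, so treat them as assumptions:

```python
import numpy as np
from paddle_serving_server.pipeline import PipelineClient  # assumed import path

client = PipelineClient()
client.connect(['127.0.0.1:18080'])  # the rpc_port of the PipelineServer
# ndarray feeds travel as repr() strings and are rebuilt with eval() on the server
ret = client.predict(
    feed_dict={"x": np.array([0.0137, -0.1136, 0.2553])},
    fetch=["prediction"])
print(ret)
```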
diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py
index e8229e810308b10d35f903a8415d898177bc2239..a6d4f9ed66fd8f563cb1526c136cba11b06fd6b3 100644
--- a/python/pipeline/pipeline_server.py
+++ b/python/pipeline/pipeline_server.py
@@ -22,22 +22,31 @@ from contextlib import closing
 import multiprocessing
 import yaml
 
-from .proto import pipeline_service_pb2_grpc
-from .operator import ResponseOp
-from .dag import DAGExecutor
+from .proto import pipeline_service_pb2_grpc, pipeline_service_pb2
+from . import operator
+from . import dag
+from . import util
+from . import channel
 
 _LOGGER = logging.getLogger(__name__)
 
 
 class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
-    def __init__(self, response_op, dag_conf, worker_idx=-1):
+    def __init__(self, name, response_op, dag_conf, worker_idx=-1):
         super(PipelineServicer, self).__init__()
+        self._name = name
+
         # init dag executor
-        self._dag_executor = DAGExecutor(response_op, dag_conf, worker_idx)
+        self._dag_executor = dag.DAGExecutor(response_op, dag_conf, worker_idx)
         self._dag_executor.start()
         _LOGGER.info("[PipelineServicer] succ init")
 
     def inference(self, request, context):
+        if request.name != "" and request.name != self._name:
+            resp = pipeline_service_pb2.Response()
+            resp.ecode = channel.ChannelDataEcode.NO_SERVICE.value
+            resp.error_info = "Failed to inference: service name does not match."
+            return resp
         resp = self._dag_executor.call(request)
         return resp
 
@@ -57,35 +66,83 @@ def _reserve_port(port):
 
 
 class PipelineServer(object):
-    def __init__(self):
-        self._port = None
+    def __init__(self, name=None):
+        self._name = name  # for grpc-gateway path
+        self._rpc_port = None
         self._worker_num = None
         self._response_op = None
+        self._proxy_server = None
+
+    def _grpc_gateway(self, grpc_port, http_port):
+        import os
+        from ctypes import cdll
+        from . import gateway
+        lib_path = os.path.join(
+            os.path.dirname(gateway.__file__), "libproxy_server.so")
+        proxy_server = cdll.LoadLibrary(lib_path)
+        proxy_server.run_proxy_server(grpc_port, http_port)
+
+    def _run_grpc_gateway(self, grpc_port, http_port):
+        # http_port stays None in rpc mode, so guard both cases
+        if http_port is None or http_port <= 0:
+            _LOGGER.info("Ignore grpc_gateway configuration.")
+            return
+        if not util.AvailablePortGenerator.port_is_available(http_port):
+            raise SystemExit("Failed to run grpc-gateway: port {} "
+                             "is already used".format(http_port))
+        if self._proxy_server is not None:
+            raise RuntimeError("Proxy server has been started.")
+        self._proxy_server = multiprocessing.Process(
+            target=self._grpc_gateway, args=(
+                grpc_port,
+                http_port, ))
+        self._proxy_server.daemon = True
+        self._proxy_server.start()
 
     def set_response_op(self, response_op):
-        if not isinstance(response_op, ResponseOp):
+        if not isinstance(response_op, operator.ResponseOp):
             raise Exception("Failed to set response_op: response_op "
                             "must be ResponseOp type.")
         if len(response_op.get_input_ops()) != 1:
             raise Exception("Failed to set response_op: response_op "
                             "can only have one previous op.")
         self._response_op = response_op
+        self._used_op, _ = dag.DAG.get_use_ops(self._response_op)
+
+    def prepare_server(self, yml_file=None, yml_dict=None):
+        conf = ServerYamlConfChecker.load_server_yaml_conf(
+            yml_file=yml_file, yml_dict=yml_dict)
+
+        self._rpc_port = conf.get("rpc_port")
+        self._http_port = conf.get("http_port")
+        if self._rpc_port is None:
+            if self._http_port is None:
+                raise SystemExit("Failed to prepare_server: rpc_port or "
+                                 "http_port can not be None.")
+            else:
+                # http mode: generate rpc_port
+                if not util.AvailablePortGenerator.port_is_available(
+                        self._http_port):
+                    raise SystemExit("Failed to prepare_server: http_port({}) "
+                                     "is already used".format(self._http_port))
+                self._rpc_port = util.GetAvailablePortGenerator().next()
+        else:
+            if not util.AvailablePortGenerator.port_is_available(
+                    self._rpc_port):
+                raise SystemExit("Failed to prepare_server: port {} "
+                                 "is already used".format(self._rpc_port))
+            if self._http_port is None:
+                # rpc mode
+                pass
+            else:
+                # http mode
+                if not util.AvailablePortGenerator.port_is_available(
+                        self._http_port):
+                    raise SystemExit("Failed to prepare_server: http_port({}) "
+                                     "is already used".format(self._http_port))
 
-    def _port_is_available(self, port):
-        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
-            sock.settimeout(2)
-            result = sock.connect_ex(('0.0.0.0', port))
-        return result != 0
-
-    def prepare_server(self, yml_file):
-        conf = ServerYamlConfChecker.load_server_yaml_conf(yml_file)
-
-        self._port = conf["port"]
-        if not self._port_is_available(self._port):
-            raise SystemExit("Failed to prepare_server: prot {} "
-                             "is already used".format(self._port))
         self._worker_num = conf["worker_num"]
         self._build_dag_each_worker = conf["build_dag_each_worker"]
+        self._init_ops(conf["op"])
 
         _LOGGER.info("============= PIPELINE SERVER =============")
         _LOGGER.info("\n{}".format(
@@ -98,10 +155,40 @@ class PipelineServer(object):
         _LOGGER.info("-------------------------------------------")
 
         self._conf = conf
+        self._start_local_rpc_service()
+
+    def _init_ops(self, op_conf):
+        default_conf = {
+            "concurrency": 1,
+            "timeout": -1,
+            "retry": 1,
+            "batch_size": 1,
+            "auto_batching_timeout": -1,
+            "local_service_conf": {
+                "workdir": "",
+                "thread_num": 2,
+                "devices": "",
+                "mem_optim": True,
+                "ir_optim": False,
+            },
+        }
+        for op in self._used_op:
+            if not isinstance(op, operator.RequestOp) and not isinstance(
+                    op, operator.ResponseOp):
+                conf = op_conf.get(op.name, default_conf)
+                op.init_from_dict(conf)
+
+    def _start_local_rpc_service(self):
+        # only brpc now
+        if self._conf["dag"]["client_type"] != "brpc":
+            _LOGGER.warning(
+                "Local service only supports the brpc client_type for now.")
+        for op in self._used_op:
+            if not isinstance(op, operator.RequestOp):
+                op.launch_local_rpc_service()
 
     def run_server(self):
         if self._build_dag_each_worker:
-            with _reserve_port(self._port) as port:
+            with _reserve_port(self._rpc_port) as port:
                 bind_address = 'localhost:{}'.format(port)
                 workers = []
                 for i in range(self._worker_num):
@@ -111,6 +198,9 @@ class PipelineServer(object):
                         args=(bind_address, self._response_op, self._conf, i))
                     worker.start()
                     workers.append(worker)
+                self._run_grpc_gateway(
+                    grpc_port=self._rpc_port,
+                    http_port=self._http_port)  # start grpc_gateway
                 for worker in workers:
                     worker.join()
         else:
@@ -120,9 +210,13 @@ class PipelineServer(object):
                 ('grpc.max_receive_message_length', 256 * 1024 * 1024)
             ])
             pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
-                PipelineServicer(self._response_op, self._conf), server)
-            server.add_insecure_port('[::]:{}'.format(self._port))
+                PipelineServicer(self._name, self._response_op, self._conf),
+                server)
+            server.add_insecure_port('[::]:{}'.format(self._rpc_port))
             server.start()
+            self._run_grpc_gateway(
+                grpc_port=self._rpc_port,
+                http_port=self._http_port)  # start grpc_gateway
             server.wait_for_termination()
 
     def _run_server_func(self, bind_address, response_op, dag_conf, worker_idx):
@@ -133,7 +227,8 @@
             futures.ThreadPoolExecutor(
                 max_workers=1, ), options=options)
         pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
-            PipelineServicer(response_op, dag_conf, worker_idx), server)
+            PipelineServicer(self._name, response_op, dag_conf, worker_idx),
+            server)
         server.add_insecure_port(bind_address)
         server.start()
         server.wait_for_termination()
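The reworked `prepare_server` accepts either a YAML file or an in-memory dict, which makes the new knobs easy to see in one place. A hedged sketch (the op name, model path, and `response_op` wiring are placeholders; the import path is assumed):

```python
from paddle_serving_server.pipeline import PipelineServer  # assumed import path

server = PipelineServer(name="uci")  # "name" becomes the grpc-gateway URL prefix, e.g. /uci/prediction
server.set_response_op(response_op)  # a ResponseOp built from the user's Op graph
server.prepare_server(yml_dict={
    "rpc_port": 18080,
    "http_port": 18081,              # <= 0 (or omitted) leaves the HTTP proxy off
    "worker_num": 1,
    "build_dag_each_worker": False,
    "dag": {"is_thread_op": True, "client_type": "brpc"},
    "op": {
        "uci": {                     # per-op section handed to Op.init_from_dict
            "local_service_conf": {"model_config": "uci_housing_model"},
        },
    },
})
server.run_server()
```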
@@ -144,12 +239,25 @@ class ServerYamlConfChecker(object):
         pass
 
     @staticmethod
-    def load_server_yaml_conf(yml_file):
-        with open(yml_file) as f:
-            conf = yaml.load(f.read())
+    def load_server_yaml_conf(yml_file=None, yml_dict=None):
+        if yml_file is not None and yml_dict is not None:
+            raise SystemExit("Failed to prepare_server: only one of yml_file"
+                             " or yml_dict can be selected as the parameter.")
+        if yml_file is not None:
+            with open(yml_file) as f:
+                conf = yaml.load(f.read())
+        elif yml_dict is not None:
+            conf = yml_dict
+        else:
+            raise SystemExit("Failed to prepare_server: yml_file or yml_dict"
+                             " can not be None.")
         ServerYamlConfChecker.check_server_conf(conf)
         ServerYamlConfChecker.check_dag_conf(conf["dag"])
         ServerYamlConfChecker.check_tracer_conf(conf["dag"]["tracer"])
+        for op_name in conf["op"]:
+            ServerYamlConfChecker.check_op_conf(conf["op"][op_name])
+            ServerYamlConfChecker.check_local_service_conf(conf["op"][op_name][
+                "local_service_conf"])
         return conf
 
     @staticmethod
@@ -161,26 +269,80 @@ class ServerYamlConfChecker(object):
     @staticmethod
     def check_server_conf(conf):
         default_conf = {
-            "port": 9292,
+            # "rpc_port": 9292,
             "worker_num": 1,
             "build_dag_each_worker": False,
+            # "http_port": 0,
             "dag": {},
+            "op": {},
         }
 
         conf_type = {
-            "port": int,
+            "rpc_port": int,
+            "http_port": int,
             "worker_num": int,
             "build_dag_each_worker": bool,
+            "grpc_gateway_port": int,
         }
 
         conf_qualification = {
-            "port": [(">=", 1024), ("<=", 65535)],
+            "rpc_port": [(">=", 1024), ("<=", 65535)],
+            "http_port": [(">=", 1024), ("<=", 65535)],
            "worker_num": (">=", 1),
        }
         ServerYamlConfChecker.check_conf(conf, default_conf, conf_type,
                                          conf_qualification)
 
+    @staticmethod
+    def check_local_service_conf(conf):
+        default_conf = {
+            "workdir": "",
+            "thread_num": 2,
+            "devices": "",
+            "mem_optim": True,
+            "ir_optim": False,
+        }
+        conf_type = {
+            "model_config": str,
+            "workdir": str,
+            "thread_num": int,
+            "devices": str,
+            "mem_optim": bool,
+            "ir_optim": bool,
+        }
+        conf_qualification = {"thread_num": (">=", 1), }
+        ServerYamlConfChecker.check_conf(conf, default_conf, conf_type,
+                                         conf_qualification)
+
+    @staticmethod
+    def check_op_conf(conf):
+        default_conf = {
+            "concurrency": 1,
+            "timeout": -1,
+            "retry": 1,
+            "batch_size": 1,
+            "auto_batching_timeout": -1,
+            "local_service_conf": {},
+        }
+        conf_type = {
+            "server_endpoints": list,
+            "fetch_list": list,
+            "client_config": str,
+            "concurrency": int,
+            "timeout": int,
+            "retry": int,
+            "batch_size": int,
+            "auto_batching_timeout": int,
+        }
+        conf_qualification = {
+            "concurrency": (">=", 1),
+            "retry": (">=", 1),
+            "batch_size": (">=", 1),
+        }
+        ServerYamlConfChecker.check_conf(conf, default_conf, conf_type,
+                                         conf_qualification)
+
     @staticmethod
     def check_tracer_conf(conf):
         default_conf = {"interval_s": -1, }
@@ -231,6 +393,8 @@ class ServerYamlConfChecker(object):
     @staticmethod
     def check_conf_type(conf, conf_type):
         for key, val in conf_type.items():
+            if key not in conf:
+                continue
             if not isinstance(conf[key], val):
                 raise SystemExit("[CONF] {} must be {} type, but get {}."
                                  .format(key, val, type(conf[key])))
@@ -238,6 +402,8 @@ class ServerYamlConfChecker(object):
     @staticmethod
     def check_conf_qualification(conf, conf_qualification):
         for key, qualification in conf_qualification.items():
+            if key not in conf:
+                continue
             if not isinstance(qualification, list):
                 qualification = [qualification]
             if not ServerYamlConfChecker.qualification_check(conf[key],
diff --git a/python/pipeline/proto/pipeline_service.proto b/python/pipeline/proto/pipeline_service.proto
index a920d5618ce36a191390d5140bee0a42c7394a6b..02c922027ea6c00a3831137b55604950378b84fe 100644
--- a/python/pipeline/proto/pipeline_service.proto
+++ b/python/pipeline/proto/pipeline_service.proto
@@ -18,6 +18,7 @@ package baidu.paddle_serving.pipeline_serving;
 message Request {
   repeated string key = 1;
   repeated string value = 2;
+  optional string name = 3;
 };
 
 message Response {
diff --git a/python/pipeline/util.py b/python/pipeline/util.py
index fb5e14ce808fd34de75b1a640630ca172510cd6c..d7847f179de7557b5446958536008adc3c981f95 100644
--- a/python/pipeline/util.py
+++ b/python/pipeline/util.py
@@ -17,6 +17,8 @@ import logging
 import threading
 import multiprocessing
 import multiprocessing.managers
+from contextlib import closing
+import socket
 if sys.version_info.major == 2:
     import Queue
     from Queue import PriorityQueue
@@ -29,6 +31,34 @@ else:
 
 _LOGGER = logging.getLogger(__name__)
 
 
+class AvailablePortGenerator(object):
+    def __init__(self, start_port=12000):
+        self._curr_port = start_port
+
+    @staticmethod
+    def port_is_available(port):
+        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
+            sock.settimeout(2)
+            result = sock.connect_ex(('0.0.0.0', port))
+            # connect_ex() returns 0 only when something already listens there
+            return result != 0
+
+    def next(self):
+        while not AvailablePortGenerator.port_is_available(self._curr_port):
+            self._curr_port += 1
+        self._curr_port += 1
+        return self._curr_port - 1
+
+
+_AvailablePortGenerator = AvailablePortGenerator()
+
+
+def GetAvailablePortGenerator():
+    return _AvailablePortGenerator
+
+
 class NameGenerator(object):
     # use unsafe-id-generator
     def __init__(self, prefix):
diff --git a/python/setup.py.app.in b/python/setup.py.app.in
index 523b9e5c493e881d1ac3d6553bdb57a91f084acb..1a06b0d352c1da4cdd09f74cb900853d4016afa8 100644
--- a/python/setup.py.app.in
+++ b/python/setup.py.app.in
@@ -16,7 +16,6 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
-import platform
 import os
 
 from setuptools import setup, Distribution, Extension
@@ -24,18 +23,9 @@ from setuptools import find_packages
 from setuptools import setup
 from paddle_serving_app.version import serving_app_version
 from pkg_resources import DistributionNotFound, get_distribution
+import util
 
-def python_version():
-    return [int(v) for v in platform.python_version().split(".")]
-
-def find_package(pkgname):
-    try:
-        get_distribution(pkgname)
-        return True
-    except DistributionNotFound:
-        return False
-
-max_version, mid_version, min_version = python_version()
+max_version, mid_version, min_version = util.python_version()
 
 if '${PACK}' == 'ON':
     copy_lib()
diff --git a/python/setup.py.client.in b/python/setup.py.client.in
index 96773c38dc950c0b8357274dff30d7c952ecdc25..bcedc41599399762b4b838b6d89eb7adaef23800 100644
--- a/python/setup.py.client.in
+++ b/python/setup.py.client.in
@@ -16,7 +16,6 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
-import platform
 import os
 import sys
 
@@ -24,20 +23,10 @@ from setuptools import setup, Distribution, Extension
 from setuptools import find_packages
 from setuptools import setup
 from paddle_serving_client.version import serving_client_version
-from pkg_resources import DistributionNotFound, get_distribution
+import util
 
 py_version = sys.version_info
 
-def python_version():
-    return [int(v) for v in platform.python_version().split(".")]
-
-def find_package(pkgname):
-    try:
-        get_distribution(pkgname)
-        return True
-    except DistributionNotFound:
-        return False
-
 def copy_lib():
     if py_version[0] == 2:
         lib_list = ['libpython2.7.so.1.0', 'libssl.so.10', 'libcrypto.so.10']
@@ -51,18 +40,20 @@ def copy_lib():
         text = r.read()
         os.popen('cp {} ./paddle_serving_client/lib'.format(text.strip().split(' ')[1]))
 
-max_version, mid_version, min_version = python_version()
+max_version, mid_version, min_version = util.python_version()
+
+# gen pipeline proto code
+util.gen_pipeline_code("paddle_serving_client")
 
 if '${PACK}' == 'ON':
     copy_lib()
-
 REQUIRED_PACKAGES = [
     'six >= 1.10.0', 'protobuf >= 3.11.0', 'numpy >= 1.12', 'grpcio >= 1.28.1',
     'grpcio-tools >= 1.28.1'
 ]
 
-if not find_package("paddlepaddle") and not find_package("paddlepaddle-gpu"):
+if not util.find_package("paddlepaddle") and not util.find_package("paddlepaddle-gpu"):
     REQUIRED_PACKAGES.append("paddlepaddle")
 
 
@@ -72,8 +63,10 @@ packages=['paddle_serving_client',
           'paddle_serving_client.metric',
           'paddle_serving_client.utils',
           'paddle_serving_client.pipeline',
-          'paddle_serving_client.pipeline.proto']
-package_data={'paddle_serving_client': ['serving_client.so','lib/*'],}
+          'paddle_serving_client.pipeline.proto',
+          'paddle_serving_client.pipeline.gateway',
+          'paddle_serving_client.pipeline.gateway.proto']
+package_data={'paddle_serving_client': ['serving_client.so', 'lib/*', 'pipeline/gateway/libproxy_server.so'],}
 package_dir={'paddle_serving_client':
              '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client',
              'paddle_serving_client.proto':
@@ -87,7 +80,11 @@ package_dir={'paddle_serving_client':
              'paddle_serving_client.pipeline':
              '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline',
              'paddle_serving_client.pipeline.proto':
-             '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline/proto'}
+             '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline/proto',
+             'paddle_serving_client.pipeline.gateway':
+             '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline/gateway',
+             'paddle_serving_client.pipeline.gateway.proto':
+             '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline/gateway/proto'}
 
 setup(
     name='paddle-serving-client',
diff --git a/python/setup.py.in b/python/setup.py.in
index af7036bdd99e05966156064dd2bcf1bb8463b716..fa7051db94ebdd69778f7957f50b1301697398fe 100644
--- a/python/setup.py.in
+++ b/python/setup.py.in
@@ -16,17 +16,14 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
-import platform
-
 from setuptools import setup, Distribution, Extension
 from setuptools import find_packages
 from setuptools import setup
 from paddle_serving.version import serving_client_version
+from grpc_tools import protoc
+import util
 
-def python_version():
-    return [int(v) for v in platform.python_version().split(".")]
-
-max_version, mid_version, min_version = python_version()
+max_version, mid_version, min_version = util.python_version()
 
 REQUIRED_PACKAGES = [
     'six >= 1.10.0', 'protobuf >= 3.1.0','paddlepaddle'
diff --git a/python/setup.py.server.in b/python/setup.py.server.in
index db679edbab8e6ba6929ed631c2bbc5a731146d0d..6733f1a4788818c530e3be0719686cea54cace49 100644
--- a/python/setup.py.server.in
+++ b/python/setup.py.server.in
@@ -16,25 +16,16 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
-import platform
-
 from setuptools import setup, Distribution, Extension
 from setuptools import find_packages
 from setuptools import setup
 from paddle_serving_server.version import serving_server_version
-from pkg_resources import DistributionNotFound, get_distribution
-
-def find_package(pkgname):
-    try:
-        get_distribution(pkgname)
-        return True
-    except DistributionNotFound:
-        return False
+import util
 
-def python_version():
-    return [int(v) for v in platform.python_version().split(".")]
+max_version, mid_version, min_version = util.python_version()
 
-max_version, mid_version, min_version = python_version()
+# gen pipeline proto code
+util.gen_pipeline_code("paddle_serving_server")
 
 REQUIRED_PACKAGES = [
     'six >= 1.10.0', 'protobuf >= 3.11.0', 'grpcio >= 1.28.1', 'grpcio-tools >= 1.28.1',
@@ -44,7 +35,9 @@ REQUIRED_PACKAGES = [
 packages=['paddle_serving_server',
           'paddle_serving_server.proto',
           'paddle_serving_server.pipeline',
-          'paddle_serving_server.pipeline.proto']
+          'paddle_serving_server.pipeline.proto',
+          'paddle_serving_server.pipeline.gateway',
+          'paddle_serving_server.pipeline.gateway.proto']
 
 package_dir={'paddle_serving_server':
              '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server',
@@ -53,7 +46,13 @@ package_dir={'paddle_serving_server':
             'paddle_serving_server.pipeline':
             '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline',
             'paddle_serving_server.pipeline.proto':
-            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/proto'}
+            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/proto',
+            'paddle_serving_server.pipeline.gateway':
+            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/gateway',
+            'paddle_serving_server.pipeline.gateway.proto':
+            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/gateway/proto'}
+
+package_data={'paddle_serving_server': ['pipeline/gateway/libproxy_server.so'],}
 
 setup(
     name='paddle-serving-server',
@@ -65,6 +64,7 @@ setup(
     author_email='guru4elephant@gmail.com',
     install_requires=REQUIRED_PACKAGES,
     packages=packages,
+    package_data=package_data,
     package_dir=package_dir,
     # PyPI package information.
     classifiers=[
diff --git a/python/setup.py.server_gpu.in b/python/setup.py.server_gpu.in
index 4554c1d368f70a32d16ceeabb54d63625f9f256d..523615b8e782c29ebdedadc54a9473a0b672aac0 100644
--- a/python/setup.py.server_gpu.in
+++ b/python/setup.py.server_gpu.in
@@ -16,25 +16,16 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
-import platform
-
 from setuptools import setup, Distribution, Extension
 from setuptools import find_packages
 from setuptools import setup
 from paddle_serving_server_gpu.version import serving_server_version
-from pkg_resources import DistributionNotFound, get_distribution
-
-def find_package(pkgname):
-    try:
-        get_distribution(pkgname)
-        return True
-    except DistributionNotFound:
-        return False
+import util
 
-def python_version():
-    return [int(v) for v in platform.python_version().split(".")]
+max_version, mid_version, min_version = util.python_version()
 
-max_version, mid_version, min_version = python_version()
+# gen pipeline proto code
+util.gen_pipeline_code("paddle_serving_server_gpu")
 
 REQUIRED_PACKAGES = [
     'six >= 1.10.0', 'protobuf >= 3.11.0', 'grpcio >= 1.28.1', 'grpcio-tools >= 1.28.1',
@@ -44,7 +35,9 @@ REQUIRED_PACKAGES = [
 packages=['paddle_serving_server_gpu',
           'paddle_serving_server_gpu.proto',
           'paddle_serving_server_gpu.pipeline',
-          'paddle_serving_server_gpu.pipeline.proto']
+          'paddle_serving_server_gpu.pipeline.proto',
+          'paddle_serving_server_gpu.pipeline.gateway',
+          'paddle_serving_server_gpu.pipeline.gateway.proto']
 
 package_dir={'paddle_serving_server_gpu':
              '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu',
@@ -53,7 +46,13 @@ package_dir={'paddle_serving_server_gpu':
             'paddle_serving_server_gpu.pipeline':
             '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline',
             'paddle_serving_server_gpu.pipeline.proto':
-            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline/proto'}
+            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline/proto',
+            'paddle_serving_server_gpu.pipeline.gateway':
+            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline/gateway',
+            'paddle_serving_server_gpu.pipeline.gateway.proto':
+            '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline/gateway/proto'}
+
+package_data={'paddle_serving_server_gpu': ['pipeline/gateway/libproxy_server.so'],}
 
 setup(
     name='paddle-serving-server-gpu',
@@ -65,6 +64,7 @@ setup(
     author_email='guru4elephant@gmail.com',
     install_requires=REQUIRED_PACKAGES,
     packages=packages,
+    package_data=package_data,
     package_dir=package_dir,
     # PyPI package information.
     classifiers=[
diff --git a/python/util.py b/python/util.py
new file mode 100644
index 0000000000000000000000000000000000000000..0ae68c1ed53766cb7f4f623e3a5f4fb50f7eb095
--- /dev/null
+++ b/python/util.py
@@ -0,0 +1,70 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pkg_resources import DistributionNotFound, get_distribution
+from grpc_tools import protoc
+import os
+import platform
+
+
+def python_version():
+    return [int(v) for v in platform.python_version().split(".")]
+
+
+def find_package(pkgname):
+    try:
+        get_distribution(pkgname)
+        return True
+    except DistributionNotFound:
+        return False
+
+
+def gen_pipeline_code(package_name):
+    # pipeline service proto
+    protoc.main((
+        '',
+        '-I.',
+        '--python_out=.',
+        '--grpc_python_out=.',
+        '{}/pipeline/proto/pipeline_service.proto'.format(package_name), ))
+
+    # pipeline grpc-gateway proto
+    # *.pb.go
+    ret = os.system(
+        "cd {}/pipeline/gateway/proto/ && "
+        "../../../../../third_party/install/protobuf/bin/protoc -I. "
+        "-I$GOPATH/src "
+        "-I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis "
+        "--go_out=plugins=grpc:. "
+        "gateway.proto".format(package_name))
+    if ret != 0:
+        exit(1)
+    # *.gw.go
+    ret = os.system(
+        "cd {}/pipeline/gateway/proto/ && "
+        "../../../../../third_party/install/protobuf/bin/protoc -I. "
+        "-I$GOPATH/src "
+        "-I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis "
+        "--grpc-gateway_out=logtostderr=true:. "
+        "gateway.proto".format(package_name))
+    if ret != 0:
+        exit(1)
+
+    # pipeline grpc-gateway shared-lib
+    ret = os.system(
+        "cd {}/pipeline/gateway && "
+        "go build -buildmode=c-shared -o libproxy_server.so proxy_server.go".
+        format(package_name))
+    if ret != 0:
+        exit(1)
diff --git a/tools/serving_build.sh b/tools/serving_build.sh
index 116a5b1e2a8bd5da6cd27b64e1e3136115bf32c5..2769a3d402706c3ad2c94c539a7affe11cfcba7f 100644
--- a/tools/serving_build.sh
+++ b/tools/serving_build.sh
@@ -19,6 +19,13 @@ function init() {
     cd Serving
     export SERVING_WORKDIR=$PWD
     $PYTHONROOT/bin/python -m pip install -r python/requirements.txt
+    export GOPATH=$HOME/go
+    export PATH=$PATH:$GOPATH/bin
+
+    go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
+    go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger
+    go get -u github.com/golang/protobuf/protoc-gen-go
+    go get -u google.golang.org/grpc
 }
 
 function check_cmd() {
@@ -298,7 +305,6 @@ function python_test_bert() {
     cd bert # pwd: /Serving/python/examples/bert
     case $TYPE in
         CPU)
-            pip install paddlehub
             # Because download from paddlehub may timeout,
            # download the model from bos(max_seq_len=128).
            wget https://paddle-serving.bj.bcebos.com/paddle_hub_models/text/SemanticModel/bert_chinese_L-12_H-768_A-12.tar.gz
@@ -306,14 +312,12 @@ function python_test_bert() {
            sh get_data.sh
            check_cmd "python -m paddle_serving_server.serve --model bert_chinese_L-12_H-768_A-12_model --port 9292 &"
            sleep 5
-            pip install paddle_serving_app
            check_cmd "head -n 10 data-c.txt | python bert_client.py --model bert_chinese_L-12_H-768_A-12_client/serving_client_conf.prototxt"
            kill_server_process
            echo "bert RPC inference pass"
            ;;
        GPU)
            export CUDA_VISIBLE_DEVICES=0
-            pip install paddlehub
            # Because download from paddlehub may timeout,
            # download the model from bos(max_seq_len=128).
            wget https://paddle-serving.bj.bcebos.com/paddle_hub_models/text/SemanticModel/bert_chinese_L-12_H-768_A-12.tar.gz
@@ -321,7 +325,6 @@ function python_test_bert() {
            sh get_data.sh
            check_cmd "python -m paddle_serving_server_gpu.serve --model bert_chinese_L-12_H-768_A-12_model --port 9292 --gpu_ids 0 &"
            sleep 5
-            pip install paddle_serving_app
            check_cmd "head -n 10 data-c.txt | python bert_client.py --model bert_chinese_L-12_H-768_A-12_client/serving_client_conf.prototxt"
            kill_server_process
            echo "bert RPC inference pass"
@@ -794,13 +797,14 @@ function python_test_resnet50(){
 }
 
 function python_test_pipeline(){
-    # pwd:/ Serving/python/examples
+    # pwd: /Serving/python/examples
     local TYPE=$1
     export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
     unsetproxy
-    cd pipeline/imdb_model_ensemble
+    cd pipeline # pwd: /Serving/python/examples/pipeline
     case $TYPE in
        CPU)
+            cd imdb_model_ensemble # pwd: /Serving/python/examples/pipeline/imdb_model_ensemble
            # start paddle serving service (brpc)
            sh get_data.sh
            python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 --workdir test9292 &> cnn.log &
@@ -809,7 +813,7 @@ function python_test_pipeline(){
 
            # test: thread servicer & thread op
            cat << EOF > config.yml
-port: 18080
+rpc_port: 18080
 worker_num: 4
 build_dag_each_worker: false
 dag:
@@ -826,7 +830,7 @@ EOF
 
            # test: thread servicer & process op
            cat << EOF > config.yml
-port: 18080
+rpc_port: 18080
 worker_num: 4
 build_dag_each_worker: false
 dag:
@@ -843,7 +847,7 @@ EOF
 
            # test: process servicer & process op
            cat << EOF > config.yml
-port: 18080
+rpc_port: 18080
 worker_num: 4
 build_dag_each_worker: false
 dag:
@@ -862,7 +866,7 @@ EOF
            pip uninstall grpcio -y
            pip install grpcio --no-binary=grpcio
            cat << EOF > config.yml
-port: 18080
+rpc_port: 18080
 worker_num: 4
 build_dag_each_worker: true
 dag:
@@ -886,7 +890,7 @@ EOF
            python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 --use_multilang --workdir test9393 &> bow.log &
            sleep 5
            cat << EOF > config.yml
-port: 18080
+rpc_port: 18080
 worker_num: 4
 build_dag_each_worker: false
 dag:
@@ -903,16 +907,47 @@ EOF
            kill_server_process
            kill_process_by_port 9292
            kill_process_by_port 9393
+            cd ..
+
+            cd simple_web_service # pwd: /Serving/python/examples/pipeline/simple_web_service
+            sh get_data.sh
+            python web_service.py >/dev/null &
+            sleep 5
+            curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
+            # check http code
+            http_code=`curl -X POST -k -d '{"key":["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}' -s -w "%{http_code}" -o /dev/null http://localhost:18080/uci/prediction`
+            if [ ${http_code} -ne 200 ]; then
+                echo "HTTP status code -ne 200"
+                exit 1
+            fi
+            ps -ef | grep "web_service" | grep -v grep | awk '{print $2}' | xargs kill
+            ps -ef | grep "pipeline" | grep -v grep | awk '{print $2}' | xargs kill
+            kill_server_process
+            cd ..
            ;;
        GPU)
-            echo "pipeline ignore GPU test"
+            cd simple_web_service # pwd: /Serving/python/examples/pipeline/simple_web_service
+            sh get_data.sh
+            python web_service.py >/dev/null &
+            sleep 5
+            curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
+            # check http code
+            http_code=`curl -X POST -k -d '{"key":["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}' -s -w "%{http_code}" -o /dev/null http://localhost:18080/uci/prediction`
+            if [ ${http_code} -ne 200 ]; then
+                echo "HTTP status code -ne 200"
+                exit 1
+            fi
+            ps -ef | grep "web_service" | grep -v grep | awk '{print $2}' | xargs kill
+            ps -ef | grep "pipeline" | grep -v grep | awk '{print $2}' | xargs kill
+            kill_server_process
+            cd .. # pwd: /Serving/python/examples/pipeline
            ;;
        *)
            echo "error type"
            exit 1
            ;;
    esac
-    cd ../../
+    cd ..
    setproxy
    unset SERVING_BIN
}
@@ -963,118 +998,8 @@ function monitor_test() {
    mkdir _monitor_test && cd _monitor_test # pwd: /Serving/_monitor_test
    case $TYPE in
        CPU):
-            pip install pyftpdlib
-            mkdir remote_path
-            mkdir local_path
-            cd remote_path # pwd: /Serving/_monitor_test/remote_path
-            check_cmd "python -m pyftpdlib -p 8000 &>/dev/null &"
-            cd .. # pwd: /Serving/_monitor_test
-
-            # type: ftp
-            # remote_path: /
-            # remote_model_name: uci_housing.tar.gz
-            # local_tmp_path: ___tmp
-            # local_path: local_path
-            cd remote_path # pwd: /Serving/_monitor_test/remote_path
-            wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
-            touch donefile
-            cd .. # pwd: /Serving/_monitor_test
-            mkdir -p local_path/uci_housing_model
-            python -m paddle_serving_server.monitor \
-                --type='ftp' --ftp_host='127.0.0.1' --ftp_port='8000' \
-                --remote_path='/' --remote_model_name='uci_housing.tar.gz' \
-                --remote_donefile_name='donefile' --local_path='local_path' \
-                --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
-                --local_tmp_path='___tmp' --unpacked_filename='uci_housing_model' \
-                --interval='1' >/dev/null &
-            sleep 10
-            if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
-                echo "local_path/uci_housing_model/fluid_time_file not exist."
-                exit 1
-            fi
-            ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
-            rm -rf remote_path/*
-            rm -rf local_path/*
-
-            # type: ftp
-            # remote_path: /tmp_dir
-            # remote_model_name: uci_housing_model
-            # local_tmp_path: ___tmp
-            # local_path: local_path
-            mkdir -p remote_path/tmp_dir && cd remote_path/tmp_dir # pwd: /Serving/_monitor_test/remote_path/tmp_dir
-            wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
-            tar -xzf uci_housing.tar.gz
-            touch donefile
-            cd ../.. # pwd: /Serving/_monitor_test
-            mkdir -p local_path/uci_housing_model
-            python -m paddle_serving_server.monitor \
-                --type='ftp' --ftp_host='127.0.0.1' --ftp_port='8000' \
-                --remote_path='/tmp_dir' --remote_model_name='uci_housing_model' \
-                --remote_donefile_name='donefile' --local_path='local_path' \
-                --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
-                --local_tmp_path='___tmp' --interval='1' >/dev/null &
-            sleep 10
-            if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
-                echo "local_path/uci_housing_model/fluid_time_file not exist."
-                exit 1
-            fi
-            ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
-            rm -rf remote_path/*
-            rm -rf local_path/*
-
-            # type: general
-            # remote_path: /
-            # remote_model_name: uci_housing.tar.gz
-            # local_tmp_path: ___tmp
-            # local_path: local_path
-            cd remote_path # pwd: /Serving/_monitor_test/remote_path
-            wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
-            touch donefile
-            cd .. # pwd: /Serving/_monitor_test
-            mkdir -p local_path/uci_housing_model
-            python -m paddle_serving_server.monitor \
-                --type='general' --general_host='ftp://127.0.0.1:8000' \
-                --remote_path='/' --remote_model_name='uci_housing.tar.gz' \
-                --remote_donefile_name='donefile' --local_path='local_path' \
-                --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
-                --local_tmp_path='___tmp' --unpacked_filename='uci_housing_model' \
-                --interval='1' >/dev/null &
-            sleep 10
-            if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
-                echo "local_path/uci_housing_model/fluid_time_file not exist."
-                exit 1
-            fi
-            ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
-            rm -rf remote_path/*
-            rm -rf local_path/*
-
-            # type: general
-            # remote_path: /tmp_dir
-            # remote_model_name: uci_housing_model
-            # local_tmp_path: ___tmp
-            # local_path: local_path
-            mkdir -p remote_path/tmp_dir && cd remote_path/tmp_dir # pwd: /Serving/_monitor_test/remote_path/tmp_dir
-            wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
-            tar -xzf uci_housing.tar.gz
-            touch donefile
-            cd ../.. # pwd: /Serving/_monitor_test
-            mkdir -p local_path/uci_housing_model
-            python -m paddle_serving_server.monitor \
-                --type='general' --general_host='ftp://127.0.0.1:8000' \
-                --remote_path='/tmp_dir' --remote_model_name='uci_housing_model' \
-                --remote_donefile_name='donefile' --local_path='local_path' \
-                --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
-                --local_tmp_path='___tmp' --interval='1' >/dev/null &
-            sleep 10
-            if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
-                echo "local_path/uci_housing_model/fluid_time_file not exist."
-                exit 1
-            fi
-            ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
-            rm -rf remote_path/*
-            rm -rf local_path/*
-
-            ps -ef | grep "pyftpdlib" | grep -v grep | awk '{print $2}' | xargs kill
+            # The CPU part and GPU part are identical.
+            # In order to avoid Travis CI timeout (50 min), the CPU version is not checked
            ;;
        GPU):
            pip install pyftpdlib