From 0aa8c8129ff683b6058f92b627ac100609819c8e Mon Sep 17 00:00:00 2001
From: barriery
Date: Tue, 25 Aug 2020 08:43:23 +0000
Subject: [PATCH] update webservice

---
 doc/PIPELINE_SERVING.md                       |   4 +-
 doc/PIPELINE_SERVING_CN.md                    |   4 +-
 .../pipeline/imdb_model_ensemble/config.yml   |   4 +-
 python/examples/pipeline/ocr/README.md        |  18 +-
 python/examples/pipeline/ocr/README_CN.md     |  19 +-
 python/examples/pipeline/ocr/config.yml       |  17 +-
 .../ocr/hybrid_service_pipeline_server.py     |   1 -
 .../ocr/local_service_pipeline_server.py      |   1 -
 python/examples/pipeline/ocr/web_service.py   | 112 +++++++++++
 .../README.md                                 |   7 +-
 .../README_CN.md                              |   7 +-
 .../pipeline/simple_web_service/config.yml    |   9 +
 .../get_data.sh                               |   0
 .../web_service.py                            |  31 +--
 .../examples/pipeline/web_service/config.yml  |   9 -
 .../web_service/local_pipeline_server.py      |  59 ------
 python/paddle_serving_server/web_service.py   | 166 +++-------------
 .../paddle_serving_server_gpu/web_service.py  | 178 +++---------------
 python/pipeline/local_rpc_service_handler.py  |   6 +
 tools/serving_build.sh                        |  14 +-
 20 files changed, 263 insertions(+), 403 deletions(-)
 create mode 100644 python/examples/pipeline/ocr/web_service.py
 rename python/examples/pipeline/{web_service => simple_web_service}/README.md (71%)
 rename python/examples/pipeline/{web_service => simple_web_service}/README_CN.md (62%)
 create mode 100644 python/examples/pipeline/simple_web_service/config.yml
 rename python/examples/pipeline/{web_service => simple_web_service}/get_data.sh (100%)
 rename python/examples/pipeline/{web_service => simple_web_service}/web_service.py (65%)
 delete mode 100644 python/examples/pipeline/web_service/config.yml
 delete mode 100644 python/examples/pipeline/web_service/local_pipeline_server.py

diff --git a/doc/PIPELINE_SERVING.md b/doc/PIPELINE_SERVING.md
index 5b87366f..4205aa15 100644
--- a/doc/PIPELINE_SERVING.md
+++ b/doc/PIPELINE_SERVING.md
@@ -251,10 +251,10 @@ server.run_server()
 Where `response_op` is the ResponseOp mentioned above, PipelineServer will initialize Channels according to the topology relationship of each OP and build the calculation graph. `config_yml_path` is the configuration file of PipelineServer. The example file is as follows:
 
 ```yaml
-port: 18080 # gRPC port
+rpc_port: 18080 # gRPC port
 worker_num: 1 # gRPC thread pool size (the number of processes in the process version servicer). The default is 1
 build_dag_each_worker: false # Whether to use process server or not. The default is false
-grpc_gateway_port: 0 # HTTP service port. Do not start HTTP service when the value is less or equals 0. The default value is 0.
+http_port: 0 # HTTP service port. Do not start the HTTP service when the value is less than or equal to 0. The default value is 0.
 dag:
     is_thread_op: true # Whether to use the thread version of OP. The default is true
     client_type: brpc # Use brpc or grpc client. The default is brpc

diff --git a/doc/PIPELINE_SERVING_CN.md b/doc/PIPELINE_SERVING_CN.md
index c54f9390..7cab409b 100644
--- a/doc/PIPELINE_SERVING_CN.md
+++ b/doc/PIPELINE_SERVING_CN.md
@@ -249,10 +249,10 @@ server.run_server()
 其中,`response_op` 为上面提到的 ResponseOp,PipelineServer 将会根据各个 OP 的拓扑关系初始化 Channel 并构建计算图。`config_yml_path` 为 PipelineServer 的配置文件,示例文件如下:
 
 ```yaml
-port: 18080 # gRPC端口号
+rpc_port: 18080 # gRPC端口号
 worker_num: 1 # gRPC线程池大小(进程版 Servicer 中为进程数),默认为 1
 build_dag_each_worker: false # 是否使用进程版 Servicer,默认为 false
-grpc_gateway_port: 0 # HTTP 服务的端口号,若该值小于或等于 0 则不开启 HTTP 服务,默认为 0
+http_port: 0 # HTTP 服务的端口号,若该值小于或等于 0 则不开启 HTTP 服务,默认为 0
 dag:
     is_thread_op: true # 是否使用线程版Op,默认为 true
     client_type: brpc # 使用 brpc 或 grpc client,默认为 brpc

diff --git a/python/examples/pipeline/imdb_model_ensemble/config.yml b/python/examples/pipeline/imdb_model_ensemble/config.yml
index fc637139..3447ffd4 100644
--- a/python/examples/pipeline/imdb_model_ensemble/config.yml
+++ b/python/examples/pipeline/imdb_model_ensemble/config.yml
@@ -1,7 +1,7 @@
-port: 18080
+rpc_port: 18085
 worker_num: 4
 build_dag_each_worker: false
-grpc_gateway_port: 9999
+http_port: 9999
 dag:
     is_thread_op: false
     client_type: brpc

diff --git a/python/examples/pipeline/ocr/README.md b/python/examples/pipeline/ocr/README.md
index a4791545..f51789fc 100644
--- a/python/examples/pipeline/ocr/README.md
+++ b/python/examples/pipeline/ocr/README.md
@@ -1,8 +1,8 @@
-# Pipeline OCR Service
+# OCR Pipeline WebService
 
 (English|[简体中文](./README_CN.md))
 
-This document will take OCR as an example to show how to use PipelineServing to start multi-model tandem services.
+This document will take OCR as an example to show how to use Pipeline WebService to start multi-model tandem services.
 
 ## Get Model
 ```
@@ -19,6 +19,19 @@ tar xf test_imgs.tar
 ```
 
 ## Start Service
+```
+python web_service.py &>log.txt &
+```
+
+## Test
+```
+python pipeline_http_client.py
+```
+
+
+
+

diff --git a/python/examples/pipeline/ocr/README_CN.md b/python/examples/pipeline/ocr/README_CN.md
index ff4570d5..ba1150d3 100644
--- a/python/examples/pipeline/ocr/README_CN.md
+++ b/python/examples/pipeline/ocr/README_CN.md
@@ -1,8 +1,8 @@
-# Pipeline OCR 服务
+# OCR Pipeline WebService
 
 ([English](./README.md)|简体中文)
 
-本文档将以 OCR 为例,介绍如何使用 PipelineServing 启动多模型串联的服务。
+本文档将以 OCR 为例,介绍如何使用 Pipeline WebService 启动多模型串联的服务。
 
 ## 获取模型
 ```
 python -m paddle_serving_app.package --get_model ocr_rec
 tar -xzvf ocr_rec.tar.gz
 python -m paddle_serving_app.package --get_model ocr_det
 tar -xzvf ocr_det.tar.gz
 ```
+
 ## 获取数据集(可选)
 ```
 wget --no-check-certificate https://paddle-serving.bj.bcebos.com/ocr/test_imgs.tar
 tar xf test_imgs.tar
 ```
 
-## 启动服务
+## 启动 WebService
+```
+python web_service.py &>log.txt &
+```
+
+## 测试
+```
+python pipeline_http_client.py
+```
+
+

diff --git a/python/examples/pipeline/ocr/config.yml b/python/examples/pipeline/ocr/config.yml
index ed609bad..48addccf 100644
--- a/python/examples/pipeline/ocr/config.yml
+++ b/python/examples/pipeline/ocr/config.yml
@@ -1,9 +1,22 @@
-port: 18080
+rpc_port: 18080
 worker_num: 4
 build_dag_each_worker: false
-grpc_gateway_port: 9999
+http_port: 9999
 dag:
     is_thread_op: false
     client_type: brpc
     retry: 1
     use_profile: false
+op:
+    det:
+        concurrency: 2
+        local_service_conf:
+            model_config: ocr_det_model
+            devices: "0"
+    rec:
+        concurrency: 1
+        timeout: -1
+        retry: 1
+        local_service_conf:
+            model_config: ocr_rec_model
+            devices: "0"

diff --git a/python/examples/pipeline/ocr/hybrid_service_pipeline_server.py b/python/examples/pipeline/ocr/hybrid_service_pipeline_server.py
index 241230c4..0a875c71 100644
--- a/python/examples/pipeline/ocr/hybrid_service_pipeline_server.py
+++ b/python/examples/pipeline/ocr/hybrid_service_pipeline_server.py
@@ -132,5 +132,4 @@ response_op = ResponseOp(input_ops=[rec_op])
 server = PipelineServer()
 server.set_response_op(response_op)
 server.prepare_server('config.yml')
-server.start_local_rpc_service()  # add this line
 server.run_server()

diff --git a/python/examples/pipeline/ocr/local_service_pipeline_server.py b/python/examples/pipeline/ocr/local_service_pipeline_server.py
index c3dd115d..e3ebc07f 100644
--- a/python/examples/pipeline/ocr/local_service_pipeline_server.py
+++ b/python/examples/pipeline/ocr/local_service_pipeline_server.py
@@ -131,5 +131,4 @@ response_op = ResponseOp(input_ops=[rec_op])
 server = PipelineServer()
 server.set_response_op(response_op)
 server.prepare_server('config.yml')
-server.start_local_rpc_service()  # add this line
 server.run_server()

diff --git a/python/examples/pipeline/ocr/web_service.py b/python/examples/pipeline/ocr/web_service.py
new file mode 100644
index 00000000..d1e6ec80
--- /dev/null
+++ b/python/examples/pipeline/ocr/web_service.py
@@ -0,0 +1,112 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+try:
+    from paddle_serving_server_gpu.web_service import WebService, Op
+except ImportError:
+    from paddle_serving_server.web_service import WebService, Op
+import logging
+import numpy as np
+import cv2
+import base64
+from paddle_serving_app.reader import OCRReader
+from paddle_serving_app.reader import Sequential, ResizeByFactor
+from paddle_serving_app.reader import Div, Normalize, Transpose
+from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes
+
+_LOGGER = logging.getLogger()
+
+
+class DetOp(Op):
+    def init_op(self):
+        self.det_preprocess = Sequential([
+            ResizeByFactor(32, 960), Div(255),
+            Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose(
+                (2, 0, 1))
+        ])
+        self.filter_func = FilterBoxes(10, 10)
+        self.post_func = DBPostProcess({
+            "thresh": 0.3,
+            "box_thresh": 0.5,
+            "max_candidates": 1000,
+            "unclip_ratio": 1.5,
+            "min_size": 3
+        })
+
+    def preprocess(self, input_dicts):
+        (_, input_dict), = input_dicts.items()
+        data = base64.b64decode(input_dict["image"].encode('utf8'))
+        data = np.fromstring(data, np.uint8)
+        # Note: class variables (self.var) can only be used in process op mode
+        self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
+        self.ori_h, self.ori_w, _ = self.im.shape
+        det_img = self.det_preprocess(self.im)
+        _, self.new_h, self.new_w = det_img.shape
+        return {"image": det_img}
+
+    def postprocess(self, input_dicts, fetch_dict):
+        det_out = fetch_dict["concat_1.tmp_0"]
+        ratio_list = [
+            float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
+        ]
+        dt_boxes_list = self.post_func(det_out, [ratio_list])
+        dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
+        out_dict = {"dt_boxes": dt_boxes, "image": self.im}
+        return out_dict
+
+
+class RecOp(Op):
+    def init_op(self):
+        self.ocr_reader = OCRReader()
+        self.get_rotate_crop_image = GetRotateCropImage()
+        self.sorted_boxes = SortedBoxes()
+
+    def preprocess(self, input_dicts):
+        (_, input_dict), = input_dicts.items()
+        im = input_dict["image"]
+        dt_boxes = input_dict["dt_boxes"]
+        dt_boxes = self.sorted_boxes(dt_boxes)
+        feed_list = []
+        img_list = []
+        max_wh_ratio = 0
+        for i, dtbox in enumerate(dt_boxes):
+            boximg = self.get_rotate_crop_image(im, dt_boxes[i])
+            img_list.append(boximg)
+            h, w = boximg.shape[0:2]
+            wh_ratio = w * 1.0 / h
+            max_wh_ratio = max(max_wh_ratio, wh_ratio)
+        for img in img_list:
+            norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
+            feed = {"image": norm_img}
+            feed_list.append(feed)
+        return feed_list
+
+    def postprocess(self, input_dicts, fetch_dict):
+        rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
+        res_lst = []
+        for res in rec_res:
+            res_lst.append(res[0])
+        res = {"res": str(res_lst)}
+        return res
+
+
+class OcrService(WebService):
+    def get_pipeline_response(self, read_op):
+        det_op = DetOp(name="det", input_ops=[read_op])
+        rec_op = RecOp(name="rec", input_ops=[det_op])
+        return rec_op
+
+
+ocr_service = OcrService(name="ocr")
+ocr_service.prepare_pipeline_config("config.yml")
+ocr_service.run_service()

diff --git a/python/examples/pipeline/web_service/README.md b/python/examples/pipeline/simple_web_service/README.md
similarity index 71%
rename from python/examples/pipeline/web_service/README.md
rename to python/examples/pipeline/simple_web_service/README.md
index b493c7ce..407d6967 100644
--- a/python/examples/pipeline/web_service/README.md
+++ b/python/examples/pipeline/simple_web_service/README.md
@@ -1,6 +1,6 @@
-# Default Pipeline Web Service
+# Simple Pipeline WebService
 
-This document will takes UCI service as an example to introduce how to use DefaultPipelineWebService.
+This document will take UCI service as an example to introduce how to use Pipeline WebService.
 
 ## Get model
 ```
@@ -17,6 +17,3 @@ python web_service.py &>log.txt &
 ```
 curl -X POST -k http://localhost:18080/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
 ```
-
-## More
-`web_service.py` and `local_pipeline_server.py` are essentially the same.

diff --git a/python/examples/pipeline/web_service/README_CN.md b/python/examples/pipeline/simple_web_service/README_CN.md
similarity index 62%
rename from python/examples/pipeline/web_service/README_CN.md
rename to python/examples/pipeline/simple_web_service/README_CN.md
index 98eb3406..3b53c694 100644
--- a/python/examples/pipeline/web_service/README_CN.md
+++ b/python/examples/pipeline/simple_web_service/README_CN.md
@@ -1,6 +1,6 @@
-# Default Pipeline Web Service
+# Simple Pipeline WebService
 
-这里以 Uci 服务为例来介绍 DefaultPipelineWebService 的使用。
+这里以 Uci 服务为例来介绍 Pipeline WebService 的使用。
 
 ## 获取模型
 ```
@@ -17,6 +17,3 @@ python web_service.py &>log.txt &
 ```
 curl -X POST -k http://localhost:18080/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}'
 ```
-
-## 更多
-`web_service.py` 和 `local_pipeline_server.py` 本质是相同的。

diff --git a/python/examples/pipeline/simple_web_service/config.yml b/python/examples/pipeline/simple_web_service/config.yml
new file mode 100644
index 00000000..72e473e3
--- /dev/null
+++ b/python/examples/pipeline/simple_web_service/config.yml
@@ -0,0 +1,9 @@
+worker_num: 4
+http_port: 18080
+dag:
+    is_thread_op: false
+op:
+    uci:
+        local_service_conf:
+            model_config: uci_housing_model
+            devices: "" # "0,1"

diff --git a/python/examples/pipeline/web_service/get_data.sh b/python/examples/pipeline/simple_web_service/get_data.sh
similarity index 100%
rename from python/examples/pipeline/web_service/get_data.sh
rename to python/examples/pipeline/simple_web_service/get_data.sh

diff --git a/python/examples/pipeline/web_service/web_service.py b/python/examples/pipeline/simple_web_service/web_service.py
similarity index 65%
rename from python/examples/pipeline/web_service/web_service.py
rename to python/examples/pipeline/simple_web_service/web_service.py
index 59d73ce6..742f7235 100644
--- a/python/examples/pipeline/web_service/web_service.py
+++ b/python/examples/pipeline/simple_web_service/web_service.py
@@ -12,39 +12,40 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 try:
-    from paddle_serving_server_gpu.web_service import DefaultPipelineWebService
+    from paddle_serving_server_gpu.web_service import WebService, Op
 except ImportError:
-    from paddle_serving_server.web_service import DefaultPipelineWebService
+    from paddle_serving_server.web_service import WebService, Op
 import logging
 import numpy as np
 
 _LOGGER = logging.getLogger()
 
 
-class UciService(DefaultPipelineWebService):
-    def init_separator(self):
+class UciOp(Op):
+    def init_op(self):
         self.separator = ","
 
-    def preprocess(self, input_dict):
-        # _LOGGER.info(input_dict)
+    def preprocess(self, input_dicts):
+        (_, input_dict), = input_dicts.items()
+        _LOGGER.info(input_dict)
         x_value = input_dict["x"]
         if isinstance(x_value, (str, unicode)):
             input_dict["x"] = np.array(
                 [float(x.strip()) for x in x_value.split(self.separator)])
         return input_dict
 
-    def postprocess(self, input_dict, fetch_dict):
-        # _LOGGER.info(fetch_dict)
+    def postprocess(self, input_dicts, fetch_dict):
+        _LOGGER.info(fetch_dict)
         fetch_dict["price"] = str(fetch_dict["price"][0][0])
         return fetch_dict
 
 
+class UciService(WebService):
+    def get_pipeline_response(self, read_op):
+        uci_op = UciOp(name="uci", input_ops=[read_op])
+        return uci_op
+
+
 uci_service = UciService(name="uci")
-uci_service.init_separator()
-uci_service.load_model_config("./uci_housing_model")
-try:
-    uci_service.set_gpus("0")
-except Exception:
-    pass
-uci_service.prepare_server(workdir="workdir", port=18080)
+uci_service.prepare_pipeline_config("config.yml")
 uci_service.run_service()

diff --git a/python/examples/pipeline/web_service/config.yml b/python/examples/pipeline/web_service/config.yml
deleted file mode 100644
index 3868e317..00000000
--- a/python/examples/pipeline/web_service/config.yml
+++ /dev/null
@@ -1,9 +0,0 @@
-port: 18081
-worker_num: 4
-build_dag_each_worker: false
-grpc_gateway_port: 18080
-dag:
-    is_thread_op: false
-    client_type: brpc
-    retry: 1
-    use_profile: false

diff --git a/python/examples/pipeline/web_service/local_pipeline_server.py b/python/examples/pipeline/web_service/local_pipeline_server.py
deleted file mode 100644
index 00655c2c..00000000
--- a/python/examples/pipeline/web_service/local_pipeline_server.py
+++ /dev/null
@@ -1,59 +0,0 @@
-# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# pylint: disable=doc-string-missing
-from paddle_serving_server_gpu import pipeline
-import numpy as np
-import logging
-
-_LOGGER = logging.getLogger()
-
-
-class UciOp(pipeline.Op):
-    def init_op(self):
-        self.separator = ","
-
-    def preprocess(self, input_dicts):
-        (_, input_dict), = input_dicts.items()
-        _LOGGER.info(input_dict)
-        x_value = input_dict["x"]
-        if isinstance(x_value, (str, unicode)):
-            input_dict["x"] = np.array(
-                [float(x.strip()) for x in x_value.split(self.separator)])
-        return input_dict
-
-    def postprocess(self, input_dicts, fetch_dict):
-        _LOGGER.info(fetch_dict)
-        fetch_dict["price"] = str(fetch_dict["price"][0][0])
-        return fetch_dict
-
-
-read_op = pipeline.RequestOp()
-uci_op = UciOp(
-    name="uci",
-    input_ops=[read_op],
-    local_rpc_service_handler=pipeline.LocalRpcServiceHandler(
-        model_config="uci_housing_model",
-        workdir="workdir",
-        thread_num=2,
-        devices="0",  # if devices="", use cpu
-        mem_optim=True,
-        ir_optim=False),
-    concurrency=1)
-response_op = pipeline.ResponseOp(input_ops=[uci_op])
-
-server = pipeline.PipelineServer()
-server.set_response_op(response_op)
-server.prepare_server('config.yml')
-server.start_local_rpc_service()  # after prepare_server
-server.run_server()

diff --git a/python/paddle_serving_server/web_service.py b/python/paddle_serving_server/web_service.py
index 557ee47d..da8a88bf 100644
--- a/python/paddle_serving_server/web_service.py
+++ b/python/paddle_serving_server/web_service.py
@@ -22,156 +22,35 @@ from contextlib import closing
 import socket
 
 from paddle_serving_server import pipeline
-from paddle_serving_server.pipeline.util import AvailablePortGenerator
+from paddle_serving_server.pipeline import Op
 
 
-class DefaultPipelineServer(object):
-    def __init__(self, available_port_generator):
-        self.server = pipeline.PipelineServer()
-        self.available_port_generator = available_port_generator
-
-    def create_internel_op_class(self, f_preprocess, f_postprocess):
-        class InternelOp(pipeline.Op):
-            # f_preprocess and f_postprocess use variables
-            # in closures, so init_op function is not necessary.
-            def preprocess(self, input_dicts):
-                (_, input_dict), = input_dicts.items()
-                preped_data = f_preprocess(input_dict)
-                return preped_data
-
-            def postprocess(self, input_dicts, fetch_dict):
-                (_, input_dict), = input_dicts.items()
-                postped_data = f_postprocess(input_dict, fetch_dict)
-                return postped_data
-
-        self.internel_op_class = InternelOp
-
-    def create_local_rpc_service_handler(self, model_config, workdir,
-                                         thread_num, devices, mem_optim,
-                                         ir_optim):
-        self.local_rpc_service_handler = pipeline.LocalRpcServiceHandler(
-            model_config=model_config,
-            workdir=workdir,
-            thread_num=thread_num,
-            devices=devices,
-            mem_optim=mem_optim,
-            ir_optim=ir_optim,
-            available_port_generator=self.available_port_generator)
-
-    def init_pipeline_server(self,
-                             internel_op_name,
-                             internel_op_fetch_list=[],
-                             internel_op_concurrency=4,
-                             internel_op_timeout=-1,
-                             internel_op_retry=1,
-                             internel_op_batch_size=1,
-                             internel_op_auto_batching_timeout=None):
-        read_op = pipeline.RequestOp()
-        internel_op = self.internel_op_class(
-            name=internel_op_name,
-            input_ops=[read_op],
-            fetch_list=internel_op_fetch_list,
-            local_rpc_service_handler=self.local_rpc_service_handler,
-            concurrency=internel_op_concurrency,
-            timeout=internel_op_timeout,
-            retry=internel_op_retry,
-            batch_size=internel_op_batch_size,
-            auto_batching_timeout=internel_op_auto_batching_timeout)
-        response_op = pipeline.ResponseOp(input_ops=[internel_op])
-        self.server.set_response_op(response_op)
-
-    def prepare_pipeline_server(self,
-                                rpc_port,
-                                http_port,
-                                worker_num,
-                                build_dag_each_worker=False,
-                                is_thread_op=False,
-                                client_type="brpc",
-                                retry=1,
-                                use_profile=False,
-                                tracer_interval_s=-1):
-        default_server_conf = {
-            "port": rpc_port,
-            "worker_num": worker_num,
-            "build_dag_each_worker": build_dag_each_worker,
-            "grpc_gateway_port": http_port,
-            "dag": {
-                "is_thread_op": is_thread_op,
-                "client_type": client_type,
-                "retry": retry,
-                "use_profile": use_profile,
-                "tracer": {
-                    "interval_s": tracer_interval_s,
-                }
-            }
-        }
-        self.server.prepare_server(yml_dict=default_server_conf)
-
-    def start_pipeline_server(self):
-        self.server.start_local_rpc_service()
-        self.server.run_server()
-
-
-class DefaultPipelineWebService(object):
-    def __init__(self, name="default"):
+class WebService(object):
+    def __init__(self, name="default_service"):
         self.name = name
-        self.port = None
-        self.model_config = None
-        self.gpus = ""  # use cpu
-        self.available_port_generator = AvailablePortGenerator(12000)
-        self.default_pipeline_server = DefaultPipelineServer(
-            self.available_port_generator)
-
-    def load_model_config(self, model_config):
-        self.model_config = model_config
+        # pipeline
+        self._server = pipeline.PipelineServer()
 
-    def prepare_server(self,
-                       workdir="workdir",
-                       port=9393,
-                       thread_num=2,
-                       grpc_worker_num=4,
-                       mem_optim=True,
-                       ir_optim=False):
-        if not self.available_port_generator.port_is_available(port):
-            raise SystemExit("Failed to prepare server: prot({}) is not"
-                             " available".format(port))
-        self.port = port
+    def get_pipeline_response(self, read_op):
+        return None
 
-        self.default_pipeline_server.create_internel_op_class(self.preprocess,
-                                                              self.postprocess)
-        self.default_pipeline_server.create_local_rpc_service_handler(
-            model_config=self.model_config,
-            workdir=workdir,
-            thread_num=thread_num,
-            devices=self.gpus,
-            mem_optim=mem_optim,
-            ir_optim=ir_optim)
-        self.default_pipeline_server.init_pipeline_server(
-            internel_op_name=self.name)
-        self.default_pipeline_server.prepare_pipeline_server(
-            rpc_port=self.available_port_generator.next(),
-            http_port=self.port,
-            worker_num=grpc_worker_num)
+    def prepare_pipeline_config(self, yaml_file):
+        # build dag
+        read_op = pipeline.RequestOp()
+        last_op = self.get_pipeline_response(read_op)
+        if not isinstance(last_op, Op):
+            raise ValueError("The return value type of `get_pipeline_response` "
+                             "function is not Op type, please check function "
+                             "`get_pipeline_response`.")
+        response_op = pipeline.ResponseOp(input_ops=[last_op])
+        self._server.set_response_op(response_op)
+        self._server.prepare_server(yaml_file)
 
     def run_service(self):
-        import socket
-        localIP = socket.gethostbyname(socket.gethostname())
-        print("web service address: http://{}:{}/prediction"
-              .format(localIP, self.port))
-        self.default_pipeline_server.start_pipeline_server()
-
-    def preprocess(self, feed_dict):
-        return feed_dict
-
-    def postprocess(self, feed_dict, fetch_dict):
-        return fetch_dict
-
-
-class WebService(object):
-    def __init__(self, name="default_service"):
-        self.name = name
+        self._server.run_server()
 
     def load_model_config(self, model_config):
+        print("This API will be deprecated later. Please do not use it")
         self.model_config = model_config
 
     def _launch_rpc_service(self):
@@ -208,6 +87,7 @@ class WebService(object):
              device="cpu",
              mem_optim=True,
              ir_optim=False):
+        print("This API will be deprecated later. Please do not use it")
         self.workdir = workdir
         self.port = port
         self.device = device
@@ -247,6 +127,7 @@ class WebService(object):
         return result
 
     def run_rpc_service(self):
+        print("This API will be deprecated later. Please do not use it")
         import socket
         localIP = socket.gethostbyname(socket.gethostname())
         print("web service address:")
@@ -296,6 +177,7 @@ class WebService(object):
             "{}".format(self.model_config), gpu=False, profile=False)
 
     def run_web_service(self):
+        print("This API will be deprecated later. Please do not use it")
         self.app_instance.run(host="0.0.0.0",
                               port=self.port,
                               threaded=False,
@@ -305,9 +187,11 @@ class WebService(object):
         return self.app_instance
 
     def preprocess(self, feed=[], fetch=[]):
+        print("This API will be deprecated later. Please do not use it")
         return feed, fetch
 
     def postprocess(self, feed=[], fetch=[], fetch_map=None):
+        print("This API will be deprecated later. Please do not use it")
         for key in fetch_map:
             fetch_map[key] = fetch_map[key].tolist()
         return fetch_map

diff --git a/python/paddle_serving_server_gpu/web_service.py b/python/paddle_serving_server_gpu/web_service.py
index 9231cfbe..f5fe483c 100644
--- a/python/paddle_serving_server_gpu/web_service.py
+++ b/python/paddle_serving_server_gpu/web_service.py
@@ -25,164 +25,42 @@ import numpy as np
 import paddle_serving_server_gpu as serving
 
 from paddle_serving_server_gpu import pipeline
-from paddle_serving_server_gpu.pipeline.util import AvailablePortGenerator
-
-
-class DefaultPipelineServer(object):
-    def __init__(self, available_port_generator):
-        self.server = pipeline.PipelineServer()
-        self.available_port_generator = available_port_generator
-
-    def create_internel_op_class(self, f_preprocess, f_postprocess):
-        class InternelOp(pipeline.Op):
-            # f_preprocess and f_postprocess use variables
-            # in closures, so init_op function is not necessary.
-            def preprocess(self, input_dicts):
-                (_, input_dict), = input_dicts.items()
-                preped_data = f_preprocess(input_dict)
-                return preped_data
-
-            def postprocess(self, input_dicts, fetch_dict):
-                (_, input_dict), = input_dicts.items()
-                postped_data = f_postprocess(input_dict, fetch_dict)
-                return postped_data
-
-        self.internel_op_class = InternelOp
-
-    def create_local_rpc_service_handler(self, model_config, workdir,
-                                         thread_num, devices, mem_optim,
-                                         ir_optim):
-        self.local_rpc_service_handler = pipeline.LocalRpcServiceHandler(
-            model_config=model_config,
-            workdir=workdir,
-            thread_num=thread_num,
-            devices=devices,
-            mem_optim=mem_optim,
-            ir_optim=ir_optim,
-            available_port_generator=self.available_port_generator)
-
-    def init_pipeline_server(self,
-                             internel_op_name,
-                             internel_op_fetch_list=[],
-                             internel_op_concurrency=4,
-                             internel_op_timeout=-1,
-                             internel_op_retry=1,
-                             internel_op_batch_size=1,
-                             internel_op_auto_batching_timeout=None):
-        read_op = pipeline.RequestOp()
-        internel_op = self.internel_op_class(
-            name=internel_op_name,
-            input_ops=[read_op],
-            fetch_list=internel_op_fetch_list,
-            local_rpc_service_handler=self.local_rpc_service_handler,
-            concurrency=internel_op_concurrency,
-            timeout=internel_op_timeout,
-            retry=internel_op_retry,
-            batch_size=internel_op_batch_size,
-            auto_batching_timeout=internel_op_auto_batching_timeout)
-        response_op = pipeline.ResponseOp(input_ops=[internel_op])
-        self.server.set_response_op(response_op)
-
-    def prepare_pipeline_server(self,
-                                rpc_port,
-                                http_port,
-                                worker_num,
-                                build_dag_each_worker=False,
-                                is_thread_op=False,
-                                client_type="brpc",
-                                retry=1,
-                                use_profile=False,
-                                tracer_interval_s=-1):
-        default_server_conf = {
-            "port": rpc_port,
-            "worker_num": worker_num,
-            "build_dag_each_worker": build_dag_each_worker,
-            "grpc_gateway_port": http_port,
-            "dag": {
-                "is_thread_op": is_thread_op,
-                "client_type": client_type,
-                "retry": retry,
-                "use_profile": use_profile,
-                "tracer": {
-                    "interval_s": tracer_interval_s,
-                }
-            }
-        }
-        self.server.prepare_server(yml_dict=default_server_conf)
-
-    def start_pipeline_server(self):
-        self.server.start_local_rpc_service()
-        self.server.run_server()
-
-
-class DefaultPipelineWebService(object):
-    def __init__(self, name="default"):
-        self.name = name
-        self.port = None
-        self.model_config = None
-        self.gpus = ""
-        self.available_port_generator = AvailablePortGenerator(12000)
-        self.default_pipeline_server = DefaultPipelineServer(
-            self.available_port_generator)
-
-    def load_model_config(self, model_config):
-        self.model_config = model_config
-
-    def set_gpus(self, gpus):
-        self.gpus = gpus
-
-    def prepare_server(self,
-                       workdir="workdir",
-                       port=9393,
-                       thread_num=2,
-                       grpc_worker_num=4,
-                       mem_optim=True,
-                       ir_optim=False):
-        if not self.available_port_generator.port_is_available(port):
-            raise SystemExit("Failed to prepare server: prot({}) is not"
-                             " available".format(port))
-        self.port = port
+from paddle_serving_server_gpu.pipeline import Op
 
-        self.default_pipeline_server.create_internel_op_class(self.preprocess,
-                                                              self.postprocess)
-        self.default_pipeline_server.create_local_rpc_service_handler(
-            model_config=self.model_config,
-            workdir=workdir,
-            thread_num=thread_num,
-            devices=self.gpus,
-            mem_optim=mem_optim,
-            ir_optim=ir_optim)
-        self.default_pipeline_server.init_pipeline_server(
-            internel_op_name=self.name)
-        self.default_pipeline_server.prepare_pipeline_server(
-            rpc_port=self.available_port_generator.next(),
-            http_port=self.port,
-            worker_num=grpc_worker_num)
-
-    def run_service(self):
-        import socket
-        localIP = socket.gethostbyname(socket.gethostname())
-        print("web service address: http://{}:{}/prediction"
-              .format(localIP, self.port))
-        self.default_pipeline_server.start_pipeline_server()
+class WebService(object):
+    def __init__(self, name="default_service"):
+        self.name = name
+        # pipeline
+        self._server = pipeline.PipelineServer()
 
-    def preprocess(self, feed_dict):
-        return feed_dict
+        self.gpus = []  # deprecated
+        self.rpc_service_list = []  # deprecated
 
-    def postprocess(self, feed_dict, fetch_dict):
-        return fetch_dict
+    def get_pipeline_response(self, read_op):
+        return None
 
+    def prepare_pipeline_config(self, yaml_file):
+        # build dag
+        read_op = pipeline.RequestOp()
+        last_op = self.get_pipeline_response(read_op)
+        if not isinstance(last_op, Op):
+            raise ValueError("The return value type of `get_pipeline_response` "
+                             "function is not Op type, please check function "
+                             "`get_pipeline_response`.")
+        response_op = pipeline.ResponseOp(input_ops=[last_op])
+        self._server.set_response_op(response_op)
+        self._server.prepare_server(yaml_file)
 
-class WebService(object):
-    def __init__(self, name="default_service"):
-        self.name = name
-        self.gpus = []
-        self.rpc_service_list = []
+    def run_service(self):
+        self._server.run_server()
 
     def load_model_config(self, model_config):
+        print("This API will be deprecated later. Please do not use it")
         self.model_config = model_config
 
     def set_gpus(self, gpus):
+        print("This API will be deprecated later. Please do not use it")
        self.gpus = [int(x) for x in gpus.split(",")]
 
     def default_rpc_service(self,
@@ -236,6 +114,7 @@
              gpuid=0,
              mem_optim=True,
              ir_optim=False):
+        print("This API will be deprecated later. Please do not use it")
         self.workdir = workdir
         self.port = port
         self.device = device
@@ -303,6 +182,7 @@
         return result
 
     def run_rpc_service(self):
+        print("This API will be deprecated later. Please do not use it")
         import socket
         localIP = socket.gethostbyname(socket.gethostname())
         print("web service address:")
@@ -331,6 +211,7 @@
 
     # TODO: maybe change another API name: maybe run_local_predictor?
     def run_debugger_service(self, gpu=False):
+        print("This API will be deprecated later. Please do not use it")
         import socket
         localIP = socket.gethostbyname(socket.gethostname())
         print("web service address:")
@@ -357,6 +238,7 @@
             "{}".format(self.model_config), gpu=gpu, profile=False)
 
     def run_web_service(self):
+        print("This API will be deprecated later. Please do not use it")
         self.app_instance.run(host="0.0.0.0",
                               port=self.port,
                               threaded=False,
@@ -366,9 +248,11 @@
         return self.app_instance
 
     def preprocess(self, feed=[], fetch=[]):
+        print("This API will be deprecated later. Please do not use it")
         return feed, fetch
 
     def postprocess(self, feed=[], fetch=[], fetch_map=None):
+        print("This API will be deprecated later. Please do not use it")
         for key in fetch_map:
             fetch_map[key] = fetch_map[key].tolist()
         return fetch_map

diff --git a/python/pipeline/local_rpc_service_handler.py b/python/pipeline/local_rpc_service_handler.py
index 2a842ce7..376fcaf1 100644
--- a/python/pipeline/local_rpc_service_handler.py
+++ b/python/pipeline/local_rpc_service_handler.py
@@ -17,8 +17,10 @@ import logging
 import multiprocessing
 try:
     from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
+    PACKAGE_VERSION = "GPU"
 except ImportError:
     from paddle_serving_server import OpMaker, OpSeqMaker, Server
+    PACKAGE_VERSION = "CPU"
 from . import util
 
 _LOGGER = logging.getLogger(__name__)
@@ -47,6 +49,10 @@ class LocalRpcServiceHandler(object):
                     .format(model_config, self._port_list))
         else:
             # gpu
+            if PACKAGE_VERSION == "CPU":
+                raise ValueError(
+                    "You are using the CPU version package("
+                    "paddle-serving-server), unable to set devices")
             devices = [int(x) for x in devices.split(",")]
             for _ in devices:
                 self._port_list.append(available_port_generator.next())

diff --git a/tools/serving_build.sh b/tools/serving_build.sh
index 4a9a08a1..15181e7e 100644
--- a/tools/serving_build.sh
+++ b/tools/serving_build.sh
@@ -779,7 +779,7 @@ function python_test_pipeline(){
 
     # test: thread servicer & thread op
     cat << EOF > config.yml
-port: 18080
+rpc_port: 18080
 worker_num: 4
 build_dag_each_worker: false
 dag:
@@ -796,7 +796,7 @@ EOF
 
     # test: thread servicer & process op
    cat << EOF > config.yml
-port: 18080
+rpc_port: 18080
 worker_num: 4
 build_dag_each_worker: false
 dag:
@@ -813,7 +813,7 @@ EOF
 
     # test: process servicer & process op
     cat << EOF > config.yml
-port: 18080
+rpc_port: 18080
 worker_num: 4
 build_dag_each_worker: false
 dag:
@@ -832,7 +832,7 @@ EOF
     pip uninstall grpcio -y
     pip install grpcio --no-binary=grpcio
     cat << EOF > config.yml
-port: 18080
+rpc_port: 18080
 worker_num: 4
 build_dag_each_worker: true
 dag:
@@ -856,7 +856,7 @@ EOF
     python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 --use_multilang --workdir test9393 &> bow.log &
     sleep 5
     cat << EOF > config.yml
-port: 18080
+rpc_port: 18080
 worker_num: 4
 build_dag_each_worker: false
 dag:
@@ -875,7 +875,7 @@ EOF
     kill_process_by_port 9393
     cd ..
 
-    cd web_service # pwd: /Serving/python/examples/pipeline/web_service
+    cd simple_web_service # pwd: /Serving/python/examples/pipeline/simple_web_service
     sh get_data.sh
     python web_service.py >/dev/null &
     sleep 5
@@ -892,7 +892,7 @@ EOF
     cd ..
     ;;
 GPU)
-    cd web_service # pwd: /Serving/python/examples/pipeline/web_service
+    cd simple_web_service # pwd: /Serving/python/examples/pipeline/simple_web_service
     sh get_data.sh
     python web_service.py >/dev/null &
     sleep 5
-- 
GitLab
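
Both OCR READMEs above run a `pipeline_http_client.py` that this patch does not add. A minimal sketch of what such a client could look like, assuming the `requests` package, the same `/prediction` route and `{"key": [...], "value": [...]}` payload that the simple_web_service README's curl example uses, and an illustrative image path:

```python
# Hypothetical sketch of pipeline_http_client.py -- the script referenced by the
# OCR READMEs is not part of this patch, so the image path here is an assumption.
import base64
import json

import requests


def image_to_base64(path):
    # DetOp.preprocess() expects a base64-encoded image string under the "image" key.
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode("utf8")


if __name__ == "__main__":
    # 9999 is the http_port set in python/examples/pipeline/ocr/config.yml; the
    # /prediction route mirrors the curl example in the simple_web_service README.
    url = "http://127.0.0.1:9999/prediction"
    # Illustrative path into the optional test_imgs dataset.
    data = {"key": ["image"], "value": [image_to_base64("test_imgs/1.jpg")]}
    resp = requests.post(url=url, data=json.dumps(data))
    print(resp.json())
```

The same request shape works against the UCI example on port 18080, as its README's curl command shows.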