diff --git a/core/predictor/CMakeLists.txt b/core/predictor/CMakeLists.txt index 1b9dc7b29845a2b8c7f958c1d8e836cb57e91d41..6b5013c3edadb4592df40db539fa75fb9364d02f 100644 --- a/core/predictor/CMakeLists.txt +++ b/core/predictor/CMakeLists.txt @@ -6,7 +6,7 @@ include(framework/CMakeLists.txt) include(tools/CMakeLists.txt) include(src/CMakeLists.txt) - +add_definitions(-D__STDC_FORMAT_MACROS) add_library(pdserving ${pdserving_srcs}) set_source_files_properties( ${pdserving_srcs} diff --git a/doc/COMPILE.md b/doc/COMPILE.md index 63a4d67c40af77e616e018d1a6dbc289615b4971..abb66084ac6f6c57c13c940eb10a87e2aba2daa2 100644 --- a/doc/COMPILE.md +++ b/doc/COMPILE.md @@ -61,6 +61,25 @@ pip install -r python/requirements.txt If Python3 is used, replace `pip` with `pip3`. +## GOPATH Setting + + +## Compile Arguments + +The default GOPATH is `$HOME/go`, which you can set to other values. +```shell +export GOPATH=$HOME/go +export PATH=$PATH:$GOPATH/bin +``` + +## Get go packages + +```shell +go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway +go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger +go get -u github.com/golang/protobuf/protoc-gen-go +go get -u google.golang.org/grpc +``` ## Compile Server @@ -69,7 +88,10 @@ If Python3 is used, replace `pip` with `pip3`. ``` shell mkdir server-build-cpu && cd server-build-cpu -cmake -DPYTHON_INCLUDE_DIR=$PYTHONROOT/include/python2.7/ -DPYTHON_LIBRARIES=$PYTHONROOT/lib/libpython2.7.so -DPYTHON_EXECUTABLE=$PYTHONROOT/bin/python -DSERVER=ON .. +cmake -DPYTHON_INCLUDE_DIR=$PYTHONROOT/include/python2.7/ \ + -DPYTHON_LIBRARIES=$PYTHONROOT/lib/libpython2.7.so \ + -DPYTHON_EXECUTABLE=$PYTHONROOT/bin/python \ + -DSERVER=ON .. make -j10 ``` @@ -79,7 +101,11 @@ you can execute `make install` to put targets under directory `./output`, you ne ``` shell mkdir server-build-gpu && cd server-build-gpu -cmake -DPYTHON_INCLUDE_DIR=$PYTHONROOT/include/python2.7/ -DPYTHON_LIBRARIES=$PYTHONROOT/lib/libpython2.7.so -DPYTHON_EXECUTABLE=$PYTHONROOT/bin/python -DSERVER=ON -DWITH_GPU=ON .. +cmake -DPYTHON_INCLUDE_DIR=$PYTHONROOT/include/python2.7/ \ + -DPYTHON_LIBRARIES=$PYTHONROOT/lib/libpython2.7.so \ + -DPYTHON_EXECUTABLE=$PYTHONROOT/bin/python \ + -DSERVER=ON \ + -DWITH_GPU=ON .. make -j10 ``` @@ -93,7 +119,10 @@ execute `make install` to put targets under directory `./output` ``` shell mkdir client-build && cd client-build -cmake -DPYTHON_INCLUDE_DIR=$PYTHONROOT/include/python2.7/ -DPYTHON_LIBRARIES=$PYTHONROOT/lib/libpython2.7.so -DPYTHON_EXECUTABLE=$PYTHONROOT/bin/python -DCLIENT=ON .. +cmake -DPYTHON_INCLUDE_DIR=$PYTHONROOT/include/python2.7/ \ + -DPYTHON_LIBRARIES=$PYTHONROOT/lib/libpython2.7.so \ + -DPYTHON_EXECUTABLE=$PYTHONROOT/bin/python \ + -DCLIENT=ON .. make -j10 ``` diff --git a/doc/COMPILE_CN.md b/doc/COMPILE_CN.md index 29b0645cc4ea90c56cd5d691f4766a3e3ad39ba1..2ddaaf71f23b0199c7458d068139a6b7169c25d8 100644 --- a/doc/COMPILE_CN.md +++ b/doc/COMPILE_CN.md @@ -61,6 +61,22 @@ pip install -r python/requirements.txt 如果使用 Python3,请以 `pip3` 替换 `pip`。 +## GOPATH 设置 + +默认 GOPATH 设置为 `$HOME/go`,您也可以设置为其他值。 +```shell +export GOPATH=$HOME/go +export PATH=$PATH:$GOPATH/bin +``` + +## 获取 Go packages + +```shell +go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway +go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger +go get -u github.com/golang/protobuf/protoc-gen-go +go get -u google.golang.org/grpc +``` ## 编译Server部分 diff --git a/doc/FAQ.md b/doc/FAQ.md index eb4f05a28594effcf59aac880cf4d81846a3a925..119c5a9dbc7237b5dadbddd79fbb4d2340940273 100644 --- a/doc/FAQ.md +++ b/doc/FAQ.md @@ -1,8 +1,8 @@ # FAQ -- Q:如何调整RPC服务的等待时间,避免超时? +- Q: 如何调整RPC服务的等待时间,避免超时? - A:使用set_rpc_timeout_ms设置更长的等待时间,单位为毫秒,默认时间为20秒。 + A: 使用set_rpc_timeout_ms设置更长的等待时间,单位为毫秒,默认时间为20秒。 示例: ``` @@ -15,4 +15,13 @@ ``` - Q: 如何使用自己编译的Paddle Serving进行预测? - A:通过pip命令安装自己编译出的whl包,并设置SERVING_BIN环境变量为编译出的serving二进制文件路径。 + A: 通过pip命令安装自己编译出的whl包,并设置SERVING_BIN环境变量为编译出的serving二进制文件路径。 + +- Q: 执行GPU预测时遇到InvalidArgumentError: Device id must be less than GPU count, but received id is: 0. GPU count is: 0. + A: 将显卡驱动对应的libcuda.so的目录添加到LD_LIBRARY_PATH环境变量中 + +- Q: 执行GPU预测时遇到ExternalError: Cudnn error, CUDNN_STATUS_BAD_PARAM at (/home/scmbuild/workspaces_cluster.dev/baidu.lib.paddlepaddle/baidu/lib/paddlepaddle/Paddle/paddle/fluid/operators/batch_norm_op.cu:198) + A: 将cudnn的lib64路径添加到LD_LIBRARY_PATH,安装自pypi的Paddle Serving中post9版使用的是cudnn 7.3,post10使用的是cudnn 7.5。如果是使用自己编译的Paddle Serving,可以在log/serving.INFO日志文件中查看对应的cudnn版本。 + +- Q: 执行GPU预测时遇到Error: Failed to find dynamic library: libcublas.so + A: 将cuda的lib64路径添加到LD_LIBRARY_PATH, post9版本的Paddle Serving使用的是cuda 9.0,post10版本使用的cuda 10.0。 diff --git a/doc/NEW_WEB_SERVICE.md b/doc/NEW_WEB_SERVICE.md index 39bca98a3bdfbc1b2cadb5d2c3d60395b4592b34..86e53b843eb18d28057f69a39934682d797e4de5 100644 --- a/doc/NEW_WEB_SERVICE.md +++ b/doc/NEW_WEB_SERVICE.md @@ -1,56 +1,152 @@ # How to develop a new Web service? + ([简体中文](NEW_WEB_SERVICE_CN.md)|English) -This document will take the image classification service based on the Imagenet data set as an example to introduce how to develop a new web service. The complete code can be visited at [here](../python/examples/imagenet/resnet50_web_service.py). +This document will take Uci service as an example to introduce how to develop a new Web Service. You can check out the complete code [here](../python/examples/pipeline/simple_web_service/web_service.py). -## WebService base class +## Op base class + +In some services, a single model may not meet business needs, requiring multiple models to be concatenated or parallel to complete the entire service. We call a single model operation Op and provide a simple set of interfaces to implement the complex logic of Op concatenation or parallelism. -Paddle Serving implements the [WebService](https://github.com/PaddlePaddle/Serving/blob/develop/python/paddle_serving_server/web_service.py#L23) base class. You need to override its `preprocess` and `postprocess` method. The default implementation is as follows: +Data between Ops is passed as a dictionary, Op can be started as threads or process, and Op can be configured for the number of concurrencies, etc. + +Typically, you need to inherit the Op base class and override its `init_op`, `preprocess` and `postprocess` methods, which are implemented by default as follows: ```python -class WebService(object): - - def preprocess(self, feed={}, fetch=[]): - return feed, fetch - def postprocess(self, feed={}, fetch=[], fetch_map=None): - return fetch_map +class Op(object): + def init_op(self): + pass + def preprocess(self, input_dicts): + # multiple previous Op + if len(input_dicts) != 1: + _LOGGER.critical( + "Failed to run preprocess: this Op has multiple previous " + "inputs. Please override this func.") + os._exit(-1) + (_, input_dict), = input_dicts.items() + return input_dict + def postprocess(self, input_dicts, fetch_dict): + return fetch_dict ``` +### init_op + +This method is used to load user-defined resources such as dictionaries. A separator is loaded in the [UciOp](../python/examples/pipeline/simple_web_service/web_service.py). + +**Note**: If Op is launched in threaded mode, different threads of the same Op execute `init_op` only once and share `init_op` loaded resources when Op is multi-concurrent. + ### preprocess -The preprocess method has two input parameters, `feed` and `fetch`. For an HTTP request `request`: +This method is used to preprocess the data before model prediction. It has an `input_dicts` parameter, `input_dicts` is a dictionary, key is the `name` of the previous Op, and value is the data transferred from the corresponding previous op (the data is also in dictionary format). -- The value of `feed` is the feed part `request.json["feed"]` in the request data -- The value of `fetch` is the fetch part `request.json["fetch"]` in the request data +The `preprocess` method needs to process the data into a ndarray dictionary (key is the feed variable name, and value is the corresponding ndarray value). Op will take the return value as the input of the model prediction and pass the output to the `postprocess` method. -The return values are the feed and fetch values used in the prediction. +**Note**: if Op does not have a model configuration file, the return value of `preprocess` will be directly passed to `postprocess`. ### postprocess -The postprocess method has three input parameters, `feed`, `fetch` and `fetch_map`: +This method is used for data post-processing after model prediction. It has two parameters, `input_dicts` and `fetch_dict`. + +Where the `input_dicts` parameter is consistent with the parameter in `preprocess` method, and `fetch_dict` is the output of the model prediction (key is the name of the fetch variable, and value is the corresponding ndarray value). Op will take the return value of `postprocess` as the input of subsequent Op `preprocess`. -- The value of `feed` is the feed part `request.json["feed"]` in the request data -- The value of `fetch` is the fetch part `request.json["fetch"]` in the request data -- The value of `fetch_map` is the model output value. +**Note**: if Op does not have a model configuration file, `fetch_dict` will be the return value of `preprocess`. -The return value will be processed as `{"reslut": fetch_map}` as the return of the HTTP request. -## Develop ImageService class + +Here is the op of the UCI example: + +```python +class UciOp(Op): + def init_op(self): + self.separator = "," + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + x_value = input_dict["x"] + if isinstance(x_value, (str, unicode)): + input_dict["x"] = np.array( + [float(x.strip()) for x in x_value.split(self.separator)]) + return input_dict + + def postprocess(self, input_dicts, fetch_dict): + fetch_dict["price"] = str(fetch_dict["price"][0][0]) + return fetch_dict +``` + + + +## WebService base class + +Paddle Serving implements the [WebService](https://github.com/PaddlePaddle/Serving/blob/develop/python/paddle_serving_server/web_service.py#L23) base class. You need to override its `get_pipeline_response` method to define the topological relationship between Ops. The default implementation is as follows: ```python -class ImageService(WebService): - - def preprocess(self, feed={}, fetch=[]): - reader = ImageReader() - feed_batch = [] - for ins in feed: - if "image" not in ins: - raise ("feed data error!") - sample = base64.b64decode(ins["image"]) - img = reader.process_image(sample) - feed_batch.append({"image": img}) - return feed_batch, fetch +class WebService(object): + def get_pipeline_response(self, read_op): + return None +``` + +Where `read_op` serves as the entry point of the topology map of the whole service (that is, the first op defined by the user is followed by `read_op`). + +For single Op service (single model), take Uci service as an example (there is only one Uci prediction model in the whole service): + +```python +class UciService(WebService): + def get_pipeline_response(self, read_op): + uci_op = UciOp(name="uci", input_ops=[read_op]) + return uci_op +``` + +For multiple Op services (multiple models), take Ocr service as an example (the whole service is completed in series by Det model and Rec model): + +```python +class OcrService(WebService): + def get_pipeline_response(self, read_op): + det_op = DetOp(name="det", input_ops=[read_op]) + rec_op = RecOp(name="rec", input_ops=[det_op]) + return rec_op +``` + + + +WebService objects need to load a yaml configuration file through the `prepare_pipeline_config` to configure each Op and the entire service. The simplest configuration file is as follows (Uci example): + +```yaml +http_port: 18080 +op: + uci: + local_service_conf: + model_config: uci_housing_model # path +``` + +All field names of yaml file are as follows: + +```yaml +rpc_port: 18080 # gRPC port +build_dag_each_worker: false # Whether to use process server or not. The default is false +worker_num: 1 # gRPC thread pool size (the number of processes in the process version servicer). The default is 1 +http_port: 0 # HTTP service port. Do not start HTTP service when the value is less or equals 0. The default value is 0. +dag: + is_thread_op: true # Whether to use the thread version of OP. The default is true + client_type: brpc # Use brpc or grpc client. The default is brpc + retry: 1 # The number of times DAG executor retries after failure. The default value is 1, that is, no retrying + use_profile: false # Whether to print the log on the server side. The default is false + tracer: + interval_s: -1 # Monitoring time interval of Tracer (in seconds). Do not start monitoring when the value is less than 1. The default value is -1 +op: + : # op name, corresponding to the one defined in the program + concurrency: 1 # op concurrency number, the default is 1 + timeout: -1 # predict timeout in milliseconds. The default value is -1, that is, no timeout + retry: 1 # timeout retransmissions. The default value is 1, that is, do not try again + batch_size: 1 # If this field is set, Op will merge multiple request outputs into a single batch + auto_batching_timeout: -1 # auto-batching timeout in milliseconds. The default value is -1, that is, no timeout + local_service_conf: + model_config: # the path of the corresponding model file. There is no default value(None). If this item is not configured, the model file will not be loaded. + workdir: "" # working directory of corresponding model + thread_num: 2 # the corresponding model is started with thread_num threads + devices: "" # on which device does the model launched. You can specify the GPU card number(such as "0,1,2"), which is CPU by default + mem_optim: true # mem optimization option, the default is true + ir_optim: false # ir optimization option, the default is false ``` -For the above `ImageService`, only the `preprocess` method is rewritten to process the image data in Base64 format into the data format required by prediction. +All fields of Op can be defined when Op is created in the program (which will override yaml fields). diff --git a/doc/NEW_WEB_SERVICE_CN.md b/doc/NEW_WEB_SERVICE_CN.md index 43ca7fb61f2c70f13019574a7984e3665bd1b6fa..af6730a89badd8214323ea08bbb799033f57f09b 100644 --- a/doc/NEW_WEB_SERVICE_CN.md +++ b/doc/NEW_WEB_SERVICE_CN.md @@ -1,56 +1,152 @@ # 如何开发一个新的Web Service? + (简体中文|[English](NEW_WEB_SERVICE.md)) -本文档将以Imagenet图像分类服务为例,来介绍如何开发一个新的Web Service。您可以在[这里](../python/examples/imagenet/resnet50_web_service.py)查阅完整的代码。 +本文档将以 Uci 房价预测服务为例,来介绍如何开发一个新的Web Service。您可以在[这里](../python/examples/pipeline/simple_web_service/web_service.py)查阅完整的代码。 + +## Op 基类 + +在一些服务中,单个模型可能无法满足需求,需要多个模型串联或并联来完成整个服务。我们将单个模型操作称为 Op,并提供了一套简单的接口来实现 Op 串联或并联的复杂逻辑。 -## WebService基类 +Op 间数据是以字典形式进行传递的,Op 可以以线程或进程方式启动,同时可以对 Op 的并发数等进行配置。 -Paddle Serving实现了[WebService](https://github.com/PaddlePaddle/Serving/blob/develop/python/paddle_serving_server/web_service.py#L23)基类,您需要重写它的`preprocess`方法和`postprocess`方法,默认实现如下: +通常情况下,您需要继承 Op 基类,重写它的 `init_op`、`preprocess` 和 `postprocess` 方法,默认实现如下: ```python -class WebService(object): - - def preprocess(self, feed={}, fetch=[]): - return feed, fetch - def postprocess(self, feed={}, fetch=[], fetch_map=None): - return fetch_map +class Op(object): + def init_op(self): + pass + def preprocess(self, input_dicts): + # multiple previous Op + if len(input_dicts) != 1: + _LOGGER.critical( + "Failed to run preprocess: this Op has multiple previous " + "inputs. Please override this func.") + os._exit(-1) + (_, input_dict), = input_dicts.items() + return input_dict + def postprocess(self, input_dicts, fetch_dict): + return fetch_dict ``` -### preprocess方法 +### init_op 方法 + +该方法用于加载用户自定义资源(如字典等),在 [UciOp](../python/examples/pipeline/simple_web_service/web_service.py) 中加载了一个分隔符。 + +**注意**:如果 Op 是以线程模式加载的,那么在 Op 多并发时,同种 Op 的不同线程只执行一次 `init_op`,且共用 `init_op` 加载的资源。 + +### preprocess 方法 + +该方法用于模型预测前对数据的预处理,它有一个 `input_dicts` 参数,`input_dicts` 是一个字典,key 为前继 Op 的 `name`,value 为对应前继 Op 传递过来的数据(数据同样是字典格式)。 + +`preprocess` 方法需要将数据处理成 ndarray 字典(key 为 feed 变量名,value 为对应的 ndarray 值),Op 会将该返回值作为模型预测的输入,并将输出传递给 `postprocess` 方法。 -preprocess方法有两个输入参数,`feed`和`fetch`。对于一个HTTP请求`request`: +**注意**:如果 Op 没有配置模型,则 `preprocess` 的返回值会直接传递给 `postprocess`。 -- `feed`的值为请求数据中的feed部分`request.json["feed"]` -- `fetch`的值为请求数据中的fetch部分`request.json["fetch"]` +### postprocess 方法 -返回值分别是预测过程中用到的feed和fetch值。 +该方法用于模型预测后对数据的后处理,它有两个参数,`input_dicts` 和 `fetch_dict`。 -### postprocess方法 +其中,`input_dicts` 与 `preprocess` 的参数相同,`fetch_dict` 为模型预测的输出(key 为 fetch 变量名,value 为对应的 ndarray 值)。Op 会将 `postprocess` 的返回值作为后继 Op `preprocess` 的输入。 -postprocess方法有三个输入参数,`feed`、`fetch`和`fetch_map`: +**注意**:如果 Op 没有配置模型,则 `fetch_dict` 将为 `preprocess` 的返回值。 -- `feed`的值为请求数据中的feed部分`request.json["feed"]` -- `fetch`的值为请求数据中的fetch部分`request.json["fetch"]` -- `fetch_map`的值为fetch到的模型输出值 -返回值将会被处理成`{"reslut": fetch_map}`作为HTTP请求的返回。 -## 开发ImageService类 +下面是 Uci 例子的 Op: ```python -class ImageService(WebService): - - def preprocess(self, feed={}, fetch=[]): - reader = ImageReader() - feed_batch = [] - for ins in feed: - if "image" not in ins: - raise ("feed data error!") - sample = base64.b64decode(ins["image"]) - img = reader.process_image(sample) - feed_batch.append({"image": img}) - return feed_batch, fetch +class UciOp(Op): + def init_op(self): + self.separator = "," + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + x_value = input_dict["x"] + if isinstance(x_value, (str, unicode)): + input_dict["x"] = np.array( + [float(x.strip()) for x in x_value.split(self.separator)]) + return input_dict + + def postprocess(self, input_dicts, fetch_dict): + fetch_dict["price"] = str(fetch_dict["price"][0][0]) + return fetch_dict +``` + + + +## WebService 基类 + +Paddle Serving 实现了 [WebService](https://github.com/PaddlePaddle/Serving/blob/develop/python/paddle_serving_server/web_service.py#L28) 基类,您需要重写它的 `get_pipeline_response` 方法来定义 Op 间的拓扑关系,并返回作为 Response 的 Op,默认实现如下: + +```python +class WebService(object): + def get_pipeline_response(self, read_op): + return None +``` + +其中,`read_op` 作为整个服务拓扑图的入口(即用户自定义的第一个 Op 的前继为 `read_op`)。 + +对于单 Op 服务(单模型),以 Uci 服务为例(整个服务中只有一个 Uci 房价预测模型): + +```python +class UciService(WebService): + def get_pipeline_response(self, read_op): + uci_op = UciOp(name="uci", input_ops=[read_op]) + return uci_op +``` + +对于多 Op 服务(多模型),以 Ocr 服务为例(整个服务由 Det 模型和 Rec 模型串联完成): + +```python +class OcrService(WebService): + def get_pipeline_response(self, read_op): + det_op = DetOp(name="det", input_ops=[read_op]) + rec_op = RecOp(name="rec", input_ops=[det_op]) + return rec_op +``` + + + +WebService 对象需要通过 `prepare_pipeline_config` 加载一个 yaml 配置文件,用来对各个 Op 以及整个服务进行配置,最简单的配置文件如下(Uci 例子): + +```yaml +http_port: 18080 +op: + uci: + local_service_conf: + model_config: uci_housing_model # 路径 +``` + +yaml 文件的所有字段名详见下面: + +```yaml +rpc_port: 18080 # gRPC端口号 +build_dag_each_worker: false # 是否使用进程版 Servicer,默认为 false +worker_num: 1 # gRPC线程池大小(进程版 Servicer 中为进程数),默认为 1 +http_port: 0 # HTTP 服务的端口号,若该值小于或等于 0 则不开启 HTTP 服务,默认为 0 +dag: + is_thread_op: true # 是否使用线程版Op,默认为 true + client_type: brpc # 使用 brpc 或 grpc client,默认为 brpc + retry: 1 # DAG Executor 在失败后重试次数,默认为 1,即不重试 + use_profile: false # 是否在 Server 端打印日志,默认为 false + tracer: + interval_s: -1 # Tracer 监控的时间间隔,单位为秒。当该值小于 1 时不启动监控,默认为 -1 +op: + : # op 名,与程序中定义的相对应 + concurrency: 1 # op 并发数,默认为 1 + timeout: -1 # 预测超时时间,单位为毫秒。默认为 -1 即不超时 + retry: 1 # 超时重发次数。默认为 1 即不重试 + batch_size: 1 # auto-batching 中的 batch_size,若设置该字段则 Op 会将多个请求输出合并为一个 batch + auto_batching_timeout: -1 # auto-batching 超时时间,单位为毫秒。默认为 -1 即不超时 + local_service_conf: + model_config: # 对应模型文件的路径,无默认值(None)。若不配置该项则不会加载模型文件。 + workdir: "" # 对应模型的工作目录 + thread_num: 2 # 对应模型用几个线程启动 + devices: "" # 模型启动在哪个设备上,可以指定 gpu 卡号(如 "0,1,2"),默认为 cpu + mem_optim: true # mem 优化选项,默认为 true + ir_optim: false # ir 优化选项,默认为 false ``` -对于上述的`ImageService`,只重写了前处理方法,将base64格式的图片数据处理成模型预测需要的数据格式。 +其中,Op 的所有字段均可以在程序中创建 Op 时定义(会覆盖 yaml 的字段)。 diff --git a/doc/PIPELINE_SERVING.md b/doc/PIPELINE_SERVING.md index 7270cc134558906f6989a2c315a1dd4e2a640c59..4205aa15723d3625c0fea43eb9d0fd67f32f4a3f 100644 --- a/doc/PIPELINE_SERVING.md +++ b/doc/PIPELINE_SERVING.md @@ -251,9 +251,10 @@ server.run_server() Where `response_op` is the responseop mentioned above, PipelineServer will initialize Channels according to the topology relationship of each OP and build the calculation graph. `config_yml_path` is the configuration file of PipelineServer. The example file is as follows: ```yaml -port: 18080 # gRPC port +rpc_port: 18080 # gRPC port worker_num: 1 # gRPC thread pool size (the number of processes in the process version servicer). The default is 1 build_dag_each_worker: false # Whether to use process server or not. The default is false +http_port: 0 # HTTP service port. Do not start HTTP service when the value is less or equals 0. The default value is 0. dag: is_thread_op: true # Whether to use the thread version of OP. The default is true client_type: brpc # Use brpc or grpc client. The default is brpc @@ -285,6 +286,8 @@ python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 &> cnn. python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.log & ``` +PipelineServing also supports local automatic startup of PaddleServingService. Please refer to the example `python/examples/pipeline/ocr`. + ### Start PipelineServer Run the following code @@ -384,7 +387,7 @@ for f in futures: -## How to optimize through the timeline tool +## How to optimize with the timeline tool In order to better optimize the performance, PipelineServing provides a timeline tool to monitor the time of each stage of the whole service. diff --git a/doc/PIPELINE_SERVING_CN.md b/doc/PIPELINE_SERVING_CN.md index 3214487c31bcc47ec67d2ad28d987bff845fa13b..7cab409b2b8ca5d80eac05827f2e3fb774000998 100644 --- a/doc/PIPELINE_SERVING_CN.md +++ b/doc/PIPELINE_SERVING_CN.md @@ -249,9 +249,10 @@ server.run_server() 其中,`response_op` 为上面提到的 ResponseOp,PipelineServer 将会根据各个 OP 的拓扑关系初始化 Channel 并构建计算图。`config_yml_path` 为 PipelineServer 的配置文件,示例文件如下: ```yaml -port: 18080 # gRPC端口号 +rpc_port: 18080 # gRPC端口号 worker_num: 1 # gRPC线程池大小(进程版 Servicer 中为进程数),默认为 1 build_dag_each_worker: false # 是否使用进程版 Servicer,默认为 false +http_port: 0 # HTTP 服务的端口号,若该值小于或等于 0 则不开启 HTTP 服务,默认为 0 dag: is_thread_op: true # 是否使用线程版Op,默认为 true client_type: brpc # 使用 brpc 或 grpc client,默认为 brpc @@ -283,6 +284,8 @@ python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 &> cnn. python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.log & ``` +PipelineServing 也支持本地自动启动 PaddleServingService,请参考 `python/examples/pipeline/ocr` 下的例子。 + ### 启动 PipelineServer 运行下面代码 diff --git a/doc/SAVE.md b/doc/SAVE.md index 54800fa06ab4b8c20c0ffe75d417e1b42ab6ebe6..8ebeb89c536f576bf73414fb06c1eb4bfde63ea0 100644 --- a/doc/SAVE.md +++ b/doc/SAVE.md @@ -38,12 +38,15 @@ If you have saved model files using Paddle's `save_inference_model` API, you can import paddle_serving_client.io as serving_io serving_io.inference_model_to_serving(dirname, serving_server="serving_server", serving_client="serving_client", model_filename=None, params_filename=None ) ``` -dirname (str) - Path of saved model files. Program file and parameter files are saved in this directory. - -serving_server (str, optional) - The path of model files and configuration files for server. Default: "serving_server". - -serving_client (str, optional) - The path of configuration files for client. Default: "serving_client". - -model_filename (str, optional) - The name of file to load the inference program. If it is None, the default filename `__model__` will be used. Default: None. - -paras_filename (str, optional) - The name of file to load all parameters. It is only used for the case that all parameters were saved in a single binary file. If parameters were saved in separate files, set it as None. Default: None. +Or you can use a build-in python module called `paddle_serving_client.convert` to convert it. +```python +python -m paddle_serving_client.convert --dirname ./your_inference_model_dir +``` +Arguments are the same as `inference_model_to_serving` API. +| Argument | Type | Default | Description | +|--------------|------|-----------|--------------------------------| +| `dirname` | str | - | Path of saved model files. Program file and parameter files are saved in this directory. | +| `serving_server` | str | `"serving_server"` | The path of model files and configuration files for server. | +| `serving_client` | str | `"serving_client"` | The path of configuration files for client. | +| `model_filename` | str | None | The name of file to load the inference program. If it is None, the default filename `__model__` will be used. | +| `paras_filename` | str | None | The name of file to load all parameters. It is only used for the case that all parameters were saved in a single binary file. If parameters were saved in separate files, set it as None. | diff --git a/doc/SAVE_CN.md b/doc/SAVE_CN.md index aaf0647fd1c4e95584bb7aa42a6671620adeb6d0..a05729ed9c01f421893403b4fc2a13bd42ad9fd4 100644 --- a/doc/SAVE_CN.md +++ b/doc/SAVE_CN.md @@ -39,12 +39,15 @@ for line in sys.stdin: import paddle_serving_client.io as serving_io serving_io.inference_model_to_serving(dirname, serving_server="serving_server", serving_client="serving_client", model_filename=None, params_filename=None) ``` -dirname (str) – 需要转换的模型文件存储路径,Program结构文件和参数文件均保存在此目录。 - -serving_server (str, 可选) - 转换后的模型文件和配置文件的存储路径。默认值为serving_server。 - -serving_client (str, 可选) - 转换后的客户端配置文件存储路径。默认值为serving_client。 - -model_filename (str,可选) – 存储需要转换的模型Inference Program结构的文件名称。如果设置为None,则使用 `__model__` 作为默认的文件名。默认值为None。 - -params_filename (str,可选) – 存储需要转换的模型所有参数的文件名称。当且仅当所有模型参数被保存在一个单独的二进制文件中,它才需要被指定。如果模型参数是存储在各自分离的文件中,设置它的值为None。默认值为None。 +或者你可以使用Paddle Serving提供的名为`paddle_serving_client.convert`的内置模块进行转换。 +```python +python -m paddle_serving_client.convert --dirname ./your_inference_model_dir +``` +模块参数与`inference_model_to_serving`接口参数相同。 +| 参数 | 类型 | 默认值 | 描述 | +|--------------|------|-----------|--------------------------------| +| `dirname` | str | - | 需要转换的模型文件存储路径,Program结构文件和参数文件均保存在此目录。| +| `serving_server` | str | `"serving_server"` | 转换后的模型文件和配置文件的存储路径。默认值为serving_server | +| `serving_client` | str | `"serving_client"` | 转换后的客户端配置文件存储路径。默认值为serving_client | +| `model_filename` | str | None | 存储需要转换的模型Inference Program结构的文件名称。如果设置为None,则使用 `__model__` 作为默认的文件名 | +| `paras_filename` | str | None | 存储需要转换的模型所有参数的文件名称。当且仅当所有模型参数被保存在一个单独的二进制文件中,它才需要被指定。如果模型参数是存储在各自分离的文件中,设置它的值为None | diff --git a/doc/deprecated/NEW_WEB_SERVICE.md b/doc/deprecated/NEW_WEB_SERVICE.md new file mode 100644 index 0000000000000000000000000000000000000000..39bca98a3bdfbc1b2cadb5d2c3d60395b4592b34 --- /dev/null +++ b/doc/deprecated/NEW_WEB_SERVICE.md @@ -0,0 +1,56 @@ +# How to develop a new Web service? + +([简体中文](NEW_WEB_SERVICE_CN.md)|English) + +This document will take the image classification service based on the Imagenet data set as an example to introduce how to develop a new web service. The complete code can be visited at [here](../python/examples/imagenet/resnet50_web_service.py). + +## WebService base class + +Paddle Serving implements the [WebService](https://github.com/PaddlePaddle/Serving/blob/develop/python/paddle_serving_server/web_service.py#L23) base class. You need to override its `preprocess` and `postprocess` method. The default implementation is as follows: + +```python +class WebService(object): + + def preprocess(self, feed={}, fetch=[]): + return feed, fetch + def postprocess(self, feed={}, fetch=[], fetch_map=None): + return fetch_map +``` + +### preprocess + +The preprocess method has two input parameters, `feed` and `fetch`. For an HTTP request `request`: + +- The value of `feed` is the feed part `request.json["feed"]` in the request data +- The value of `fetch` is the fetch part `request.json["fetch"]` in the request data + +The return values are the feed and fetch values used in the prediction. + +### postprocess + +The postprocess method has three input parameters, `feed`, `fetch` and `fetch_map`: + +- The value of `feed` is the feed part `request.json["feed"]` in the request data +- The value of `fetch` is the fetch part `request.json["fetch"]` in the request data +- The value of `fetch_map` is the model output value. + +The return value will be processed as `{"reslut": fetch_map}` as the return of the HTTP request. + +## Develop ImageService class + +```python +class ImageService(WebService): + + def preprocess(self, feed={}, fetch=[]): + reader = ImageReader() + feed_batch = [] + for ins in feed: + if "image" not in ins: + raise ("feed data error!") + sample = base64.b64decode(ins["image"]) + img = reader.process_image(sample) + feed_batch.append({"image": img}) + return feed_batch, fetch +``` + +For the above `ImageService`, only the `preprocess` method is rewritten to process the image data in Base64 format into the data format required by prediction. diff --git a/doc/deprecated/NEW_WEB_SERVICE_CN.md b/doc/deprecated/NEW_WEB_SERVICE_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..43ca7fb61f2c70f13019574a7984e3665bd1b6fa --- /dev/null +++ b/doc/deprecated/NEW_WEB_SERVICE_CN.md @@ -0,0 +1,56 @@ +# 如何开发一个新的Web Service? + +(简体中文|[English](NEW_WEB_SERVICE.md)) + +本文档将以Imagenet图像分类服务为例,来介绍如何开发一个新的Web Service。您可以在[这里](../python/examples/imagenet/resnet50_web_service.py)查阅完整的代码。 + +## WebService基类 + +Paddle Serving实现了[WebService](https://github.com/PaddlePaddle/Serving/blob/develop/python/paddle_serving_server/web_service.py#L23)基类,您需要重写它的`preprocess`方法和`postprocess`方法,默认实现如下: + +```python +class WebService(object): + + def preprocess(self, feed={}, fetch=[]): + return feed, fetch + def postprocess(self, feed={}, fetch=[], fetch_map=None): + return fetch_map +``` + +### preprocess方法 + +preprocess方法有两个输入参数,`feed`和`fetch`。对于一个HTTP请求`request`: + +- `feed`的值为请求数据中的feed部分`request.json["feed"]` +- `fetch`的值为请求数据中的fetch部分`request.json["fetch"]` + +返回值分别是预测过程中用到的feed和fetch值。 + +### postprocess方法 + +postprocess方法有三个输入参数,`feed`、`fetch`和`fetch_map`: + +- `feed`的值为请求数据中的feed部分`request.json["feed"]` +- `fetch`的值为请求数据中的fetch部分`request.json["fetch"]` +- `fetch_map`的值为fetch到的模型输出值 + +返回值将会被处理成`{"reslut": fetch_map}`作为HTTP请求的返回。 + +## 开发ImageService类 + +```python +class ImageService(WebService): + + def preprocess(self, feed={}, fetch=[]): + reader = ImageReader() + feed_batch = [] + for ins in feed: + if "image" not in ins: + raise ("feed data error!") + sample = base64.b64decode(ins["image"]) + img = reader.process_image(sample) + feed_batch.append({"image": img}) + return feed_batch, fetch +``` + +对于上述的`ImageService`,只重写了前处理方法,将base64格式的图片数据处理成模型预测需要的数据格式。 diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index edec41573b67f50feca52ee017bae2d7fa2b28ac..4b20cb2001ebb595601f22fa6e4aab8dd5df18f4 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -1,7 +1,5 @@ if (CLIENT) file(INSTALL pipeline DESTINATION paddle_serving_client) - execute_process(COMMAND ${PYTHON_EXECUTABLE} run_codegen.py - WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/paddle_serving_client/pipeline/proto) file(GLOB_RECURSE SERVING_CLIENT_PY_FILES paddle_serving_client/*.py) set(PY_FILES ${SERVING_CLIENT_PY_FILES}) SET(PACKAGE_NAME "serving_client") @@ -11,13 +9,9 @@ endif() if (SERVER) if (NOT WITH_GPU) file(INSTALL pipeline DESTINATION paddle_serving_server) - execute_process(COMMAND ${PYTHON_EXECUTABLE} run_codegen.py - WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/paddle_serving_server/pipeline/proto) file(GLOB_RECURSE SERVING_SERVER_PY_FILES paddle_serving_server/*.py) else() file(INSTALL pipeline DESTINATION paddle_serving_server_gpu) - execute_process(COMMAND ${PYTHON_EXECUTABLE} run_codegen.py - WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/paddle_serving_server_gpu/pipeline/proto) file(GLOB_RECURSE SERVING_SERVER_PY_FILES paddle_serving_server_gpu/*.py) endif() set(PY_FILES ${SERVING_SERVER_PY_FILES}) @@ -25,6 +19,8 @@ if (SERVER) set(SETUP_LOG_FILE "setup.py.server.log") endif() +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/util.py + ${CMAKE_CURRENT_BINARY_DIR}/util.py) if (CLIENT) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.client.in ${CMAKE_CURRENT_BINARY_DIR}/setup.py) @@ -47,6 +43,9 @@ if (SERVER) endif() endif() +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/gen_version.py + ${CMAKE_CURRENT_BINARY_DIR}/gen_version.py) + set (SERVING_CLIENT_CORE ${PADDLE_SERVING_BINARY_DIR}/core/general-client/*.so) message("python env: " ${py_env}) @@ -54,6 +53,7 @@ if (APP) add_custom_command( OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_app/ ${PADDLE_SERVING_BINARY_DIR}/python/ + COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py "app" COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel DEPENDS ${SERVING_APP_CORE} general_model_config_py_proto ${PY_FILES}) add_custom_target(paddle_python ALL DEPENDS ${PADDLE_SERVING_BINARY_DIR}/.timestamp) @@ -65,6 +65,7 @@ add_custom_command( COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_client/ ${PADDLE_SERVING_BINARY_DIR}/python/ COMMAND ${CMAKE_COMMAND} -E copy ${SERVING_CLIENT_CORE} ${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/serving_client.so COMMAND env ${py_env} ${PYTHON_EXECUTABLE} python_tag.py + COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py "client" COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel DEPENDS ${SERVING_CLIENT_CORE} sdk_configure_py_proto ${PY_FILES}) add_custom_target(paddle_python ALL DEPENDS serving_client ${PADDLE_SERVING_BINARY_DIR}/.timestamp) @@ -75,6 +76,7 @@ if (SERVER) add_custom_command( OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_server/ ${PADDLE_SERVING_BINARY_DIR}/python/ + COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py "server" COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel DEPENDS ${SERVING_SERVER_CORE} server_config_py_proto ${PY_FILES}) add_custom_target(paddle_python ALL DEPENDS ${PADDLE_SERVING_BINARY_DIR}/.timestamp) @@ -83,7 +85,8 @@ if (SERVER) OUTPUT ${PADDLE_SERVING_BINARY_DIR}/.timestamp COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/paddle_serving_server_gpu/ ${PADDLE_SERVING_BINARY_DIR}/python/ - COMMAND env ${py_env} ${PYTHON_EXECUTABLE} paddle_serving_server_gpu/gen_cuda_version.py ${CUDA_VERSION_MAJOR} + COMMAND env ${py_env} ${PYTHON_EXECUTABLE} gen_version.py + "server_gpu" ${CUDA_VERSION_MAJOR} COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel DEPENDS ${SERVING_SERVER_CORE} server_config_py_proto ${PY_FILES}) add_custom_target(paddle_python ALL DEPENDS ${PADDLE_SERVING_BINARY_DIR}/.timestamp) diff --git a/python/examples/pipeline/imdb_model_ensemble/README_CN.md b/python/examples/pipeline/imdb_model_ensemble/README_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..88eeab70c470268775ad22fd65a6d1b999a6b167 --- /dev/null +++ b/python/examples/pipeline/imdb_model_ensemble/README_CN.md @@ -0,0 +1,24 @@ +# IMDB model ensemble 样例 + +## 获取模型 +``` +sh get_data.sh +``` + +## 启动服务 + +``` +python -m paddle_serving_server_gpu.serve --model imdb_cnn_model --port 9292 &> cnn.log & +python -m paddle_serving_server_gpu.serve --model imdb_bow_model --port 9393 &> bow.log & +python test_pipeline_server.py &>pipeline.log & +``` + +## 启动客户端 +``` +python test_pipeline_client.py +``` + +## HTTP 测试 +``` +curl -X POST -k http://localhost:9999/prediction -d '{"key": ["words"], "value": ["i am very sad | 0"]}' +``` diff --git a/python/examples/pipeline/imdb_model_ensemble/config.yml b/python/examples/pipeline/imdb_model_ensemble/config.yml index 3f0b1bb8d4eedb073fa5014eb20e1a170f0d811b..3447ffd449de59ea76450e95c7f355413d1a12ac 100644 --- a/python/examples/pipeline/imdb_model_ensemble/config.yml +++ b/python/examples/pipeline/imdb_model_ensemble/config.yml @@ -1,6 +1,7 @@ -port: 18080 +rpc_port: 18085 worker_num: 4 build_dag_each_worker: false +http_port: 9999 dag: is_thread_op: false client_type: brpc diff --git a/python/examples/pipeline/ocr/README.md b/python/examples/pipeline/ocr/README.md new file mode 100644 index 0000000000000000000000000000000000000000..f51789fc5e419d715141ba59dc49011d4f306e56 --- /dev/null +++ b/python/examples/pipeline/ocr/README.md @@ -0,0 +1,67 @@ +# OCR Pipeline WebService + +(English|[简体中文](./README_CN.md)) + +This document will take OCR as an example to show how to use Pipeline WebService to start multi-model tandem services. + +## Get Model +``` +python -m paddle_serving_app.package --get_model ocr_rec +tar -xzvf ocr_rec.tar.gz +python -m paddle_serving_app.package --get_model ocr_det +tar -xzvf ocr_det.tar.gz +``` + +## Get Dataset (Optional) +``` +wget --no-check-certificate https://paddle-serving.bj.bcebos.com/ocr/test_imgs.tar +tar xf test_imgs.tar +``` + +## Start Service +``` +python web_service.py &>log.txt & +``` + +## Test +``` +python pipeline_http_client.py +``` + + + + diff --git a/python/examples/pipeline/ocr/README_CN.md b/python/examples/pipeline/ocr/README_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..ba1150d32e16298d0c1267d46f7d6e804b53d041 --- /dev/null +++ b/python/examples/pipeline/ocr/README_CN.md @@ -0,0 +1,67 @@ +# OCR Pipeline WebService + +([English](./README.md)|简体中文) + +本文档将以 OCR 为例,介绍如何使用 Pipeline WebService 启动多模型串联的服务。 + +## 获取模型 +``` +python -m paddle_serving_app.package --get_model ocr_rec +tar -xzvf ocr_rec.tar.gz +python -m paddle_serving_app.package --get_model ocr_det +tar -xzvf ocr_det.tar.gz +``` + +## 获取数据集(可选) +``` +wget --no-check-certificate https://paddle-serving.bj.bcebos.com/ocr/test_imgs.tar +tar xf test_imgs.tar +``` + +## 启动 WebService +``` +python web_service.py &>log.txt & +``` + +## 测试 +``` +python pipeline_http_client.py +``` + + diff --git a/python/examples/pipeline/ocr/config.yml b/python/examples/pipeline/ocr/config.yml new file mode 100644 index 0000000000000000000000000000000000000000..48addccfd0e543e04adf6587c5532b2a18bb2810 --- /dev/null +++ b/python/examples/pipeline/ocr/config.yml @@ -0,0 +1,22 @@ +rpc_port: 18080 +worker_num: 4 +build_dag_each_worker: false +http_port: 9999 +dag: + is_thread_op: false + client_type: brpc + retry: 1 + use_profile: false +op: + det: + concurrency: 2 + local_service_conf: + model_config: ocr_det_model + devices: "0" + rec: + concurrency: 1 + timeout: -1 + retry: 1 + local_service_conf: + model_config: ocr_rec_model + devices: "0" diff --git a/python/examples/pipeline/ocr/hybrid_service_pipeline_server.py b/python/examples/pipeline/ocr/hybrid_service_pipeline_server.py new file mode 100644 index 0000000000000000000000000000000000000000..1eea9c3b36f74d04c74618a2012810a1a58d411e --- /dev/null +++ b/python/examples/pipeline/ocr/hybrid_service_pipeline_server.py @@ -0,0 +1,135 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing +from paddle_serving_server_gpu.pipeline import Op, RequestOp, ResponseOp +from paddle_serving_server_gpu.pipeline import PipelineServer +from paddle_serving_server_gpu.pipeline.proto import pipeline_service_pb2 +from paddle_serving_server_gpu.pipeline.channel import ChannelDataEcode +from paddle_serving_server_gpu.pipeline import LocalRpcServiceHandler +import numpy as np +import cv2 +import time +import base64 +import json +from paddle_serving_app.reader import OCRReader +from paddle_serving_app.reader import Sequential, ResizeByFactor +from paddle_serving_app.reader import Div, Normalize, Transpose +from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes +import time +import re +import base64 +import logging + +_LOGGER = logging.getLogger() + + +class DetOp(Op): + def init_op(self): + self.det_preprocess = Sequential([ + ResizeByFactor(32, 960), Div(255), + Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose( + (2, 0, 1)) + ]) + self.filter_func = FilterBoxes(10, 10) + self.post_func = DBPostProcess({ + "thresh": 0.3, + "box_thresh": 0.5, + "max_candidates": 1000, + "unclip_ratio": 1.5, + "min_size": 3 + }) + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + data = base64.b64decode(input_dict["image"].encode('utf8')) + data = np.fromstring(data, np.uint8) + # Note: class variables(self.var) can only be used in process op mode + self.im = cv2.imdecode(data, cv2.IMREAD_COLOR) + self.ori_h, self.ori_w, _ = self.im.shape + det_img = self.det_preprocess(self.im) + _, self.new_h, self.new_w = det_img.shape + return {"image": det_img} + + def postprocess(self, input_dicts, fetch_dict): + det_out = fetch_dict["concat_1.tmp_0"] + ratio_list = [ + float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w + ] + dt_boxes_list = self.post_func(det_out, [ratio_list]) + dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w]) + out_dict = {"dt_boxes": dt_boxes, "image": self.im} + return out_dict + + +class RecOp(Op): + def init_op(self): + self.ocr_reader = OCRReader() + self.get_rotate_crop_image = GetRotateCropImage() + self.sorted_boxes = SortedBoxes() + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + im = input_dict["image"] + dt_boxes = input_dict["dt_boxes"] + dt_boxes = self.sorted_boxes(dt_boxes) + feed_list = [] + img_list = [] + max_wh_ratio = 0 + for i, dtbox in enumerate(dt_boxes): + boximg = self.get_rotate_crop_image(im, dt_boxes[i]) + img_list.append(boximg) + h, w = boximg.shape[0:2] + wh_ratio = w * 1.0 / h + max_wh_ratio = max(max_wh_ratio, wh_ratio) + for img in img_list: + norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio) + feed = {"image": norm_img} + feed_list.append(feed) + return feed_list + + def postprocess(self, input_dicts, fetch_dict): + rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True) + res_lst = [] + for res in rec_res: + res_lst.append(res[0]) + res = {"res": str(res_lst)} + return res + + +read_op = RequestOp() +det_op = DetOp( + name="det", + input_ops=[read_op], + local_rpc_service_handler=LocalRpcServiceHandler( + model_config="ocr_det_model", + workdir="det_workdir", # defalut: "workdir" + thread_num=2, # defalut: 2 + devices="0", # gpu0. defalut: "" (cpu) + mem_optim=True, # defalut: True + ir_optim=False, # defalut: False + available_port_generator=None), # defalut: None + concurrency=1) +rec_op = RecOp( + name="rec", + input_ops=[det_op], + server_endpoints=["127.0.0.1:12001"], + fetch_list=["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"], + client_config="ocr_rec_client/serving_client_conf.prototxt", + concurrency=1) +response_op = ResponseOp(input_ops=[rec_op]) + +server = PipelineServer("ocr") +server.set_response_op(response_op) +server.prepare_server('config.yml') +server.run_server() diff --git a/python/examples/pipeline/ocr/imgs/1.jpg b/python/examples/pipeline/ocr/imgs/1.jpg new file mode 100644 index 0000000000000000000000000000000000000000..08010177fed2ee8c3709912c06c0b161ba546313 Binary files /dev/null and b/python/examples/pipeline/ocr/imgs/1.jpg differ diff --git a/python/examples/pipeline/ocr/local_service_pipeline_server.py b/python/examples/pipeline/ocr/local_service_pipeline_server.py new file mode 100644 index 0000000000000000000000000000000000000000..ccbd3b1b07a30422583812b659e1c249b37bcb9e --- /dev/null +++ b/python/examples/pipeline/ocr/local_service_pipeline_server.py @@ -0,0 +1,134 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing +from paddle_serving_server_gpu.pipeline import Op, RequestOp, ResponseOp +from paddle_serving_server_gpu.pipeline import PipelineServer +from paddle_serving_server_gpu.pipeline.proto import pipeline_service_pb2 +from paddle_serving_server_gpu.pipeline.channel import ChannelDataEcode +from paddle_serving_server_gpu.pipeline import LocalRpcServiceHandler +import numpy as np +import cv2 +import time +import base64 +import json +from paddle_serving_app.reader import OCRReader +from paddle_serving_app.reader import Sequential, ResizeByFactor +from paddle_serving_app.reader import Div, Normalize, Transpose +from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes +import time +import re +import base64 +import logging + +_LOGGER = logging.getLogger() + + +class DetOp(Op): + def init_op(self): + self.det_preprocess = Sequential([ + ResizeByFactor(32, 960), Div(255), + Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose( + (2, 0, 1)) + ]) + self.filter_func = FilterBoxes(10, 10) + self.post_func = DBPostProcess({ + "thresh": 0.3, + "box_thresh": 0.5, + "max_candidates": 1000, + "unclip_ratio": 1.5, + "min_size": 3 + }) + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + data = base64.b64decode(input_dict["image"].encode('utf8')) + data = np.fromstring(data, np.uint8) + # Note: class variables(self.var) can only be used in process op mode + self.im = cv2.imdecode(data, cv2.IMREAD_COLOR) + self.ori_h, self.ori_w, _ = self.im.shape + det_img = self.det_preprocess(self.im) + _, self.new_h, self.new_w = det_img.shape + return {"image": det_img} + + def postprocess(self, input_dicts, fetch_dict): + det_out = fetch_dict["concat_1.tmp_0"] + ratio_list = [ + float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w + ] + dt_boxes_list = self.post_func(det_out, [ratio_list]) + dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w]) + out_dict = {"dt_boxes": dt_boxes, "image": self.im} + return out_dict + + +class RecOp(Op): + def init_op(self): + self.ocr_reader = OCRReader() + self.get_rotate_crop_image = GetRotateCropImage() + self.sorted_boxes = SortedBoxes() + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + im = input_dict["image"] + dt_boxes = input_dict["dt_boxes"] + dt_boxes = self.sorted_boxes(dt_boxes) + feed_list = [] + img_list = [] + max_wh_ratio = 0 + for i, dtbox in enumerate(dt_boxes): + boximg = self.get_rotate_crop_image(im, dt_boxes[i]) + img_list.append(boximg) + h, w = boximg.shape[0:2] + wh_ratio = w * 1.0 / h + max_wh_ratio = max(max_wh_ratio, wh_ratio) + for img in img_list: + norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio) + feed = {"image": norm_img} + feed_list.append(feed) + return feed_list + + def postprocess(self, input_dicts, fetch_dict): + rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True) + res_lst = [] + for res in rec_res: + res_lst.append(res[0]) + res = {"res": str(res_lst)} + return res + + +read_op = RequestOp() +det_op = DetOp( + name="det", + input_ops=[read_op], + local_rpc_service_handler=LocalRpcServiceHandler( + model_config="ocr_det_model", + workdir="det_workdir", # defalut: "workdir" + thread_num=2, # defalut: 2 + devices="0", # gpu0. defalut: "" (cpu) + mem_optim=True, # defalut: True + ir_optim=False, # defalut: False + available_port_generator=None), # defalut: None + concurrency=1) +rec_op = RecOp( + name="rec", + input_ops=[det_op], + local_rpc_service_handler=LocalRpcServiceHandler( + model_config="ocr_rec_model"), + concurrency=1) +response_op = ResponseOp(input_ops=[rec_op]) + +server = PipelineServer("ocr") +server.set_response_op(response_op) +server.prepare_server('config.yml') +server.run_server() diff --git a/python/examples/pipeline/ocr/pipeline_http_client.py b/python/examples/pipeline/ocr/pipeline_http_client.py new file mode 100644 index 0000000000000000000000000000000000000000..6d40e6474d6e0e32ac36835de3b69f4f90b6171d --- /dev/null +++ b/python/examples/pipeline/ocr/pipeline_http_client.py @@ -0,0 +1,37 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from paddle_serving_server_gpu.pipeline import PipelineClient +import numpy as np +import requests +import json +import cv2 +import base64 +import os + + +def cv2_to_base64(image): + return base64.b64encode(image).decode('utf8') + + +url = "http://127.0.0.1:9999/ocr/prediction" +test_img_dir = "imgs/" +for img_file in os.listdir(test_img_dir): + with open(os.path.join(test_img_dir, img_file), 'rb') as file: + image_data1 = file.read() + image = cv2_to_base64(image_data1) + +for i in range(4): + data = {"key": ["image"], "value": [image]} + r = requests.post(url=url, data=json.dumps(data)) + print(r.json()) diff --git a/python/examples/pipeline/ocr/pipeline_rpc_client.py b/python/examples/pipeline/ocr/pipeline_rpc_client.py new file mode 100644 index 0000000000000000000000000000000000000000..93524c36cb300e71bcde57f930cebc62e3d86cba --- /dev/null +++ b/python/examples/pipeline/ocr/pipeline_rpc_client.py @@ -0,0 +1,38 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from paddle_serving_server_gpu.pipeline import PipelineClient +import numpy as np +import requests +import json +import cv2 +import base64 +import os + +client = PipelineClient() +client.connect(['127.0.0.1:18080']) + + +def cv2_to_base64(image): + return base64.b64encode(image).decode('utf8') + + +test_img_dir = "imgs/" +for img_file in os.listdir(test_img_dir): + with open(os.path.join(test_img_dir, img_file), 'rb') as file: + image_data = file.read() + image = cv2_to_base64(image_data) + +for i in range(4): + ret = client.predict(feed_dict={"image": image}, fetch=["res"]) + print(ret) diff --git a/python/examples/pipeline/ocr/remote_service_pipeline_server.py b/python/examples/pipeline/ocr/remote_service_pipeline_server.py new file mode 100644 index 0000000000000000000000000000000000000000..170e6dd9c4687e10bb4af6278f2f5b0c9ac09878 --- /dev/null +++ b/python/examples/pipeline/ocr/remote_service_pipeline_server.py @@ -0,0 +1,129 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing +from paddle_serving_server_gpu.pipeline import Op, RequestOp, ResponseOp +from paddle_serving_server_gpu.pipeline import PipelineServer +from paddle_serving_server_gpu.pipeline.proto import pipeline_service_pb2 +from paddle_serving_server_gpu.pipeline.channel import ChannelDataEcode +import numpy as np +import cv2 +import time +import base64 +import json +from paddle_serving_app.reader import OCRReader +from paddle_serving_app.reader import Sequential, ResizeByFactor +from paddle_serving_app.reader import Div, Normalize, Transpose +from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes +import time +import re +import base64 +import logging + +_LOGGER = logging.getLogger() + + +class DetOp(Op): + def init_op(self): + self.det_preprocess = Sequential([ + ResizeByFactor(32, 960), Div(255), + Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose( + (2, 0, 1)) + ]) + self.filter_func = FilterBoxes(10, 10) + self.post_func = DBPostProcess({ + "thresh": 0.3, + "box_thresh": 0.5, + "max_candidates": 1000, + "unclip_ratio": 1.5, + "min_size": 3 + }) + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + data = base64.b64decode(input_dict["image"].encode('utf8')) + data = np.fromstring(data, np.uint8) + # Note: class variables(self.var) can only be used in process op mode + self.im = cv2.imdecode(data, cv2.IMREAD_COLOR) + self.ori_h, self.ori_w, _ = self.im.shape + det_img = self.det_preprocess(self.im) + _, self.new_h, self.new_w = det_img.shape + return {"image": det_img} + + def postprocess(self, input_dicts, fetch_dict): + det_out = fetch_dict["concat_1.tmp_0"] + ratio_list = [ + float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w + ] + dt_boxes_list = self.post_func(det_out, [ratio_list]) + dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w]) + out_dict = {"dt_boxes": dt_boxes, "image": self.im} + return out_dict + + +class RecOp(Op): + def init_op(self): + self.ocr_reader = OCRReader() + self.get_rotate_crop_image = GetRotateCropImage() + self.sorted_boxes = SortedBoxes() + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + im = input_dict["image"] + dt_boxes = input_dict["dt_boxes"] + dt_boxes = self.sorted_boxes(dt_boxes) + feed_list = [] + img_list = [] + max_wh_ratio = 0 + for i, dtbox in enumerate(dt_boxes): + boximg = self.get_rotate_crop_image(im, dt_boxes[i]) + img_list.append(boximg) + h, w = boximg.shape[0:2] + wh_ratio = w * 1.0 / h + max_wh_ratio = max(max_wh_ratio, wh_ratio) + for img in img_list: + norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio) + feed = {"image": norm_img} + feed_list.append(feed) + return feed_list + + def postprocess(self, input_dicts, fetch_dict): + rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True) + res_lst = [] + for res in rec_res: + res_lst.append(res[0]) + res = {"res": str(res_lst)} + return res + + +read_op = RequestOp() +det_op = DetOp( + name="det", + input_ops=[read_op], + server_endpoints=["127.0.0.1:12000"], + fetch_list=["concat_1.tmp_0"], + client_config="ocr_det_client/serving_client_conf.prototxt", + concurrency=1) +rec_op = RecOp( + name="rec", + input_ops=[det_op], + server_endpoints=["127.0.0.1:12001"], + fetch_list=["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"], + client_config="ocr_rec_client/serving_client_conf.prototxt", + concurrency=1) +response_op = ResponseOp(input_ops=[rec_op]) + +server = PipelineServer("ocr") +server.set_response_op(response_op) +server.prepare_server('config.yml') +server.run_server() diff --git a/python/examples/pipeline/ocr/web_service.py b/python/examples/pipeline/ocr/web_service.py new file mode 100644 index 0000000000000000000000000000000000000000..d1e6ec808343d62cc7c85b2d78ac1caa57c8cf28 --- /dev/null +++ b/python/examples/pipeline/ocr/web_service.py @@ -0,0 +1,112 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +try: + from paddle_serving_server_gpu.web_service import WebService, Op +except ImportError: + from paddle_serving_server.web_service import WebService, Op +import logging +import numpy as np +import cv2 +import base64 +from paddle_serving_app.reader import OCRReader +from paddle_serving_app.reader import Sequential, ResizeByFactor +from paddle_serving_app.reader import Div, Normalize, Transpose +from paddle_serving_app.reader import DBPostProcess, FilterBoxes, GetRotateCropImage, SortedBoxes + +_LOGGER = logging.getLogger() + + +class DetOp(Op): + def init_op(self): + self.det_preprocess = Sequential([ + ResizeByFactor(32, 960), Div(255), + Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose( + (2, 0, 1)) + ]) + self.filter_func = FilterBoxes(10, 10) + self.post_func = DBPostProcess({ + "thresh": 0.3, + "box_thresh": 0.5, + "max_candidates": 1000, + "unclip_ratio": 1.5, + "min_size": 3 + }) + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + data = base64.b64decode(input_dict["image"].encode('utf8')) + data = np.fromstring(data, np.uint8) + # Note: class variables(self.var) can only be used in process op mode + self.im = cv2.imdecode(data, cv2.IMREAD_COLOR) + self.ori_h, self.ori_w, _ = self.im.shape + det_img = self.det_preprocess(self.im) + _, self.new_h, self.new_w = det_img.shape + return {"image": det_img} + + def postprocess(self, input_dicts, fetch_dict): + det_out = fetch_dict["concat_1.tmp_0"] + ratio_list = [ + float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w + ] + dt_boxes_list = self.post_func(det_out, [ratio_list]) + dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w]) + out_dict = {"dt_boxes": dt_boxes, "image": self.im} + return out_dict + + +class RecOp(Op): + def init_op(self): + self.ocr_reader = OCRReader() + self.get_rotate_crop_image = GetRotateCropImage() + self.sorted_boxes = SortedBoxes() + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + im = input_dict["image"] + dt_boxes = input_dict["dt_boxes"] + dt_boxes = self.sorted_boxes(dt_boxes) + feed_list = [] + img_list = [] + max_wh_ratio = 0 + for i, dtbox in enumerate(dt_boxes): + boximg = self.get_rotate_crop_image(im, dt_boxes[i]) + img_list.append(boximg) + h, w = boximg.shape[0:2] + wh_ratio = w * 1.0 / h + max_wh_ratio = max(max_wh_ratio, wh_ratio) + for img in img_list: + norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio) + feed = {"image": norm_img} + feed_list.append(feed) + return feed_list + + def postprocess(self, input_dicts, fetch_dict): + rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True) + res_lst = [] + for res in rec_res: + res_lst.append(res[0]) + res = {"res": str(res_lst)} + return res + + +class OcrService(WebService): + def get_pipeline_response(self, read_op): + det_op = DetOp(name="det", input_ops=[read_op]) + rec_op = RecOp(name="rec", input_ops=[det_op]) + return rec_op + + +uci_service = OcrService(name="ocr") +uci_service.prepare_pipeline_config("config.yml") +uci_service.run_service() diff --git a/python/examples/pipeline/simple_web_service/README.md b/python/examples/pipeline/simple_web_service/README.md new file mode 100644 index 0000000000000000000000000000000000000000..049fbf2ec69bb83062f396e59344e29b0094372a --- /dev/null +++ b/python/examples/pipeline/simple_web_service/README.md @@ -0,0 +1,19 @@ +# Simple Pipeline WebService + +This document will takes UCI service as an example to introduce how to use Pipeline WebService. + +## Get model +``` +sh get_data.sh +``` + +## Start server + +``` +python web_service.py &>log.txt & +``` + +## Http test +``` +curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}' +``` diff --git a/python/examples/pipeline/simple_web_service/README_CN.md b/python/examples/pipeline/simple_web_service/README_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..c08d642f7c8034e9d326a24636728bff36f8638b --- /dev/null +++ b/python/examples/pipeline/simple_web_service/README_CN.md @@ -0,0 +1,19 @@ +# Simple Pipeline WebService + +这里以 Uci 服务为例来介绍 Pipeline WebService 的使用。 + +## 获取模型 +``` +sh get_data.sh +``` + +## 启动服务 + +``` +python web_service.py &>log.txt & +``` + +## 测试 +``` +curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}' +``` diff --git a/python/examples/pipeline/simple_web_service/config.yml b/python/examples/pipeline/simple_web_service/config.yml new file mode 100644 index 0000000000000000000000000000000000000000..72e473e320e792b8fafc46768c8ef38e7a00436c --- /dev/null +++ b/python/examples/pipeline/simple_web_service/config.yml @@ -0,0 +1,9 @@ +worker_num: 4 +http_port: 18080 +dag: + is_thread_op: false +op: + uci: + local_service_conf: + model_config: uci_housing_model + devices: "" # "0,1" diff --git a/python/examples/pipeline/simple_web_service/get_data.sh b/python/examples/pipeline/simple_web_service/get_data.sh new file mode 100644 index 0000000000000000000000000000000000000000..84a3966a0ef323cef4b146d8e9489c70a7a8ae35 --- /dev/null +++ b/python/examples/pipeline/simple_web_service/get_data.sh @@ -0,0 +1,2 @@ +wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz +tar -xzf uci_housing.tar.gz diff --git a/python/examples/pipeline/simple_web_service/web_service.py b/python/examples/pipeline/simple_web_service/web_service.py new file mode 100644 index 0000000000000000000000000000000000000000..28197e804ffc08d094d0e33d3d2654ace3093ded --- /dev/null +++ b/python/examples/pipeline/simple_web_service/web_service.py @@ -0,0 +1,51 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +try: + from paddle_serving_server_gpu.web_service import WebService, Op +except ImportError: + from paddle_serving_server.web_service import WebService, Op +import logging +import numpy as np + +_LOGGER = logging.getLogger() + + +class UciOp(Op): + def init_op(self): + self.separator = "," + + def preprocess(self, input_dicts): + (_, input_dict), = input_dicts.items() + _LOGGER.info(input_dict) + x_value = input_dict["x"] + if isinstance(x_value, (str, unicode)): + input_dict["x"] = np.array( + [float(x.strip()) for x in x_value.split(self.separator)]) + return input_dict + + def postprocess(self, input_dicts, fetch_dict): + # _LOGGER.info(fetch_dict) + fetch_dict["price"] = str(fetch_dict["price"][0][0]) + return fetch_dict + + +class UciService(WebService): + def get_pipeline_response(self, read_op): + uci_op = UciOp(name="uci", input_ops=[read_op]) + return uci_op + + +uci_service = UciService(name="uci") +uci_service.prepare_pipeline_config("config.yml") +uci_service.run_service() diff --git a/python/examples/yolov4/test_client.py b/python/examples/yolov4/test_client.py index 92dcd06552ca1fdd3f2d54060e9de501f052e349..2616e55766192fca676e58efc4f0a2a3d634f1d3 100644 --- a/python/examples/yolov4/test_client.py +++ b/python/examples/yolov4/test_client.py @@ -30,7 +30,6 @@ client.load_client_config("yolov4_client/serving_client_conf.prototxt") client.connect(['127.0.0.1:9393']) im = preprocess(sys.argv[1]) -print(im.shape) fetch_map = client.predict( feed={ "image": im, diff --git a/python/gen_version.py b/python/gen_version.py new file mode 100644 index 0000000000000000000000000000000000000000..258905f5815f6af01398479732b907c80cb9d739 --- /dev/null +++ b/python/gen_version.py @@ -0,0 +1,43 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import re +import os +import subprocess + + +def update_info(file_name, feature, info): + new_str = "" + with open(file_name, "r") as f: + for line in f.readlines(): + if re.match(feature, line): + if isinstance(info, str): + line = feature + " = \"" + info.strip() + "\"\n" + else: + line = feature + " = \"" + info.decode('utf-8').strip( + ) + "\"\n" + new_str = new_str + line + + with open(file_name, "w") as f: + f.write(new_str) + + +if len(sys.argv) > 2: + update_info("paddle_serving_server_gpu/version.py", "cuda_version", + sys.argv[2]) + +path = "paddle_serving_" + sys.argv[1] +commit_id = subprocess.check_output(['git', 'rev-parse', 'HEAD']) +update_info(path + "/version.py", "commit_id", commit_id) diff --git a/python/paddle_serving_app/version.py b/python/paddle_serving_app/version.py index 332cba98dd692c4e33da68d4de7763e83e3729b5..554162f4f29a6c28e328c735a71512cd48e59962 100644 --- a/python/paddle_serving_app/version.py +++ b/python/paddle_serving_app/version.py @@ -13,3 +13,4 @@ # limitations under the License. """ Paddle Serving App version string """ serving_app_version = "0.1.2" +commit_id = "" diff --git a/python/paddle_serving_client/convert.py b/python/paddle_serving_client/convert.py new file mode 100644 index 0000000000000000000000000000000000000000..e3cd3a05f8e09155b0c884e3ddf12b57234de3dd --- /dev/null +++ b/python/paddle_serving_client/convert.py @@ -0,0 +1,66 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Usage: + Convert a paddle inference model into a model file that can be used for Paddle Serving. + Example: + python -m paddle_serving_client.convert --dirname ./inference_model +""" +import argparse +from .io import inference_model_to_serving + + +def parse_args(): # pylint: disable=doc-string-missing + parser = argparse.ArgumentParser("convert") + parser.add_argument( + "--dirname", + type=str, + required=True, + help='Path of saved model files. Program file and parameter files are saved in this directory.' + ) + parser.add_argument( + "--serving_server", + type=str, + default="serving_server", + help='The path of model files and configuration files for server. Default: "serving_server".' + ) + parser.add_argument( + "--serving_client", + type=str, + default="serving_client", + help='The path of configuration files for client. Default: "serving_client".' + ) + parser.add_argument( + "--model_filename", + type=str, + default=None, + help='The name of file to load the inference program. If it is None, the default filename __model__ will be used' + ) + parser.add_argument( + "--params_filename", + type=str, + default=None, + help='The name of file to load all parameters. It is only used for the case that all parameters were saved in a single binary file. If parameters were saved in separate files, set it as None. Default: None.' + ) + return parser.parse_args() + + +if __name__ == "__main__": + args = parse_args() + inference_model_to_serving( + args.dirname, + serving_server=args.serving_server, + serving_client=args.serving_client, + model_filename=args.model_filename, + params_filename=args.params_filename) diff --git a/python/paddle_serving_client/version.py b/python/paddle_serving_client/version.py index f7fc14b2a7f0c25b471e8d3bb44e9d6db6839d01..015a73dca73360da228877cf5b41188dd396933c 100644 --- a/python/paddle_serving_client/version.py +++ b/python/paddle_serving_client/version.py @@ -15,3 +15,4 @@ serving_client_version = "0.3.2" serving_server_version = "0.3.2" module_proto_version = "0.3.2" +commit_id = "" diff --git a/python/paddle_serving_server/__init__.py b/python/paddle_serving_server/__init__.py index 675dcb138bd6c4b70ccb68f5d1ae8e721ef6704e..76694bc5bb864f4c21ff3b9c2cfd07761c5adbea 100644 --- a/python/paddle_serving_server/__init__.py +++ b/python/paddle_serving_server/__init__.py @@ -161,6 +161,10 @@ class Server(object): self.container_id = None self.model_config_paths = None # for multi-model in a workflow + def get_fetch_list(self): + fetch_names = [var.alias_name for var in self.model_conf.fetch_var] + return fetch_names + def set_max_concurrency(self, concurrency): self.max_concurrency = concurrency diff --git a/python/paddle_serving_server/version.py b/python/paddle_serving_server/version.py index f7fc14b2a7f0c25b471e8d3bb44e9d6db6839d01..015a73dca73360da228877cf5b41188dd396933c 100644 --- a/python/paddle_serving_server/version.py +++ b/python/paddle_serving_server/version.py @@ -15,3 +15,4 @@ serving_client_version = "0.3.2" serving_server_version = "0.3.2" module_proto_version = "0.3.2" +commit_id = "" diff --git a/python/paddle_serving_server/web_service.py b/python/paddle_serving_server/web_service.py old mode 100755 new mode 100644 index 51fa99f93d533f6f0f82ffd98e4bfa1e40e9ea92..dd8edaed59001b3b652376eb46d061194c74f833 --- a/python/paddle_serving_server/web_service.py +++ b/python/paddle_serving_server/web_service.py @@ -21,12 +21,36 @@ from paddle_serving_client import Client from contextlib import closing import socket +from paddle_serving_server import pipeline +from paddle_serving_server.pipeline import Op + class WebService(object): def __init__(self, name="default_service"): self.name = name + # pipeline + self._server = pipeline.PipelineServer(self.name) + + def get_pipeline_response(self, read_op): + return None + + def prepare_pipeline_config(self, yaml_file): + # build dag + read_op = pipeline.RequestOp() + last_op = self.get_pipeline_response(read_op) + if not isinstance(last_op, Op): + raise ValueError("The return value type of `get_pipeline_response` " + "function is not Op type, please check function " + "`get_pipeline_response`.") + response_op = pipeline.ResponseOp(input_ops=[last_op]) + self._server.set_response_op(response_op) + self._server.prepare_server(yaml_file) + + def run_service(self): + self._server.run_server() def load_model_config(self, model_config): + print("This API will be deprecated later. Please do not use it") self.model_config = model_config def _launch_rpc_service(self): @@ -63,6 +87,7 @@ class WebService(object): device="cpu", mem_optim=True, ir_optim=False): + print("This API will be deprecated later. Please do not use it") self.workdir = workdir self.port = port self.device = device @@ -104,6 +129,7 @@ class WebService(object): return result def run_rpc_service(self): + print("This API will be deprecated later. Please do not use it") import socket localIP = socket.gethostbyname(socket.gethostname()) print("web service address:") @@ -153,6 +179,7 @@ class WebService(object): "{}".format(self.model_config), gpu=False, profile=False) def run_web_service(self): + print("This API will be deprecated later. Please do not use it") self.app_instance.run(host="0.0.0.0", port=self.port, threaded=False, @@ -162,9 +189,11 @@ class WebService(object): return self.app_instance def preprocess(self, feed=[], fetch=[]): + print("This API will be deprecated later. Please do not use it") return feed, fetch def postprocess(self, feed=[], fetch=[], fetch_map=None): + print("This API will be deprecated later. Please do not use it") for key in fetch_map: fetch_map[key] = fetch_map[key].tolist() return fetch_map diff --git a/python/paddle_serving_server_gpu/__init__.py b/python/paddle_serving_server_gpu/__init__.py index 7a5c26889f6e3bf33339dfd6034e282a2a5752b8..042027066df7a094dca784722345937608ac4099 100644 --- a/python/paddle_serving_server_gpu/__init__.py +++ b/python/paddle_serving_server_gpu/__init__.py @@ -209,6 +209,10 @@ class Server(object): self.product_name = None self.container_id = None + def get_fetch_list(self): + fetch_names = [var.alias_name for var in self.model_conf.fetch_var] + return fetch_names + def set_max_concurrency(self, concurrency): self.max_concurrency = concurrency diff --git a/python/paddle_serving_server_gpu/version.py b/python/paddle_serving_server_gpu/version.py index 2272c3aa91f999697ea8ef3e2cdb585b01db8bed..3952f6e4058589e45de0618e5fc38e3d0aaf0c52 100644 --- a/python/paddle_serving_server_gpu/version.py +++ b/python/paddle_serving_server_gpu/version.py @@ -16,3 +16,4 @@ serving_client_version = "0.3.2" serving_server_version = "0.3.2" module_proto_version = "0.3.2" cuda_version = "9" +commit_id = "" diff --git a/python/paddle_serving_server_gpu/web_service.py b/python/paddle_serving_server_gpu/web_service.py index 5e9fdf4f4fda84dfb7c4f598fae6cf2381c377ca..4154e8242b7c26847c171bf9c71a7923687348fa 100644 --- a/python/paddle_serving_server_gpu/web_service.py +++ b/python/paddle_serving_server_gpu/web_service.py @@ -24,17 +24,43 @@ import sys import numpy as np import paddle_serving_server_gpu as serving +from paddle_serving_server_gpu import pipeline +from paddle_serving_server_gpu.pipeline import Op + class WebService(object): def __init__(self, name="default_service"): self.name = name - self.gpus = [] - self.rpc_service_list = [] + # pipeline + self._server = pipeline.PipelineServer(self.name) + + self.gpus = [] # deprecated + self.rpc_service_list = [] # deprecated + + def get_pipeline_response(self, read_op): + return None + + def prepare_pipeline_config(self, yaml_file): + # build dag + read_op = pipeline.RequestOp() + last_op = self.get_pipeline_response(read_op) + if not isinstance(last_op, Op): + raise ValueError("The return value type of `get_pipeline_response` " + "function is not Op type, please check function " + "`get_pipeline_response`.") + response_op = pipeline.ResponseOp(input_ops=[last_op]) + self._server.set_response_op(response_op) + self._server.prepare_server(yaml_file) + + def run_service(self): + self._server.run_server() def load_model_config(self, model_config): + print("This API will be deprecated later. Please do not use it") self.model_config = model_config def set_gpus(self, gpus): + print("This API will be deprecated later. Please do not use it") self.gpus = [int(x) for x in gpus.split(",")] def default_rpc_service(self, @@ -88,6 +114,7 @@ class WebService(object): gpuid=0, mem_optim=True, ir_optim=False): + print("This API will be deprecated later. Please do not use it") self.workdir = workdir self.port = port self.device = device @@ -155,6 +182,7 @@ class WebService(object): return result def run_rpc_service(self): + print("This API will be deprecated later. Please do not use it") import socket localIP = socket.gethostbyname(socket.gethostname()) print("web service address:") @@ -183,6 +211,7 @@ class WebService(object): # TODO: maybe change another API name: maybe run_local_predictor? def run_debugger_service(self, gpu=False): + print("This API will be deprecated later. Please do not use it") import socket localIP = socket.gethostbyname(socket.gethostname()) print("web service address:") @@ -209,18 +238,21 @@ class WebService(object): "{}".format(self.model_config), gpu=gpu, profile=False) def run_web_service(self): + print("This API will be deprecated later. Please do not use it") self.app_instance.run(host="0.0.0.0", port=self.port, threaded=False, - processes=1) + processes=4) def get_app_instance(self): return self.app_instance def preprocess(self, feed=[], fetch=[]): + print("This API will be deprecated later. Please do not use it") return feed, fetch def postprocess(self, feed=[], fetch=[], fetch_map=None): - for key in fetch_map.iterkeys(): + print("This API will be deprecated later. Please do not use it") + for key in fetch_map: fetch_map[key] = fetch_map[key].tolist() return fetch_map diff --git a/python/pipeline/__init__.py b/python/pipeline/__init__.py index 9f3056708c4394637ea6898fa50911af9871cd9d..7718016c9989a3b7348c3389c86495537786abb8 100644 --- a/python/pipeline/__init__.py +++ b/python/pipeline/__init__.py @@ -11,8 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logger # this module must be the first to import -from operator import Op, RequestOp, ResponseOp -from pipeline_server import PipelineServer -from pipeline_client import PipelineClient -from analyse import Analyst +from . import logger # this module must be the first to import +from .operator import Op, RequestOp, ResponseOp +from .pipeline_server import PipelineServer +from .pipeline_client import PipelineClient +from .local_rpc_service_handler import LocalRpcServiceHandler +from .analyse import Analyst diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py index 6f86658b262af79016c80172cac45d0dba15fe81..51aa0d4b4c33947d85a18f613f897129f85061fd 100644 --- a/python/pipeline/channel.py +++ b/python/pipeline/channel.py @@ -40,7 +40,8 @@ class ChannelDataEcode(enum.Enum): RPC_PACKAGE_ERROR = 4 CLIENT_ERROR = 5 CLOSED_ERROR = 6 - UNKNOW = 7 + NO_SERVICE = 7 + UNKNOW = 8 class ChannelDataType(enum.Enum): diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py index 0e058dbeab8be4741268cadad0ab887f03a7d8a7..272071f3211ed6029e5ba757da5ee2c780681ac2 100644 --- a/python/pipeline/dag.py +++ b/python/pipeline/dag.py @@ -299,13 +299,12 @@ class DAGExecutor(object): sys.stderr.write(profile_str) # add profile info into rpc_resp - profile_value = "" if resp_channeldata.client_need_profile: profile_set = resp_channeldata.profile_data_set profile_set.add(profile_str) profile_value = "".join(list(profile_set)) - rpc_resp.key.append(self._client_profile_key) - rpc_resp.value.append(profile_value) + rpc_resp.key.append(self._client_profile_key) + rpc_resp.value.append(profile_value) return rpc_resp @@ -338,7 +337,8 @@ class DAG(object): self._manager = PipelineProcSyncManager() _LOGGER.info("[DAG] Succ init") - def get_use_ops(self, response_op): + @staticmethod + def get_use_ops(response_op): unique_names = set() used_ops = set() succ_ops_of_use_op = {} # {op_name: succ_ops} @@ -427,11 +427,11 @@ class DAG(object): _LOGGER.critical("Failed to build DAG: ResponseOp" " has not been set.") os._exit(-1) - used_ops, out_degree_ops = self.get_use_ops(response_op) + used_ops, out_degree_ops = DAG.get_use_ops(response_op) if not self._build_dag_each_worker: _LOGGER.info("================= USED OP =================") for op in used_ops: - if op.name != self._request_name: + if not isinstance(op, RequestOp): _LOGGER.info(op.name) _LOGGER.info("-------------------------------------------") if len(used_ops) <= 1: diff --git a/python/paddle_serving_server_gpu/gen_cuda_version.py b/python/pipeline/gateway/__init__.py similarity index 63% rename from python/paddle_serving_server_gpu/gen_cuda_version.py rename to python/pipeline/gateway/__init__.py index 4a320a0e4dd9f9145a2c7682d5eecb7f582862b5..abf198b97e6e818e1fbe59006f98492640bcee54 100644 --- a/python/paddle_serving_server_gpu/gen_cuda_version.py +++ b/python/pipeline/gateway/__init__.py @@ -11,17 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import sys -import re -import os - -new_str = "" -with open("paddle_serving_server_gpu/version.py", "r") as f: - for line in f.readlines(): - if re.match("cuda_version", line): - line = re.sub(r"\d+", sys.argv[1], line) - new_str = new_str + line - -with open("paddle_serving_server_gpu/version.py", "w") as f: - f.write(new_str) diff --git a/python/pipeline/gateway/proto/gateway.proto b/python/pipeline/gateway/proto/gateway.proto new file mode 100644 index 0000000000000000000000000000000000000000..9d3d501d06acf731231504a0ba97e89c72519ae4 --- /dev/null +++ b/python/pipeline/gateway/proto/gateway.proto @@ -0,0 +1,41 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; +package baidu.paddle_serving.pipeline_serving; +option go_package = ".;pipeline_serving"; + +import "google/api/annotations.proto"; + +message Response { + repeated string key = 1; + repeated string value = 2; + int32 ecode = 3; + string error_info = 4; +}; + +message Request { + repeated string key = 1; + repeated string value = 2; + string name = 3; +} + +service PipelineService { + rpc inference(Request) returns (Response) { + option (google.api.http) = { + post : "/{name=*}/prediction" + body : "*" + }; + } +}; diff --git a/python/pipeline/gateway/proxy_server.go b/python/pipeline/gateway/proxy_server.go new file mode 100644 index 0000000000000000000000000000000000000000..a74e798463b58efe26ab027c649a07131d4bbf32 --- /dev/null +++ b/python/pipeline/gateway/proxy_server.go @@ -0,0 +1,52 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "C" + "flag" + "net/http" + "log" + "strconv" + + "golang.org/x/net/context" + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "google.golang.org/grpc" + + gw "./proto" +) + +//export run_proxy_server +func run_proxy_server(grpc_port int, http_port int) error { + var ( + pipelineEndpoint = flag.String("pipeline_endpoint", "localhost:" + strconv.Itoa(grpc_port), "endpoint of PipelineService") + ) + + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + mux := runtime.NewServeMux() + opts := []grpc.DialOption{grpc.WithInsecure()} + err := gw.RegisterPipelineServiceHandlerFromEndpoint(ctx, mux, *pipelineEndpoint, opts) + if err != nil { + return err + } + + log.Println("start proxy service") + return http.ListenAndServe(":" + strconv.Itoa(http_port), mux) // proxy port +} + +func main() {} diff --git a/python/pipeline/local_rpc_service_handler.py b/python/pipeline/local_rpc_service_handler.py new file mode 100644 index 0000000000000000000000000000000000000000..376fcaf13af4e5a51ccf3ee6a1bd06a474a33bbd --- /dev/null +++ b/python/pipeline/local_rpc_service_handler.py @@ -0,0 +1,134 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import logging +import multiprocessing +try: + from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server + PACKAGE_VERSION = "GPU" +except ImportError: + from paddle_serving_server import OpMaker, OpSeqMaker, Server + PACKAGE_VERSION = "CPU" +from . import util + +_LOGGER = logging.getLogger(__name__) +_workdir_name_gen = util.NameGenerator("workdir_") + + +class LocalRpcServiceHandler(object): + def __init__(self, + model_config, + workdir="", + thread_num=2, + devices="", + mem_optim=True, + ir_optim=False, + available_port_generator=None): + if available_port_generator is None: + available_port_generator = util.GetAvailablePortGenerator() + + self._model_config = model_config + self._port_list = [] + if devices == "": + # cpu + devices = [-1] + self._port_list.append(available_port_generator.next()) + _LOGGER.info("Model({}) will be launch in cpu device. Port({})" + .format(model_config, self._port_list)) + else: + # gpu + if PACKAGE_VERSION == "CPU": + raise ValueError( + "You are using the CPU version package(" + "paddle-serving-server), unable to set devices") + devices = [int(x) for x in devices.split(",")] + for _ in devices: + self._port_list.append(available_port_generator.next()) + _LOGGER.info("Model({}) will be launch in gpu device: {}. Port({})" + .format(model_config, devices, self._port_list)) + self._workdir = workdir + self._devices = devices + self._thread_num = thread_num + self._mem_optim = mem_optim + self._ir_optim = ir_optim + + self._rpc_service_list = [] + self._server_pros = [] + self._fetch_vars = None + + def get_fetch_list(self): + return self._fetch_vars + + def get_port_list(self): + return self._port_list + + def get_client_config(self): + return os.path.join(self._model_config, "serving_server_conf.prototxt") + + def _prepare_one_server(self, workdir, port, gpuid, thread_num, mem_optim, + ir_optim): + device = "gpu" + if gpuid == -1: + device = "cpu" + op_maker = OpMaker() + read_op = op_maker.create('general_reader') + general_infer_op = op_maker.create('general_infer') + general_response_op = op_maker.create('general_response') + + op_seq_maker = OpSeqMaker() + op_seq_maker.add_op(read_op) + op_seq_maker.add_op(general_infer_op) + op_seq_maker.add_op(general_response_op) + + server = Server() + server.set_op_sequence(op_seq_maker.get_op_sequence()) + server.set_num_threads(thread_num) + server.set_memory_optimize(mem_optim) + server.set_ir_optimize(ir_optim) + + server.load_model_config(self._model_config) + if gpuid >= 0: + server.set_gpuid(gpuid) + server.prepare_server(workdir=workdir, port=port, device=device) + if self._fetch_vars is None: + self._fetch_vars = server.get_fetch_list() + return server + + def _start_one_server(self, service_idx): + self._rpc_service_list[service_idx].run_server() + + def prepare_server(self): + for i, device_id in enumerate(self._devices): + if self._workdir != "": + workdir = "{}_{}".format(self._workdir, i) + else: + workdir = _workdir_name_gen.next() + self._rpc_service_list.append( + self._prepare_one_server( + workdir, + self._port_list[i], + device_id, + thread_num=self._thread_num, + mem_optim=self._mem_optim, + ir_optim=self._ir_optim)) + + def start_server(self): + for i, service in enumerate(self._rpc_service_list): + p = multiprocessing.Process( + target=self._start_one_server, args=(i, )) + p.daemon = True + self._server_pros.append(p) + for p in self._server_pros: + p.start() diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index b18b5ed8c43312481384913109be2830ad1eeb0f..3b928b9cbab28904e6225d88e229e9a0d2da4f56 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -38,6 +38,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode, ChannelTimeoutError) from .util import NameGenerator from .profiler import UnsafeTimeProfiler as TimeProfiler +from . import local_rpc_service_handler _LOGGER = logging.getLogger(__name__) _op_name_gen = NameGenerator("Op") @@ -47,46 +48,128 @@ class Op(object): def __init__(self, name=None, input_ops=[], - server_endpoints=[], - fetch_list=[], + server_endpoints=None, + fetch_list=None, client_config=None, - concurrency=1, - timeout=-1, - retry=1, - batch_size=1, - auto_batching_timeout=None): + concurrency=None, + timeout=None, + retry=None, + batch_size=None, + auto_batching_timeout=None, + local_rpc_service_handler=None): + # In __init__, all the parameters are just saved and Op is not initialized if name is None: name = _op_name_gen.next() self.name = name # to identify the type of OP, it must be globally unique self.concurrency = concurrency # amount of concurrency self.set_input_ops(input_ops) + self._local_rpc_service_handler = local_rpc_service_handler self._server_endpoints = server_endpoints - self.with_serving = False - if len(self._server_endpoints) != 0: - self.with_serving = True - self._client_config = client_config self._fetch_names = fetch_list - - if timeout > 0: - self._timeout = timeout / 1000.0 - else: - self._timeout = -1 + self._client_config = client_config + self._timeout = timeout self._retry = max(1, retry) + self._batch_size = batch_size + self._auto_batching_timeout = auto_batching_timeout + self._input = None self._outputs = [] - self._batch_size = batch_size - self._auto_batching_timeout = auto_batching_timeout - if self._auto_batching_timeout is not None: - if self._auto_batching_timeout <= 0 or self._batch_size == 1: - _LOGGER.warning( - self._log( - "Because auto_batching_timeout <= 0 or batch_size == 1," - " set auto_batching_timeout to None.")) - self._auto_batching_timeout = None + self._server_use_profile = False + self._tracer = None + + # only for thread op + self._for_init_op_lock = threading.Lock() + self._for_close_op_lock = threading.Lock() + self._succ_init_op = False + self._succ_close_op = False + + def init_from_dict(self, conf): + # init op + if self.concurrency is None: + self.concurrency = conf["concurrency"] + if self._retry is None: + self._retry = conf["retry"] + if self._fetch_names is None: + self._fetch_names = conf.get("fetch_list") + if self._client_config is None: + self._client_config = conf.get("client_config") + + if self._timeout is None: + self._timeout = conf["timeout"] + if self._timeout > 0: + self._timeout = self._timeout / 1000.0 + else: + self._timeout = -1 + + if self._batch_size is None: + self._batch_size = conf["batch_size"] + if self._auto_batching_timeout is None: + self._auto_batching_timeout = conf["auto_batching_timeout"] + if self._auto_batching_timeout <= 0 or self._batch_size == 1: + _LOGGER.warning( + self._log( + "Because auto_batching_timeout <= 0 or batch_size == 1," + " set auto_batching_timeout to None.")) + self._auto_batching_timeout = None + else: + self._auto_batching_timeout = self._auto_batching_timeout / 1000.0 + + if self._server_endpoints is None: + server_endpoints = conf.get("server_endpoints", []) + if len(server_endpoints) != 0: + # remote service + self.with_serving = True + self._server_endpoints = server_endpoints else: - self._auto_batching_timeout = self._auto_batching_timeout / 1000.0 + if self._local_rpc_service_handler is None: + local_service_conf = conf.get("local_service_conf") + _LOGGER.info("local_service_conf: {}".format( + local_service_conf)) + model_config = local_service_conf.get("model_config") + _LOGGER.info("model_config: {}".format(model_config)) + if model_config is None: + self.with_serving = False + else: + # local rpc service + self.with_serving = True + service_handler = local_rpc_service_handler.LocalRpcServiceHandler( + model_config=model_config, + workdir=local_service_conf["workdir"], + thread_num=local_service_conf["thread_num"], + devices=local_service_conf["devices"], + mem_optim=local_service_conf["mem_optim"], + ir_optim=local_service_conf["ir_optim"]) + service_handler.prepare_server() # get fetch_list + serivce_ports = service_handler.get_port_list() + self._server_endpoints = [ + "127.0.0.1:{}".format(p) for p in serivce_ports + ] + if self._client_config is None: + self._client_config = service_handler.get_client_config( + ) + if self._fetch_names is None: + self._fetch_names = service_handler.get_fetch_list() + self._local_rpc_service_handler = service_handler + else: + self.with_serving = True + self._local_rpc_service_handler.prepare_server( + ) # get fetch_list + serivce_ports = self._local_rpc_service_handler.get_port_list( + ) + self._server_endpoints = [ + "127.0.0.1:{}".format(p) for p in serivce_ports + ] + if self._client_config is None: + self._client_config = self._local_rpc_service_handler.get_client_config( + ) + if self._fetch_names is None: + self._fetch_names = self._local_rpc_service_handler.get_fetch_list( + ) + else: + self.with_serving = True + if not isinstance(self, RequestOp) and not isinstance(self, ResponseOp): _LOGGER.info( self._log("\n\tinput_ops: {}," @@ -98,20 +181,22 @@ class Op(object): "\n\tretry: {}," "\n\tbatch_size: {}," "\n\tauto_batching_timeout(s): {}".format( - ", ".join([op.name for op in input_ops + ", ".join([op.name for op in self._input_ops ]), self._server_endpoints, self._fetch_names, self._client_config, self.concurrency, self._timeout, self._retry, self._batch_size, self._auto_batching_timeout))) - self._server_use_profile = False - self._tracer = None - - # only for thread op - self._for_init_op_lock = threading.Lock() - self._for_close_op_lock = threading.Lock() - self._succ_init_op = False - self._succ_close_op = False + def launch_local_rpc_service(self): + if self._local_rpc_service_handler is None: + _LOGGER.warning( + self._log("Failed to launch local rpc" + " service: local_rpc_service_handler is None.")) + return + port = self._local_rpc_service_handler.get_port_list() + self._local_rpc_service_handler.start_server() + _LOGGER.info("Op({}) use local rpc service at port: {}" + .format(self.name, port)) def use_default_auto_batching_config(self): if self._batch_size != 1: @@ -775,7 +860,9 @@ class RequestOp(Op): for idx, key in enumerate(request.key): data = request.value[idx] try: - data = eval(data) + evaled_data = eval(data) + if isinstance(evaled_data, np.ndarray): + data = evaled_data except Exception as e: pass dictdata[key] = data diff --git a/python/pipeline/pipeline_client.py b/python/pipeline/pipeline_client.py index ad78c6d39002b206082de8eab238be1abf543fee..48368dd81459de98f21af4048a2b694a54e80b75 100644 --- a/python/pipeline/pipeline_client.py +++ b/python/pipeline/pipeline_client.py @@ -42,11 +42,12 @@ class PipelineClient(object): def _pack_request_package(self, feed_dict, profile): req = pipeline_service_pb2.Request() + np.set_printoptions(threshold=sys.maxsize) for key, value in feed_dict.items(): req.key.append(key) if isinstance(value, np.ndarray): req.value.append(value.__repr__()) - elif isinstance(value, str): + elif isinstance(value, (str, unicode)): req.value.append(value) elif isinstance(value, list): req.value.append(np.array(value).__repr__()) @@ -75,7 +76,9 @@ class PipelineClient(object): continue data = resp.value[idx] try: - data = eval(data) + evaled_data = eval(data) + if isinstance(evaled_data, np.ndarray): + data = evaled_data except Exception as e: pass fetch_map[key] = data diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py index e8229e810308b10d35f903a8415d898177bc2239..a6d4f9ed66fd8f563cb1526c136cba11b06fd6b3 100644 --- a/python/pipeline/pipeline_server.py +++ b/python/pipeline/pipeline_server.py @@ -22,22 +22,31 @@ from contextlib import closing import multiprocessing import yaml -from .proto import pipeline_service_pb2_grpc -from .operator import ResponseOp -from .dag import DAGExecutor +from .proto import pipeline_service_pb2_grpc, pipeline_service_pb2 +from . import operator +from . import dag +from . import util +from . import channel _LOGGER = logging.getLogger(__name__) class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer): - def __init__(self, response_op, dag_conf, worker_idx=-1): + def __init__(self, name, response_op, dag_conf, worker_idx=-1): super(PipelineServicer, self).__init__() + self._name = name + # init dag executor - self._dag_executor = DAGExecutor(response_op, dag_conf, worker_idx) + self._dag_executor = dag.DAGExecutor(response_op, dag_conf, worker_idx) self._dag_executor.start() _LOGGER.info("[PipelineServicer] succ init") def inference(self, request, context): + if request.name != "" and request.name != self._name: + resp = pipeline_service_pb2.Response() + resp.ecode = channel.ChannelDataEcode.NO_SERVICE.value + resp.error_info = "Failed to inference: Service name error." + return resp resp = self._dag_executor.call(request) return resp @@ -57,35 +66,83 @@ def _reserve_port(port): class PipelineServer(object): - def __init__(self): - self._port = None + def __init__(self, name=None): + self._name = name # for grpc-gateway path + self._rpc_port = None self._worker_num = None self._response_op = None + self._proxy_server = None + + def _grpc_gateway(self, grpc_port, http_port): + import os + from ctypes import cdll + from . import gateway + lib_path = os.path.join( + os.path.dirname(gateway.__file__), "libproxy_server.so") + proxy_server = cdll.LoadLibrary(lib_path) + proxy_server.run_proxy_server(grpc_port, http_port) + + def _run_grpc_gateway(self, grpc_port, http_port): + if http_port <= 0: + _LOGGER.info("Ignore grpc_gateway configuration.") + return + if not util.AvailablePortGenerator.port_is_available(http_port): + raise SystemExit("Failed to run grpc-gateway: prot {} " + "is already used".format(http_port)) + if self._proxy_server is not None: + raise RuntimeError("Proxy server has been started.") + self._proxy_server = multiprocessing.Process( + target=self._grpc_gateway, args=( + grpc_port, + http_port, )) + self._proxy_server.daemon = True + self._proxy_server.start() def set_response_op(self, response_op): - if not isinstance(response_op, ResponseOp): + if not isinstance(response_op, operator.ResponseOp): raise Exception("Failed to set response_op: response_op " "must be ResponseOp type.") if len(response_op.get_input_ops()) != 1: raise Exception("Failed to set response_op: response_op " "can only have one previous op.") self._response_op = response_op + self._used_op, _ = dag.DAG.get_use_ops(self._response_op) + + def prepare_server(self, yml_file=None, yml_dict=None): + conf = ServerYamlConfChecker.load_server_yaml_conf( + yml_file=yml_file, yml_dict=yml_dict) + + self._rpc_port = conf.get("rpc_port") + self._http_port = conf.get("http_port") + if self._rpc_port is None: + if self._http_port is None: + raise SystemExit("Failed to prepare_server: rpc_port or " + "http_port can not be None.") + else: + # http mode: generate rpc_port + if not util.AvailablePortGenerator.port_is_available( + self._http_port): + raise SystemExit("Failed to prepare_server: http_port({}) " + "is already used".format(self._http_port)) + self._rpc_port = util.GetAvailablePortGenerator().next() + else: + if not util.AvailablePortGenerator.port_is_available( + self._rpc_port): + raise SystemExit("Failed to prepare_server: prot {} " + "is already used".format(self._rpc_port)) + if self._http_port is None: + # rpc mode + pass + else: + # http mode + if not util.AvailablePortGenerator.port_is_available( + self._http_port): + raise SystemExit("Failed to prepare_server: http_port({}) " + "is already used".format(self._http_port)) - def _port_is_available(self, port): - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: - sock.settimeout(2) - result = sock.connect_ex(('0.0.0.0', port)) - return result != 0 - - def prepare_server(self, yml_file): - conf = ServerYamlConfChecker.load_server_yaml_conf(yml_file) - - self._port = conf["port"] - if not self._port_is_available(self._port): - raise SystemExit("Failed to prepare_server: prot {} " - "is already used".format(self._port)) self._worker_num = conf["worker_num"] self._build_dag_each_worker = conf["build_dag_each_worker"] + self._init_ops(conf["op"]) _LOGGER.info("============= PIPELINE SERVER =============") _LOGGER.info("\n{}".format( @@ -98,10 +155,40 @@ class PipelineServer(object): _LOGGER.info("-------------------------------------------") self._conf = conf + self._start_local_rpc_service() + + def _init_ops(self, op_conf): + default_conf = { + "concurrency": 1, + "timeout": -1, + "retry": 1, + "batch_size": 1, + "auto_batching_timeout": -1, + "local_service_conf": { + "workdir": "", + "thread_num": 2, + "devices": "", + "mem_optim": True, + "ir_optim": False, + }, + } + for op in self._used_op: + if not isinstance(op, operator.RequestOp) and not isinstance( + op, operator.ResponseOp): + conf = op_conf.get(op.name, default_conf) + op.init_from_dict(conf) + + def _start_local_rpc_service(self): + # only brpc now + if self._conf["dag"]["client_type"] != "brpc": + _LOGGER.warning("Local service version must be brpc type now.") + for op in self._used_op: + if not isinstance(op, operator.RequestOp): + op.launch_local_rpc_service() def run_server(self): if self._build_dag_each_worker: - with _reserve_port(self._port) as port: + with _reserve_port(self._rpc_port) as port: bind_address = 'localhost:{}'.format(port) workers = [] for i in range(self._worker_num): @@ -111,6 +198,9 @@ class PipelineServer(object): args=(bind_address, self._response_op, self._conf, i)) worker.start() workers.append(worker) + self._run_grpc_gateway( + grpc_port=self._rpc_port, + http_port=self._http_port) # start grpc_gateway for worker in workers: worker.join() else: @@ -120,9 +210,13 @@ class PipelineServer(object): ('grpc.max_receive_message_length', 256 * 1024 * 1024) ]) pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server( - PipelineServicer(self._response_op, self._conf), server) - server.add_insecure_port('[::]:{}'.format(self._port)) + PipelineServicer(self._name, self._response_op, self._conf), + server) + server.add_insecure_port('[::]:{}'.format(self._rpc_port)) server.start() + self._run_grpc_gateway( + grpc_port=self._rpc_port, + http_port=self._http_port) # start grpc_gateway server.wait_for_termination() def _run_server_func(self, bind_address, response_op, dag_conf, worker_idx): @@ -133,7 +227,8 @@ class PipelineServer(object): futures.ThreadPoolExecutor( max_workers=1, ), options=options) pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server( - PipelineServicer(response_op, dag_conf, worker_idx), server) + PipelineServicer(self._name, response_op, dag_conf, worker_idx), + server) server.add_insecure_port(bind_address) server.start() server.wait_for_termination() @@ -144,12 +239,25 @@ class ServerYamlConfChecker(object): pass @staticmethod - def load_server_yaml_conf(yml_file): - with open(yml_file) as f: - conf = yaml.load(f.read()) + def load_server_yaml_conf(yml_file=None, yml_dict=None): + if yml_file is not None and yml_dict is not None: + raise SystemExit("Failed to prepare_server: only one of yml_file" + " or yml_dict can be selected as the parameter.") + if yml_file is not None: + with open(yml_file) as f: + conf = yaml.load(f.read()) + elif yml_dict is not None: + conf = yml_dict + else: + raise SystemExit("Failed to prepare_server: yml_file or yml_dict" + " can not be None.") ServerYamlConfChecker.check_server_conf(conf) ServerYamlConfChecker.check_dag_conf(conf["dag"]) ServerYamlConfChecker.check_tracer_conf(conf["dag"]["tracer"]) + for op_name in conf["op"]: + ServerYamlConfChecker.check_op_conf(conf["op"][op_name]) + ServerYamlConfChecker.check_local_service_conf(conf["op"][op_name][ + "local_service_conf"]) return conf @staticmethod @@ -161,26 +269,80 @@ class ServerYamlConfChecker(object): @staticmethod def check_server_conf(conf): default_conf = { - "port": 9292, + # "rpc_port": 9292, "worker_num": 1, "build_dag_each_worker": False, + #"http_port": 0, "dag": {}, + "op": {}, } conf_type = { - "port": int, + "rpc_port": int, + "http_port": int, "worker_num": int, "build_dag_each_worker": bool, + "grpc_gateway_port": int, } conf_qualification = { - "port": [(">=", 1024), ("<=", 65535)], + "rpc_port": [(">=", 1024), ("<=", 65535)], + "http_port": [(">=", 1024), ("<=", 65535)], "worker_num": (">=", 1), } ServerYamlConfChecker.check_conf(conf, default_conf, conf_type, conf_qualification) + @staticmethod + def check_local_service_conf(conf): + default_conf = { + "workdir": "", + "thread_num": 2, + "devices": "", + "mem_optim": True, + "ir_optim": False, + } + conf_type = { + "model_config": str, + "workdir": str, + "thread_num": int, + "devices": str, + "mem_optim": bool, + "ir_optim": bool, + } + conf_qualification = {"thread_num": (">=", 1), } + ServerYamlConfChecker.check_conf(conf, default_conf, conf_type, + conf_qualification) + + @staticmethod + def check_op_conf(conf): + default_conf = { + "concurrency": 1, + "timeout": -1, + "retry": 1, + "batch_size": 1, + "auto_batching_timeout": -1, + "local_service_conf": {}, + } + conf_type = { + "server_endpoints": list, + "fetch_list": list, + "client_config": str, + "concurrency": int, + "timeout": int, + "retry": int, + "batch_size": int, + "auto_batching_timeout": int, + } + conf_qualification = { + "concurrency": (">=", 1), + "retry": (">=", 1), + "batch_size": (">=", 1), + } + ServerYamlConfChecker.check_conf(conf, default_conf, conf_type, + conf_qualification) + @staticmethod def check_tracer_conf(conf): default_conf = {"interval_s": -1, } @@ -231,6 +393,8 @@ class ServerYamlConfChecker(object): @staticmethod def check_conf_type(conf, conf_type): for key, val in conf_type.items(): + if key not in conf: + continue if not isinstance(conf[key], val): raise SystemExit("[CONF] {} must be {} type, but get {}." .format(key, val, type(conf[key]))) @@ -238,6 +402,8 @@ class ServerYamlConfChecker(object): @staticmethod def check_conf_qualification(conf, conf_qualification): for key, qualification in conf_qualification.items(): + if key not in conf: + continue if not isinstance(qualification, list): qualification = [qualification] if not ServerYamlConfChecker.qualification_check(conf[key], diff --git a/python/pipeline/proto/pipeline_service.proto b/python/pipeline/proto/pipeline_service.proto index a920d5618ce36a191390d5140bee0a42c7394a6b..02c922027ea6c00a3831137b55604950378b84fe 100644 --- a/python/pipeline/proto/pipeline_service.proto +++ b/python/pipeline/proto/pipeline_service.proto @@ -18,6 +18,7 @@ package baidu.paddle_serving.pipeline_serving; message Request { repeated string key = 1; repeated string value = 2; + optional string name = 3; }; message Response { diff --git a/python/pipeline/util.py b/python/pipeline/util.py index fb5e14ce808fd34de75b1a640630ca172510cd6c..d7847f179de7557b5446958536008adc3c981f95 100644 --- a/python/pipeline/util.py +++ b/python/pipeline/util.py @@ -17,6 +17,8 @@ import logging import threading import multiprocessing import multiprocessing.managers +from contextlib import closing +import socket if sys.version_info.major == 2: import Queue from Queue import PriorityQueue @@ -29,6 +31,34 @@ else: _LOGGER = logging.getLogger(__name__) +class AvailablePortGenerator(object): + def __init__(self, start_port=12000): + self._curr_port = start_port + + @staticmethod + def port_is_available(port): + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: + sock.settimeout(2) + result = sock.connect_ex(('0.0.0.0', port)) + if result != 0: + return True + else: + return False + + def next(self): + while not AvailablePortGenerator.port_is_available(self._curr_port): + self._curr_port += 1 + self._curr_port += 1 + return self._curr_port - 1 + + +_AvailablePortGenerator = AvailablePortGenerator() + + +def GetAvailablePortGenerator(): + return _AvailablePortGenerator + + class NameGenerator(object): # use unsafe-id-generator def __init__(self, prefix): diff --git a/python/setup.py.app.in b/python/setup.py.app.in index 523b9e5c493e881d1ac3d6553bdb57a91f084acb..1a06b0d352c1da4cdd09f74cb900853d4016afa8 100644 --- a/python/setup.py.app.in +++ b/python/setup.py.app.in @@ -16,7 +16,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import platform import os from setuptools import setup, Distribution, Extension @@ -24,18 +23,9 @@ from setuptools import find_packages from setuptools import setup from paddle_serving_app.version import serving_app_version from pkg_resources import DistributionNotFound, get_distribution +import util -def python_version(): - return [int(v) for v in platform.python_version().split(".")] - -def find_package(pkgname): - try: - get_distribution(pkgname) - return True - except DistributionNotFound: - return False - -max_version, mid_version, min_version = python_version() +max_version, mid_version, min_version = util.python_version() if '${PACK}' == 'ON': copy_lib() diff --git a/python/setup.py.client.in b/python/setup.py.client.in index 96773c38dc950c0b8357274dff30d7c952ecdc25..bcedc41599399762b4b838b6d89eb7adaef23800 100644 --- a/python/setup.py.client.in +++ b/python/setup.py.client.in @@ -16,7 +16,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import platform import os import sys @@ -24,20 +23,10 @@ from setuptools import setup, Distribution, Extension from setuptools import find_packages from setuptools import setup from paddle_serving_client.version import serving_client_version -from pkg_resources import DistributionNotFound, get_distribution +import util py_version = sys.version_info -def python_version(): - return [int(v) for v in platform.python_version().split(".")] - -def find_package(pkgname): - try: - get_distribution(pkgname) - return True - except DistributionNotFound: - return False - def copy_lib(): if py_version[0] == 2: lib_list = ['libpython2.7.so.1.0', 'libssl.so.10', 'libcrypto.so.10'] @@ -51,18 +40,20 @@ def copy_lib(): text = r.read() os.popen('cp {} ./paddle_serving_client/lib'.format(text.strip().split(' ')[1])) -max_version, mid_version, min_version = python_version() +max_version, mid_version, min_version = util.python_version() + +# gen pipeline proto code +util.gen_pipeline_code("paddle_serving_client") if '${PACK}' == 'ON': copy_lib() - REQUIRED_PACKAGES = [ 'six >= 1.10.0', 'protobuf >= 3.11.0', 'numpy >= 1.12', 'grpcio >= 1.28.1', 'grpcio-tools >= 1.28.1' ] -if not find_package("paddlepaddle") and not find_package("paddlepaddle-gpu"): +if not util.find_package("paddlepaddle") and not util.find_package("paddlepaddle-gpu"): REQUIRED_PACKAGES.append("paddlepaddle") @@ -72,8 +63,10 @@ packages=['paddle_serving_client', 'paddle_serving_client.metric', 'paddle_serving_client.utils', 'paddle_serving_client.pipeline', - 'paddle_serving_client.pipeline.proto'] -package_data={'paddle_serving_client': ['serving_client.so','lib/*'],} + 'paddle_serving_client.pipeline.proto', + 'paddle_serving_client.pipeline.gateway', + 'paddle_serving_client.pipeline.gateway.proto'] +package_data={'paddle_serving_client': ['serving_client.so', 'lib/*', 'pipeline/gateway/libproxy_server.so'],} package_dir={'paddle_serving_client': '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client', 'paddle_serving_client.proto': @@ -87,7 +80,11 @@ package_dir={'paddle_serving_client': 'paddle_serving_client.pipeline': '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline', 'paddle_serving_client.pipeline.proto': - '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline/proto'} + '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline/proto', + 'paddle_serving_client.pipeline.gateway': + '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline/gateway', + 'paddle_serving_client.pipeline.gateway.proto': + '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/pipeline/gateway/proto'} setup( name='paddle-serving-client', diff --git a/python/setup.py.in b/python/setup.py.in index af7036bdd99e05966156064dd2bcf1bb8463b716..fa7051db94ebdd69778f7957f50b1301697398fe 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -16,17 +16,14 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import platform - from setuptools import setup, Distribution, Extension from setuptools import find_packages from setuptools import setup from paddle_serving.version import serving_client_version +from grpc_tools import protoc +import util -def python_version(): - return [int(v) for v in platform.python_version().split(".")] - -max_version, mid_version, min_version = python_version() +max_version, mid_version, min_version = util.python_version() REQUIRED_PACKAGES = [ 'six >= 1.10.0', 'protobuf >= 3.1.0','paddlepaddle' diff --git a/python/setup.py.server.in b/python/setup.py.server.in index db679edbab8e6ba6929ed631c2bbc5a731146d0d..6733f1a4788818c530e3be0719686cea54cace49 100644 --- a/python/setup.py.server.in +++ b/python/setup.py.server.in @@ -16,25 +16,16 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import platform - from setuptools import setup, Distribution, Extension from setuptools import find_packages from setuptools import setup from paddle_serving_server.version import serving_server_version -from pkg_resources import DistributionNotFound, get_distribution - -def find_package(pkgname): - try: - get_distribution(pkgname) - return True - except DistributionNotFound: - return False +import util -def python_version(): - return [int(v) for v in platform.python_version().split(".")] +max_version, mid_version, min_version = util.python_version() -max_version, mid_version, min_version = python_version() +# gen pipeline proto code +util.gen_pipeline_code("paddle_serving_server") REQUIRED_PACKAGES = [ 'six >= 1.10.0', 'protobuf >= 3.11.0', 'grpcio >= 1.28.1', 'grpcio-tools >= 1.28.1', @@ -44,7 +35,9 @@ REQUIRED_PACKAGES = [ packages=['paddle_serving_server', 'paddle_serving_server.proto', 'paddle_serving_server.pipeline', - 'paddle_serving_server.pipeline.proto'] + 'paddle_serving_server.pipeline.proto', + 'paddle_serving_server.pipeline.gateway', + 'paddle_serving_server.pipeline.gateway.proto'] package_dir={'paddle_serving_server': '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server', @@ -53,7 +46,13 @@ package_dir={'paddle_serving_server': 'paddle_serving_server.pipeline': '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline', 'paddle_serving_server.pipeline.proto': - '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/proto'} + '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/proto', + 'paddle_serving_server.pipeline.gateway': + '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/gateway', + 'paddle_serving_server.pipeline.gateway.proto': + '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server/pipeline/gateway/proto'} + +package_data={'paddle_serving_server': ['pipeline/gateway/libproxy_server.so'],} setup( name='paddle-serving-server', @@ -65,6 +64,7 @@ setup( author_email='guru4elephant@gmail.com', install_requires=REQUIRED_PACKAGES, packages=packages, + package_data=package_data, package_dir=package_dir, # PyPI package information. classifiers=[ diff --git a/python/setup.py.server_gpu.in b/python/setup.py.server_gpu.in index 4554c1d368f70a32d16ceeabb54d63625f9f256d..523615b8e782c29ebdedadc54a9473a0b672aac0 100644 --- a/python/setup.py.server_gpu.in +++ b/python/setup.py.server_gpu.in @@ -16,25 +16,16 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import platform - from setuptools import setup, Distribution, Extension from setuptools import find_packages from setuptools import setup from paddle_serving_server_gpu.version import serving_server_version -from pkg_resources import DistributionNotFound, get_distribution - -def find_package(pkgname): - try: - get_distribution(pkgname) - return True - except DistributionNotFound: - return False +import util -def python_version(): - return [int(v) for v in platform.python_version().split(".")] +max_version, mid_version, min_version = util.python_version() -max_version, mid_version, min_version = python_version() +# gen pipeline proto code +util.gen_pipeline_code("paddle_serving_server_gpu") REQUIRED_PACKAGES = [ 'six >= 1.10.0', 'protobuf >= 3.11.0', 'grpcio >= 1.28.1', 'grpcio-tools >= 1.28.1', @@ -44,7 +35,9 @@ REQUIRED_PACKAGES = [ packages=['paddle_serving_server_gpu', 'paddle_serving_server_gpu.proto', 'paddle_serving_server_gpu.pipeline', - 'paddle_serving_server_gpu.pipeline.proto'] + 'paddle_serving_server_gpu.pipeline.proto', + 'paddle_serving_server_gpu.pipeline.gateway', + 'paddle_serving_server_gpu.pipeline.gateway.proto'] package_dir={'paddle_serving_server_gpu': '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu', @@ -53,7 +46,13 @@ package_dir={'paddle_serving_server_gpu': 'paddle_serving_server_gpu.pipeline': '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline', 'paddle_serving_server_gpu.pipeline.proto': - '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline/proto'} + '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline/proto', + 'paddle_serving_server_gpu.pipeline.gateway': + '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline/gateway', + 'paddle_serving_server_gpu.pipeline.gateway.proto': + '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu/pipeline/gateway/proto'} + +package_data={'paddle_serving_server_gpu': ['pipeline/gateway/libproxy_server.so'],} setup( name='paddle-serving-server-gpu', @@ -65,6 +64,7 @@ setup( author_email='guru4elephant@gmail.com', install_requires=REQUIRED_PACKAGES, packages=packages, + package_data=package_data, package_dir=package_dir, # PyPI package information. classifiers=[ diff --git a/python/util.py b/python/util.py new file mode 100644 index 0000000000000000000000000000000000000000..0ae68c1ed53766cb7f4f623e3a5f4fb50f7eb095 --- /dev/null +++ b/python/util.py @@ -0,0 +1,70 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pkg_resources import DistributionNotFound, get_distribution +from grpc_tools import protoc +import os +import platform + + +def python_version(): + return [int(v) for v in platform.python_version().split(".")] + + +def find_package(pkgname): + try: + get_distribution(pkgname) + return True + except DistributionNotFound: + return False + + +def gen_pipeline_code(package_name): + # pipeline service proto + protoc.main(( + '', + '-I.', + '--python_out=.', + '--grpc_python_out=.', + '{}/pipeline/proto/pipeline_service.proto'.format(package_name), )) + + # pipeline grpc-gateway proto + # *.pb.go + ret = os.system( + "cd {}/pipeline/gateway/proto/ && " + "../../../../../third_party/install/protobuf/bin/protoc -I. " + "-I$GOPATH/src " + "-I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis " + "--go_out=plugins=grpc:. " + "gateway.proto".format(package_name)) + if ret != 0: + exit(1) + # *.gw.go + ret = os.system( + "cd {}/pipeline/gateway/proto/ && " + "../../../../../third_party/install/protobuf/bin/protoc -I. " + "-I$GOPATH/src " + "-I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis " + "--grpc-gateway_out=logtostderr=true:. " + "gateway.proto".format(package_name)) + if ret != 0: + exit(1) + + # pipeline grpc-gateway shared-lib + ret = os.system( + "cd {}/pipeline/gateway && " + "go build -buildmode=c-shared -o libproxy_server.so proxy_server.go". + format(package_name)) + if ret != 0: + exit(1) diff --git a/tools/serving_build.sh b/tools/serving_build.sh index c54631a733fecc532f22d3ce1793ff8554e21f7d..ee6e7cdb40ca86f1e4f4921fa4b257cb982337a5 100644 --- a/tools/serving_build.sh +++ b/tools/serving_build.sh @@ -19,6 +19,13 @@ function init() { cd Serving export SERVING_WORKDIR=$PWD $PYTHONROOT/bin/python -m pip install -r python/requirements.txt + export GOPATH=$HOME/go + export PATH=$PATH:$GOPATH/bin + + go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway + go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger + go get -u github.com/golang/protobuf/protoc-gen-go + go get -u google.golang.org/grpc } function check_cmd() { @@ -298,7 +305,6 @@ function python_test_bert() { cd bert # pwd: /Serving/python/examples/bert case $TYPE in CPU) - pip install paddlehub # Because download from paddlehub may timeout, # download the model from bos(max_seq_len=128). wget https://paddle-serving.bj.bcebos.com/paddle_hub_models/text/SemanticModel/bert_chinese_L-12_H-768_A-12.tar.gz @@ -306,14 +312,12 @@ function python_test_bert() { sh get_data.sh check_cmd "python -m paddle_serving_server.serve --model bert_chinese_L-12_H-768_A-12_model --port 9292 &" sleep 5 - pip install paddle_serving_app check_cmd "head -n 10 data-c.txt | python bert_client.py --model bert_chinese_L-12_H-768_A-12_client/serving_client_conf.prototxt" kill_server_process echo "bert RPC inference pass" ;; GPU) export CUDA_VISIBLE_DEVICES=0 - pip install paddlehub # Because download from paddlehub may timeout, # download the model from bos(max_seq_len=128). wget https://paddle-serving.bj.bcebos.com/paddle_hub_models/text/SemanticModel/bert_chinese_L-12_H-768_A-12.tar.gz @@ -321,7 +325,6 @@ function python_test_bert() { sh get_data.sh check_cmd "python -m paddle_serving_server_gpu.serve --model bert_chinese_L-12_H-768_A-12_model --port 9292 --gpu_ids 0 &" sleep 5 - pip install paddle_serving_app check_cmd "head -n 10 data-c.txt | python bert_client.py --model bert_chinese_L-12_H-768_A-12_client/serving_client_conf.prototxt" kill_server_process echo "bert RPC inference pass" @@ -760,13 +763,14 @@ function python_test_resnet50(){ } function python_test_pipeline(){ - # pwd:/ Serving/python/examples + # pwd: /Serving/python/examples local TYPE=$1 export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving unsetproxy - cd pipeline/imdb_model_ensemble + cd pipeline # pwd: /Serving/python/examples/pipeline case $TYPE in CPU) + cd imdb_model_ensemble # pwd: /Serving/python/examples/pipeline/imdb_model_ensemble # start paddle serving service (brpc) sh get_data.sh python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 --workdir test9292 &> cnn.log & @@ -775,7 +779,7 @@ function python_test_pipeline(){ # test: thread servicer & thread op cat << EOF > config.yml -port: 18080 +rpc_port: 18080 worker_num: 4 build_dag_each_worker: false dag: @@ -792,7 +796,7 @@ EOF # test: thread servicer & process op cat << EOF > config.yml -port: 18080 +rpc_port: 18080 worker_num: 4 build_dag_each_worker: false dag: @@ -809,7 +813,7 @@ EOF # test: process servicer & process op cat << EOF > config.yml -port: 18080 +rpc_port: 18080 worker_num: 4 build_dag_each_worker: false dag: @@ -828,7 +832,7 @@ EOF pip uninstall grpcio -y pip install grpcio --no-binary=grpcio cat << EOF > config.yml -port: 18080 +rpc_port: 18080 worker_num: 4 build_dag_each_worker: true dag: @@ -852,7 +856,7 @@ EOF python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 --use_multilang --workdir test9393 &> bow.log & sleep 5 cat << EOF > config.yml -port: 18080 +rpc_port: 18080 worker_num: 4 build_dag_each_worker: false dag: @@ -869,16 +873,47 @@ EOF kill_server_process kill_process_by_port 9292 kill_process_by_port 9393 + cd .. + + cd simple_web_service # pwd: /Serving/python/examples/pipeline/simple_web_service + sh get_data.sh + python web_service.py >/dev/null & + sleep 5 + curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}' + check http code + http_code=`curl -X POST -k -d '{"key":["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}' -s -w "%{http_code}" -o /dev/null http://localhost:18080/uci/prediction` + if [ ${http_code} -ne 200 ]; then + echo "HTTP status code -ne 200" + exit 1 + fi + ps -ef | grep "web_service" | grep -v grep | awk '{print $2}' | xargs kill + ps -ef | grep "pipeline" | grep -v grep | awk '{print $2}' | xargs kill + kill_server_process + cd .. ;; GPU) - echo "pipeline ignore GPU test" + cd simple_web_service # pwd: /Serving/python/examples/pipeline/simple_web_service + sh get_data.sh + python web_service.py >/dev/null & + sleep 5 + curl -X POST -k http://localhost:18080/uci/prediction -d '{"key": ["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}' + # check http code + http_code=`curl -X POST -k -d '{"key":["x"], "value": ["0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332"]}' -s -w "%{http_code}" -o /dev/null http://localhost:18080/uci/prediction` + if [ ${http_code} -ne 200 ]; then + echo "HTTP status code -ne 200" + exit 1 + fi + ps -ef | grep "web_service" | grep -v grep | awk '{print $2}' | xargs kill + ps -ef | grep "pipeline" | grep -v grep | awk '{print $2}' | xargs kill + kill_server_process + cd .. # pwd: /Serving/python/examples/pipeline ;; *) echo "error type" exit 1 ;; esac - cd ../../ + cd .. setproxy unset SERVING_BIN } @@ -928,118 +963,8 @@ function monitor_test() { mkdir _monitor_test && cd _monitor_test # pwd: /Serving/_monitor_test case $TYPE in CPU): - pip install pyftpdlib - mkdir remote_path - mkdir local_path - cd remote_path # pwd: /Serving/_monitor_test/remote_path - check_cmd "python -m pyftpdlib -p 8000 &>/dev/null &" - cd .. # pwd: /Serving/_monitor_test - - # type: ftp - # remote_path: / - # remote_model_name: uci_housing.tar.gz - # local_tmp_path: ___tmp - # local_path: local_path - cd remote_path # pwd: /Serving/_monitor_test/remote_path - wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz - touch donefile - cd .. # pwd: /Serving/_monitor_test - mkdir -p local_path/uci_housing_model - python -m paddle_serving_server.monitor \ - --type='ftp' --ftp_host='127.0.0.1' --ftp_port='8000' \ - --remote_path='/' --remote_model_name='uci_housing.tar.gz' \ - --remote_donefile_name='donefile' --local_path='local_path' \ - --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \ - --local_tmp_path='___tmp' --unpacked_filename='uci_housing_model' \ - --interval='1' >/dev/null & - sleep 10 - if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then - echo "local_path/uci_housing_model/fluid_time_file not exist." - exit 1 - fi - ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill - rm -rf remote_path/* - rm -rf local_path/* - - # type: ftp - # remote_path: /tmp_dir - # remote_model_name: uci_housing_model - # local_tmp_path: ___tmp - # local_path: local_path - mkdir -p remote_path/tmp_dir && cd remote_path/tmp_dir # pwd: /Serving/_monitor_test/remote_path/tmp_dir - wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz - tar -xzf uci_housing.tar.gz - touch donefile - cd ../.. # pwd: /Serving/_monitor_test - mkdir -p local_path/uci_housing_model - python -m paddle_serving_server.monitor \ - --type='ftp' --ftp_host='127.0.0.1' --ftp_port='8000' \ - --remote_path='/tmp_dir' --remote_model_name='uci_housing_model' \ - --remote_donefile_name='donefile' --local_path='local_path' \ - --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \ - --local_tmp_path='___tmp' --interval='1' >/dev/null & - sleep 10 - if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then - echo "local_path/uci_housing_model/fluid_time_file not exist." - exit 1 - fi - ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill - rm -rf remote_path/* - rm -rf local_path/* - - # type: general - # remote_path: / - # remote_model_name: uci_housing.tar.gz - # local_tmp_path: ___tmp - # local_path: local_path - cd remote_path # pwd: /Serving/_monitor_test/remote_path - wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz - touch donefile - cd .. # pwd: /Serving/_monitor_test - mkdir -p local_path/uci_housing_model - python -m paddle_serving_server.monitor \ - --type='general' --general_host='ftp://127.0.0.1:8000' \ - --remote_path='/' --remote_model_name='uci_housing.tar.gz' \ - --remote_donefile_name='donefile' --local_path='local_path' \ - --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \ - --local_tmp_path='___tmp' --unpacked_filename='uci_housing_model' \ - --interval='1' >/dev/null & - sleep 10 - if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then - echo "local_path/uci_housing_model/fluid_time_file not exist." - exit 1 - fi - ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill - rm -rf remote_path/* - rm -rf local_path/* - - # type: general - # remote_path: /tmp_dir - # remote_model_name: uci_housing_model - # local_tmp_path: ___tmp - # local_path: local_path - mkdir -p remote_path/tmp_dir && cd remote_path/tmp_dir # pwd: /Serving/_monitor_test/remote_path/tmp_dir - wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz - tar -xzf uci_housing.tar.gz - touch donefile - cd ../.. # pwd: /Serving/_monitor_test - mkdir -p local_path/uci_housing_model - python -m paddle_serving_server.monitor \ - --type='general' --general_host='ftp://127.0.0.1:8000' \ - --remote_path='/tmp_dir' --remote_model_name='uci_housing_model' \ - --remote_donefile_name='donefile' --local_path='local_path' \ - --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \ - --local_tmp_path='___tmp' --interval='1' >/dev/null & - sleep 10 - if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then - echo "local_path/uci_housing_model/fluid_time_file not exist." - exit 1 - fi - ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill - rm -rf remote_path/* - rm -rf local_path/* - - ps -ef | grep "pyftpdlib" | grep -v grep | awk '{print $2}' | xargs kill + # The CPU part and GPU part are identical. + # In order to avoid Travis CI timeout (50 min), the CPU version is not checked ;; GPU): pip install pyftpdlib