Commit 6cca1d12 authored by barrierye

update doc

Parent 8af92bb4
@@ -33,6 +33,7 @@ The graph execution engine consists of OPs and Channels, and the connected OPs s
- The default function of a single OP is to access a single Paddle Serving Service based on the input Channel data and put the result into the output Channel.
- OP supports user customization: the preprocess, process, and postprocess functions can be inherited and implemented by the user.
- OP can set its concurrency level to increase the number of requests processed in parallel.
- OP can gather data from multiple RPC requests for Auto-Batching.
- OP can be started by a thread or process.
### Channel Design
@@ -79,13 +80,15 @@ def __init__(name=None,

```python
             client_config=None,
             concurrency=1,
             timeout=-1,
             retry=1,
             batch_size=1,
             auto_batching_timeout=None)
```
The meaning of each parameter is as follows:

| Parameter | Meaning |
| :-------------------: | :----------------------------------------------------------: |
| name | (str) String used to identify the OP type; it must be globally unique. |
| input_ops | (list) A list of all previous OPs of the current OP. |
| server_endpoints | (list) List of endpoints of the remote Paddle Serving Service. If this parameter is not set, the OP will not access the remote Paddle Serving Service, that is, the process operation will not be performed. |

@@ -94,14 +97,16 @@ The meaning of each parameter is as follows:

| concurrency | (int) The number of concurrent OPs. |
| timeout | (int) Timeout of the process operation, in seconds. If the value is less than zero, no timeout is applied. |
| retry | (int) Number of retries on timeout. When the value is 1, no retries are made. |
| batch_size | (int) The expected batch_size for Auto-Batching. Since building a batch may time out, the actual batch_size may be smaller than the set value. |
| auto_batching_timeout | (float) Timeout for building a batch during Auto-Batching. |
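
As a minimal sketch of how these parameters fit together, the snippet below constructs an OP with Auto-Batching enabled. The endpoint address, fetch name, and client config path are illustrative placeholders, and the timeout unit is assumed to be seconds:

```python
from paddle_serving_server.pipeline import Op, RequestOp

read_op = RequestOp()  # previous OP; in practice often a RequestOp subclass
bow_op = Op(name="bow",
            input_ops=[read_op],
            server_endpoints=["127.0.0.1:9393"],           # placeholder endpoint
            fetch_list=["prediction"],                     # placeholder fetch name
            client_config="serving_client_conf.prototxt",  # placeholder path
            concurrency=1,
            timeout=-1,
            retry=1,
            batch_size=16,              # expected batch size for Auto-Batching
            auto_batching_timeout=1.0)  # batch-building timeout (assumed seconds)
```

Under light load the timeout usually fires before 16 requests arrive, so process should expect a `feed_dict_list` of variable length.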
#### 2. General OP Secondary Development Interface
| Interface or Variable | Explanation |
| :--------------------------------------------: | :----------------------------------------------------------: |
| def preprocess(self, input_dicts) | Processes the data obtained from the Channel; the processed data will be used as the input of the **process** function. (This function handles a **sample**.) |
| def process(self, feed_dict_list) | The RPC prediction process based on the Paddle Serving Client; the processed data will be used as the input of the **postprocess** function. (This function handles a **batch**.) |
| def postprocess(self, input_dicts, fetch_dict) | Processes the prediction results; the processed data will be put into the subsequent Channel to be obtained by the subsequent OP. (This function handles a **sample**.) |
| def init_op(self) | Used to load resources (such as a word dictionary). |
| self.concurrency_idx | Concurrency index of the current process (not thread); different kinds of OP are counted separately. |
@@ -117,24 +122,24 @@ def preprocess(self, input_dicts):
```python
    (_, input_dict), = input_dicts.items()
    return input_dict

def process(self, feed_dict_list):
    err, err_info = ChannelData.check_batch_npdata(feed_dict_list)
    if err != 0:
        raise NotImplementedError(
            "{} Please override preprocess func.".format(err_info))
    call_result = self.client.predict(
        feed=feed_dict_list, fetch=self._fetch_names)
    return call_result

def postprocess(self, input_dicts, fetch_dict):
    return fetch_dict
```
The parameter of **preprocess** is the data `input_dicts` in the previous Channel. This variable (a **sample**) is a dictionary with the name of the previous OP as the key and the output of the corresponding OP as the value.

The parameter of **process** is the input variable `feed_dict_list` of the Paddle Serving Client prediction interface (a list of the return values of the preprocess function). This variable (a **batch**) is a list of dictionaries with feed_name as the key and data in ndarray format as the value.

The parameters of **postprocess** are `input_dicts` and `fetch_dict`. `input_dicts` is consistent with the parameter of preprocess, and `fetch_dict` (a **sample**) is one sample of the batch returned by the process function (if process is not executed, this value is the return value of preprocess).
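
As an example, a user-defined OP that overrides preprocess and postprocess might look like the sketch below; the feed and fetch field names (`words`, `prediction`) are assumptions for illustration rather than names fixed by the framework:

```python
import numpy as np
from paddle_serving_server.pipeline import Op

class ExampleOp(Op):
    def preprocess(self, input_dicts):
        # Handles one sample: unpack the single previous OP's output and
        # build the feed dict expected by the remote service.
        (_, input_dict), = input_dicts.items()
        return {"words": np.array(input_dict["words"])}

    def postprocess(self, input_dicts, fetch_dict):
        # Handles one sample: keep only the fields that subsequent OPs need.
        return {"prediction": fetch_dict["prediction"]}
```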
Users can also override the **init_op** function to load custom resources (such as a word dictionary). The default implementation is as follows:
@@ -143,7 +148,7 @@ def init_op(self):

```python
    pass
```
It should be **noted** that in the threaded version of OP, each OP will call this function only once, so the loaded resources must be thread-safe.
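
For instance, a minimal sketch of an init_op that loads a word dictionary could look like this ("vocab.txt" is a placeholder file name); since the dictionary is never modified after loading, it meets the thread-safety requirement:

```python
from paddle_serving_server.pipeline import Op

class DictOp(Op):
    def init_op(self):
        # "vocab.txt" is a hypothetical resource file, loaded once per OP.
        # The dict is read-only after loading, which keeps it thread-safe.
        self.word_dict = {}
        with open("vocab.txt") as f:
            for idx, line in enumerate(f):
                self.word_dict[line.strip()] = idx
```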
#### 3. RequestOp Definition
@@ -277,15 +282,16 @@ python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.
Run the following code
```python
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer
from paddle_serving_server.pipeline.proto import pipeline_service_pb2
from paddle_serving_server.pipeline.channel import ChannelDataEcode
import numpy as np
import logging
from paddle_serving_app.reader import IMDBDataset

logging.basicConfig(level=logging.DEBUG)

_LOGGER = logging.getLogger()
```
@@ -390,15 +396,26 @@ dag:

```yaml
dag:
    use_profile: true
```
After this function is enabled, the server will print the corresponding log information to the standard output during prediction. To show the time consumption of each stage more intuitively, an Analyst module is provided for further analysis and processing of the log files.

The output of the server is first saved to a file. Taking `profile.txt` as an example, the following script converts the time monitoring information in the log into JSON format and saves it to the `trace` file. The `trace` file can be visualized through the tracing function of the Chrome browser.
```python
import logging
logging.basicConfig(level=logging.INFO)

from paddle_serving_server.pipeline import Analyst
import json
import sys

if __name__ == "__main__":
    log_filename = "profile.txt"
    trace_filename = "trace"
    analyst = Analyst(log_filename)
    analyst.save_trace(trace_filename)
```
Specific operation: open the Chrome browser, enter `chrome://tracing/` in the address bar to jump to the tracing page, click the load button, and open the saved `trace` file to visualize the time information of each stage of the prediction service.
### Output profile information on client side
...
@@ -30,9 +30,10 @@ The Server side is built on gRPC and the graph execution engine; their relationship is shown in the following figure

### OP Design

- The default function of a single OP is to access a single Paddle Serving model service based on the input Channel data and store the result in the output Channel
- A single OP supports user customization: the preprocess, process, and postprocess functions can be inherited and implemented by the user
- A single OP can control its concurrency to increase the number of requests processed in parallel
- A single OP can gather data from multiple RPC requests to implement Auto-Batching
- OP can be started by a thread or process
### Channel Design
@@ -79,13 +80,15 @@ def __init__(name=None,

```python
             client_config=None,
             concurrency=1,
             timeout=-1,
             retry=1,
             batch_size=1,
             auto_batching_timeout=None)
```
The meaning of each parameter is as follows:

| Parameter | Meaning |
| :-------------------: | :----------------------------------------------------------: |
| name | (str) String used to identify the OP type; it must be globally unique. |
| input_ops | (list) A list of all previous OPs of the current OP. |
| server_endpoints | (list) List of endpoints of the remote Paddle Serving Service. If this parameter is not set, the remote Paddle Serving Service is not accessed, that is, the process operation is not performed. |

@@ -94,14 +97,16 @@ def __init__(name=None,

| concurrency | (int) The number of concurrent OPs. |
| timeout | (int) Timeout of the process operation, in seconds. If the value is less than zero, no timeout is applied. |
| retry | (int) Number of retries on timeout. When the value is 1, no retries are made. |
| batch_size | (int) The expected batch_size for Auto-Batching. Since building a batch may time out, the actual batch_size may be smaller than the set value. |
| auto_batching_timeout | (float) Timeout for building a batch during Auto-Batching. |
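
To show how `input_ops` chains OPs into a service DAG, here is a hedged sketch of wiring and starting a pipeline; the endpoint, fetch name, file paths, and concurrency are placeholders, and the request OP is usually a user-defined RequestOp subclass (see section 3 below):

```python
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer

read_op = RequestOp()                         # entry of the DAG
bow_op = Op(name="bow",
            input_ops=[read_op],              # previous OP in the DAG
            server_endpoints=["127.0.0.1:9393"],
            fetch_list=["prediction"],
            client_config="serving_client_conf.prototxt",
            concurrency=2)
response_op = ResponseOp(input_ops=[bow_op])  # exit of the DAG

server = PipelineServer()
server.set_response_op(response_op)  # the DAG is traced back from response_op
server.prepare_server("config.yml")  # assumed config file path
server.run_server()
```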
#### 2. General OP Secondary Development Interface
| Interface or Variable | Explanation |
| :--------------------------------------------: | :----------------------------------------------------------: |
| def preprocess(self, input_dicts) | Processes the data obtained from the Channel; the processed data will be used as the input of the **process** function. (This function handles a **sample**.) |
| def process(self, feed_dict_list) | The RPC prediction process based on the Paddle Serving Client; the processed data will be used as the input of the **postprocess** function. (This function handles a **batch**.) |
| def postprocess(self, input_dicts, fetch_dict) | Processes the prediction results; the processed data will be put into the subsequent Channel to be obtained by the subsequent OP. (This function handles a **sample**.) |
| def init_op(self) | Used to load resources (such as a word dictionary). |
| self.concurrency_idx | Concurrency index of the current process (not thread); different kinds of OP are counted separately. |
@@ -117,25 +122,24 @@ def preprocess(self, input_dicts):
```python
    (_, input_dict), = input_dicts.items()
    return input_dict

def process(self, feed_dict_list):
    err, err_info = ChannelData.check_batch_npdata(feed_dict_list)
    if err != 0:
        raise NotImplementedError(
            "{} Please override preprocess func.".format(err_info))
    call_result = self.client.predict(
        feed=feed_dict_list, fetch=self._fetch_names)
    return call_result

def postprocess(self, input_dicts, fetch_dict):
    return fetch_dict
```
The parameter of **preprocess** is the data `input_dicts` in the previous Channel. This variable (a **sample**) is a dictionary with the name of the previous OP as the key and the output of the corresponding OP as the value.

The parameter of **process** is the input variable `feed_dict_list` of the Paddle Serving Client prediction interface (a list of the return values of the preprocess function). This variable (a **batch**) is a list whose elements are dictionaries with feed_name as the key and data in ndarray format as the value.

The parameters of **postprocess** are `input_dicts` and `fetch_dict`. `input_dicts` is consistent with the parameter of preprocess, and `fetch_dict` (a **sample**) is one sample of the batch returned by the process function (if process is not executed, this value is the return value of preprocess).
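
To make the sample/batch relationship concrete, the following schematic (plain Python rather than framework code) shows how data conceptually moves between the three functions when two requests are auto-batched:

```python
import numpy as np

# preprocess runs once per request and returns one sample each:
sample_a = {"words": np.array([1, 2, 3])}
sample_b = {"words": np.array([4, 5, 6])}

# The framework gathers samples into a batch; process() receives:
feed_dict_list = [sample_a, sample_b]

# Fetch results come back batched; the framework splits them so that
# postprocess() for request A receives only A's fetch_dict (one sample).
```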
Users can also override the **init_op** function to load custom resources (such as a word dictionary). The default implementation is as follows:
@@ -144,7 +148,7 @@ def init_op(self):

```python
    pass
```
It should be **noted** that in the threaded version of OP, each OP will call this function only once, so the loaded resources must be thread-safe.
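
Relatedly, the `self.concurrency_idx` variable from the table above distinguishes the concurrent copies of an OP. A minimal sketch, assuming the process version of OP:

```python
import logging
from paddle_serving_server.pipeline import Op

_LOGGER = logging.getLogger()

class TracingOp(Op):
    def preprocess(self, input_dicts):
        # self.concurrency_idx identifies which concurrent copy of this OP
        # handled the request, e.g. to verify that load is spread evenly.
        _LOGGER.info("request handled by worker %s", self.concurrency_idx)
        (_, input_dict), = input_dicts.items()
        return input_dict
```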
#### 3. RequestOp Definition
@@ -278,16 +282,16 @@ python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.
Run the following code
```python
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer
from paddle_serving_server.pipeline.proto import pipeline_service_pb2
from paddle_serving_server.pipeline.channel import ChannelDataEcode
import numpy as np
import logging
from paddle_serving_app.reader import IMDBDataset

logging.basicConfig(level=logging.DEBUG)

_LOGGER = logging.getLogger()
```
@@ -391,15 +395,26 @@ dag:

```yaml
dag:
    use_profile: true
```
After this function is enabled, the server will print the corresponding log information to the standard output during prediction. To show the time consumption of each stage more intuitively, an Analyst module is provided for further analysis and processing of the log files.

When using it, first save the output of the server to a file. Taking `profile.txt` as an example, the following script converts the time monitoring information in the log into JSON format and saves it to the `trace` file. The `trace` file can be visualized through the tracing function of the Chrome browser.
```python
import logging
logging.basicConfig(level=logging.INFO)

from paddle_serving_server.pipeline import Analyst
import json
import sys

if __name__ == "__main__":
    log_filename = "profile.txt"
    trace_filename = "trace"
    analyst = Analyst(log_filename)
    analyst.save_trace(trace_filename)
```
Specific operation: open the Chrome browser, enter `chrome://tracing/` in the address bar to jump to the tracing page, click the load button, and open the saved `trace` file to visualize the time information of each stage of the prediction service.
### Output Profile information on the Client side
...