diff --git a/doc/Offical_Docs/3-0_QuickStart_Int_CN.md b/doc/Offical_Docs/3-0_QuickStart_Int_CN.md index b8991c009b38549f6db1cc0444bc7af667fbe9ba..9562c363e29bafdeba2dccf26b400f09bb123331 100644 --- a/doc/Offical_Docs/3-0_QuickStart_Int_CN.md +++ b/doc/Offical_Docs/3-0_QuickStart_Int_CN.md @@ -1,4 +1,4 @@ -# 部署案例概述 +# 快速开始案例部署 您可以通过以下 Paddle Serving 快速开始案例,分别了解到 C++ Serving 与 Python Pipeline 2种框架的部署方法。 - [使用 C++ Serving 部署 Resnet50 模型案例]() diff --git a/doc/Offical_Docs/7-0_Python_Pipeline_Int_CN.md b/doc/Offical_Docs/7-0_Python_Pipeline_Int_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..a65506849e8bfdf07f32ed198dc9c8079a263fdb --- /dev/null +++ b/doc/Offical_Docs/7-0_Python_Pipeline_Int_CN.md @@ -0,0 +1,11 @@ +# Python Pipeline 框架 + +在许多深度学习框架中,模型服务化部署通常用于单模型的一键部署。但在 AI 工业大生产的背景下,端到端的单一深度学习模型不能解决复杂问题,多个深度学习模型组合使用是解决现实复杂问题的常规手段,如文字识别 OCR 服务至少需要检测和识别2种模型;视频理解服务一般需要视频抽帧、切词、音频处理、分类等多种模型组合实现。当前,通用多模型组合服务的设计和实现是非常复杂的,既要能实现复杂的模型拓扑关系,又要保证服务的高并发、高可用和易于开发和维护等。 + +Paddle Serving 实现了一套通用的多模型组合服务编程框架 Python Pipeline,不仅解决上述痛点,同时还能大幅提高 GPU 利用率,并易于开发和维护。 + +通过阅读以下内容掌握 Python Pipeline 框架基础功能、设计方案、使用指南等。 +- [Python Pipeline 基础功能]() +- [Python Pipeline 使用案例]() +- [Python Pipeline 高阶用法]() +- [Python Pipeline 优化指南]() diff --git a/doc/Offical_Docs/7-1_Python_Pipeline_Basic_CN.md b/doc/Offical_Docs/7-1_Python_Pipeline_Basic_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..d3f1a30e9743b7ab0bc1ff9a99dc4270c797500d --- /dev/null +++ b/doc/Offical_Docs/7-1_Python_Pipeline_Basic_CN.md @@ -0,0 +1,374 @@ +# Python Pipeline 基础功能 + +设计一个通用端到端多模型组合框架所面临的挑战有如下4点: + +1. 通用性:框架既要满足通用模型的输入类型,又要满足模型组合的复杂拓扑关系。 +2. 高性能:与常见互联网后端服务不同,深度学习模型的推理程序属于计算密集型程序,同时 GPU 等计算芯片价格昂贵,因此在平均响应时间不苛刻的场景下,计算资源占用和吞吐量指标格外重要。 +3. 高可用性:高可用的架构依赖每个服务的健壮性,服务状态可查询、异常可监控和管理是必备条件。 +4. 易于开发与调试:使用 Python 语言开发可大幅提升研发效率,运行的错误信息准确帮助开发者快速定位问题。 + +## 总体设计 +Python Pipeline 框架分为网络服务层和图执行引擎2部分,网络服务层处理多种网络协议请求和通用输入参数问题,图执行引擎层解决复杂拓扑关系。如下图所示 + +
+ +
+ +**一.网络服务层** + +网络服务层包括了 gRPC-gateway 和 gRPC Server。gPRC gateway 接收 HTTP 请求,打包成 proto 格式后转发给 gRPC Server,一套处理程序可同时处理 HTTP、gRPC 2种类型请求。 + +另外,在支持多种模型的输入输出数据类型上,使用统一的 service.proto 结构,具有更好的通用性。 + +```proto +message Request { + repeated string key = 1; + repeated string value = 2; + optional string name = 3; + optional string method = 4; + optional int64 logid = 5; + optional string clientip = 6; +}; + +message Response { + optional int32 err_no = 1; + optional string err_msg = 2; + repeated string key = 3; + repeated string value = 4; +}; +``` +Request 是输入结构,`key` 与 `value` 是配对的 string 数组。 `name` 与 `method` 对应 URL://{ip}:{port}/{name}/{method}。`logid` 和 `clientip` 便于用户串联服务级请求和自定义策略。 + +Response 是输出结构,`err_no` 和 `err_msg` 表达处理结果的正确性和错误信息,`key` 和 `value` 为结果。 + + +**二.图执行引擎层** + +图执行引擎的设计思路是基于有向无环图实现多模型组合的复杂拓扑关系,有向无环图由单节点或多节点串联、并联结构构成。 + +
+ +
+ +图执行引擎抽象归纳出2种数据结构 OP 节点和 Channel 有向边,构建一条异步流水线工作流。核心概念和设计思路如下: +- OP 节点: 可理解成1个推理模型、一个处理方法,甚至是训练前向代码,可独立运行,独立设置并发度。每个 OP 节点的计算结果放入其绑定的 Channel 中。 +- Channel 数据管道: 可理解为一个单向缓冲队列。每个 Channel 只接收上游 OP 节点的计算输出,作为下游 OP 节点的输入。 +- 工作流:根据用户定义的节点依赖关系,图执行引擎自动生成有向无环图。每条用户请求到达图执行引擎时会生成一个唯一自增 ID,通过这种唯一性绑定关系标记流水线中的不同请求。 + +- 对于 OP 之间需要传输过大数据的情况,可以考虑 RAM DB 外存进行全局存储,通过在 Channel 中传递索引的 Key 来进行数据传输 + +## 基础功能 + +展开网络服务层和图执行引擎 Pipeline 服务、OP、重写OP前后处理,最后介绍特定OP(RequestOp和ResponseOp)二次开发的方法。 + + +**一.Pipeline 服务** + +PipelineServer包装了RPC运行层和图引擎执行,所有Pipeline服务首先要实例化PipelineServer示例,再设置2个核心接口 set_response_op、加载配置信息,最后调用run_server启动服务。代码示例如下: + +```python +server = PipelineServer() +server.set_response_op(response_op) +server.prepare_server(config_yml_path) +#server.prepare_pipeline_config(config_yml_path) +server.run_server() +``` + +PipelineServer的核心接口: +- `set_response_op`,设置response_op 将会根据各个 OP 的拓扑关系初始化 Channel 并构建计算图。 +- `prepare_server`: 加载配置信息,并启动远端Serving服务,适用于调用远端远端推理服务 +- `prepare_pipeline_config`,仅加载配置信息,适用于local_prdict +- `run_server`,启动gRPC服务,接收请求 + +**二.OP 设计与实现** + +OP 的设计原则: +- 单个 OP 默认的功能是根据输入的 Channel 数据,访问一个 Paddle Serving 的单模型服务,并将结果存在输出的 Channel +- 单个 OP 可以支持用户自定义,包括 preprocess,process,postprocess 三个函数都可以由用户继承和实现 +- 单个 OP 可以控制并发数,从而增加处理并发数 +- 单个 OP 可以获取多个不同 RPC 请求的数据,以实现 Auto-Batching +- OP 可以由线程或进程启动 + +其构造函数如下: + +```python +def __init__(name=None, + input_ops=[], + server_endpoints=[], + fetch_list=[], + client_config=None, + client_type=None, + concurrency=1, + timeout=-1, + retry=1, + batch_size=1, + auto_batching_timeout=None, + local_service_handler=None) +``` + +各参数含义如下: + +| 参数名 | 类型 | 含义 | +| :-------------------: | :---------: |:------------------------------------------------: | +| name | (str) | 用于标识 OP 类型的字符串,该字段必须全局唯一。 | +| input_ops | (list) | 当前 OP 的所有前继 OP 的列表。 | +| server_endpoints | (list) |远程 Paddle Serving Service 的 endpoints 列表。如果不设置该参数,认为是local_precditor模式,从local_service_conf中读取配置。 | +| fetch_list | (list) |远程 Paddle Serving Service 的 fetch 列表。 | +| client_config | (str) |Paddle Serving Service 对应的 Client 端配置文件路径。 | +| client_type | (str) |可选择brpc、grpc或local_predictor。local_predictor不启动Serving服务,进程内预测。 | +| concurrency | (int) | OP 的并发数。 | +| timeout | (int) |process 操作的超时时间,单位为毫秒。若该值小于零,则视作不超时。 | +| retry | (int) |超时重试次数。当该值为 1 时,不进行重试。 | +| batch_size | (int) |进行 Auto-Batching 的期望 batch_size 大小,由于构建 batch 可能超时,实际 batch_size 可能小于设定值,默认为 1。 | +| auto_batching_timeout | (float) |进行 Auto-Batching 构建 batch 的超时时间,单位为毫秒。batch_size > 1时,要设置auto_batching_timeout,否则请求数量不足batch_size时会阻塞等待。 | +| local_service_handler | (object) |local predictor handler,Op init()入参赋值 或 在Op init()中创建| + + +**三.Channel 设计与实现** +Channel的设计原则: +- Channel 是 OP 之间共享数据的数据结构,负责共享数据或者共享数据状态信息 +- Channel 可以支持多个OP的输出存储在同一个 Channel,同一个 Channel 中的数据可以被多个 OP 使用 + +下图为图执行引擎中 Channel 的设计,采用 input buffer 和 output buffer 进行多 OP 输入或多 OP 输出的数据对齐,中间采用一个 Queue 进行缓冲 + +
+ +
+ +**四.二次开发** +提供给开发者的二次开发接口有三种,分别是推理 OP 二次开发接口、RequestOp 二次开发接口和 ResponseOp 二次开发接口。 + + +1. 推理 OP 二次开发 + +推理 OP 为开发者提供3个外部函数接口: + +| 变量或接口 | 说明 | +| :----------------------------------------------: | :----------------------------------------------------------: | +| def preprocess(self, input_dicts) | 对从 Channel 中获取的数据进行处理,处理完的数据将作为 **process** 函数的输入。(该函数对一个 **sample** 进行处理) | +| def process(self, feed_dict_list, typical_logid) | 基于 Paddle Serving Client 进行 RPC 预测,处理完的数据将作为 **postprocess** 函数的输入。(该函数对一个 **batch** 进行处理) | +| def postprocess(self, input_dicts, fetch_dict) | 处理预测结果,处理完的数据将被放入后继 Channel 中,以被后继 OP 获取。(该函数对一个 **sample** 进行处理) | +| def init_op(self) | 用于加载资源(如字典等)。 | +| self.concurrency_idx | 当前进程(非线程)的并发数索引(不同种类的 OP 单独计算)。 | + +OP 在一个运行周期中会依次执行 preprocess,process,postprocess 三个操作(当不设置 `server_endpoints` 参数时,不执行 process 操作),用户可以对这三个函数进行重写,默认实现如下: + +```python +def preprocess(self, input_dicts): + # multiple previous Op + if len(input_dicts) != 1: + raise NotImplementedError( + 'this Op has multiple previous inputs. Please override this func.' + ) + (_, input_dict), = input_dicts.items() + return input_dict + +def process(self, feed_dict_list, typical_logid): + err, err_info = ChannelData.check_batch_npdata(feed_dict_list) + if err != 0: + raise NotImplementedError( + "{} Please override preprocess func.".format(err_info)) + call_result = self.client.predict( + feed=feed_dict_list, fetch=self._fetch_names, log_id=typical_logid) + if isinstance(self.client, MultiLangClient): + if call_result is None or call_result["serving_status_code"] != 0: + return None + call_result.pop("serving_status_code") + return call_result + +def postprocess(self, input_dicts, fetch_dict): + return fetch_dict +``` + +**preprocess** 的参数是前继 Channel 中的数据 `input_dicts`,该变量(作为一个 **sample**)是一个以前继 OP 的 name 为 Key,对应 OP 的输出为 Value 的字典。 + +**process** 的参数是 Paddle Serving Client 预测接口的输入变量 `fetch_dict_list`(preprocess 函数的返回值的列表),该变量(作为一个 **batch**)是一个列表,列表中的元素为以 feed_name 为 Key,对应 ndarray 格式的数据为 Value 的字典。`typical_logid` 作为向 PaddleServingService 穿透的 logid。 + +**postprocess** 的参数是 `input_dicts` 和 `fetch_dict`,`input_dicts` 与 preprocess 的参数一致,`fetch_dict` (作为一个 **sample**)是 process 函数的返回 batch 中的一个 sample(如果没有执行 process ,则该值为 preprocess 的返回值)。 + +用户还可以对 **init_op** 函数进行重写,已加载自定义的一些资源(比如字典等),默认实现如下: + +```python +def init_op(self): + pass +``` + +需要**注意**的是,在线程版 OP 中,每个 OP 只会调用一次该函数,故加载的资源必须要求是线程安全的。 + +2. RequestOp 二次开发 + +RequestOp 用于处理 Pipeline Server 接收到的 RPC 数据,处理后的数据将会被加入到图执行引擎中。其功能实现如下: + +```python +class RequestOp(Op): + def __init__(self): + # PipelineService.name = "@DAGExecutor" + super(RequestOp, self).__init__(name="@DAGExecutor", input_ops=[]) + # init op + try: + self.init_op() + except Exception as e: + _LOGGER.critical("Op(Request) Failed to init: {}".format(e)) + os._exit(-1) + + def unpack_request_package(self, request): + dict_data = {} + log_id = None + if request is None: + _LOGGER.critical("request is None") + raise ValueError("request is None") + + for idx, key in enumerate(request.key): + dict_data[key] = request.value[idx] + log_id = request.logid + _LOGGER.info("RequestOp unpack one request. log_id:{}, clientip:{} \ + name:{}, method:{}".format(log_id, request.clientip, request.name, + request.method)) + + return dict_data, log_id, None, "" +``` + +**unpack_request_package** 的默认实现是将 RPC request 中的 key 和 value 做成字典交给第一个自定义OP。当默认的RequestOp无法满足参数解析需求时,可通过重写下面2个接口自定义请求参数解析方法。 + +| 接口 | 说明 | +| :---------------------------------------: | :----------------------------------------: | +| init_op(self) | OP初始化,设置默认名称@DAGExecutor | +| unpack_request_package(self, request) | 处理接收的RPC数据 | + +3. ResponseOp 二次开发 + +ResponseOp 用于处理图执行引擎的预测结果,处理后的数据将会作为 Pipeline Server 的RPC 返回值,其函数实现如下,在pack_response_package中做了精简 + +```python +class ResponseOp(Op): + def __init__(self, input_ops): + super(ResponseOp, self).__init__( + name="@DAGExecutor", input_ops=input_ops) + # init op + try: + self.init_op() + except Exception as e: + _LOGGER.critical("Op(ResponseOp) Failed to init: {}".format( + e, exc_info=True)) + os._exit(-1) + + def pack_response_package(self, channeldata): + resp = pipeline_service_pb2.Response() + error_code = channeldata.error_code + error_info = "" + ... + + # pack results + if error_code is None: + error_code = 0 + resp.err_no = error_code + resp.err_msg = error_info + + return resp +``` +**pack_response_package** 的默认实现是将预测结果的字典转化为 RPC response 中的 key 和 value。当默认的 ResponseOp 无法满足结果返回格式要求时,可通过重写下面2个接口自定义返回包打包方法。 + +| 接口 | 说明 | +| :------------------------------------------: | :-----------------------------------------: | +| init_op(self) | OP 初始化,设置默认名称 @DAGExecutor | +| pack_response_package(self, channeldata) | 处理接收的 RPC 数据 | + +**五.自定义业务错误类型** +用户可根据业务场景自定义错误码,继承 ProductErrCode,在 Op 的 preprocess 或 postprocess 中返回列表中返回,下一阶段处理会根据自定义错误码跳过后置OP处理。 +```python +class ProductErrCode(enum.Enum): + """ + ProductErrCode is a base class for recording business error code. + product developers inherit this class and extend more error codes. + """ + pass +``` + +**六.日志追踪** + +Pipeline 服务日志在当前目录的 `PipelineServingLogs` 目录下,有3种类型日志,分别是 `pipeline.log`、`pipeline.log.wf`、`pipeline.tracer`。 +- `pipeline.log` : 记录 debug & info日志信息 +- `pipeline.log.wf` : 记录 warning & error日志 +- `pipeline.tracer` : 统计各个阶段耗时、channel 堆积信息 + +在服务发生异常时,错误信息会记录在 pipeline.log.wf 日志中。打印 tracer 日志要求在 config.yml 的 DAG 属性中添加 tracer 配置。 + +1. 日志与请求的唯一标识 +Pipeline 中有2种 id 用以串联请求,分别是 data_id 和 log_id,二者区别如下: +- data_id : Pipeline 框架生成的自增 ID,标记请求唯一性标识 +- log_id : 上游模块传入的标识,跟踪多个服务间串联关系,由于用户可不传入或不保证唯一性,因此不能作为唯一性标识 + +通常,Pipeline 框架打印的日志会同时带上 data_id 和 log_id。开启 auto-batching 后,会使用批量中的第一个 data_id 标记 batch 整体,同时框架会在一条日志中打印批量中所有 data_id。 + +2. 日志滚动 +Pipeline 的日志模块在 `logger.py` 中定义,使用了 `logging.handlers.RotatingFileHandler` 支持磁盘日志文件的轮换。根据不同文件级别和日质量分别设置了 `maxBytes` 和 `backupCount`,当即将超出预定大小时,将关闭旧文件并打开一个新文件用于输出。 + +```python +"handlers": { + "f_pipeline.log": { + "class": "logging.handlers.RotatingFileHandler", + "level": "INFO", + "formatter": "normal_fmt", + "filename": os.path.join(log_dir, "pipeline.log"), + "maxBytes": 512000000, + "backupCount": 20, + }, + "f_pipeline.log.wf": { + "class": "logging.handlers.RotatingFileHandler", + "level": "WARNING", + "formatter": "normal_fmt", + "filename": os.path.join(log_dir, "pipeline.log.wf"), + "maxBytes": 512000000, + "backupCount": 10, + }, + "f_tracer.log": { + "class": "logging.handlers.RotatingFileHandler", + "level": "INFO", + "formatter": "tracer_fmt", + "filename": os.path.join(log_dir, "pipeline.tracer"), + "maxBytes": 512000000, + "backupCount": 5, + }, +} + +``` + + +**七.异构硬件** +Pipeline 除了支持 CPU、GPU 芯片推理之外,还支持在多种异构硬件推理部署。在 `config.yml` 中由 `device_type` 和 `devices`。优先使用 `device_type` 指定类型,当空缺时根据 `devices` 判断。`device_type` 描述如下: +- CPU(Intel) : 0 +- GPU(Jetson/海光DCU) : 1 +- TensorRT : 2 +- CPU(Arm) : 3 +- XPU : 4 +- Ascend310 : 5 +- ascend910 : 6 + +config.yml中硬件配置: +``` +#计算硬件类型: 空缺时由devices决定(CPU/GPU),0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu +device_type: 0 + +#计算硬件ID,优先由device_type决定硬件类型。devices为""或空缺时为CPU预测;当为"0", "0,1,2"时为GPU预测,表示使用的GPU卡 +devices: "" # "0,1" +``` + +**八.低精度推理** +Pipeline Serving支持低精度推理,CPU、GPU和TensoRT支持的精度类型如下图所示: + +- CPU + - fp32(default) + - fp16 + - bf16(mkldnn) +- GPU + - fp32(default) + - fp16 + - int8 +- Tensor RT + - fp32(default) + - fp16 + - int8 + +使用int8时,要开启use_calib: True + +参考[simple_web_service](../../examples/Pipeline/simple_web_service)示例 diff --git a/doc/Offical_Docs/7-2_Python_Pipeline_Usage_CN.md b/doc/Offical_Docs/7-2_Python_Pipeline_Usage_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..73fbca42b0126a8c2fd932d3682c99c68f7fa07a --- /dev/null +++ b/doc/Offical_Docs/7-2_Python_Pipeline_Usage_CN.md @@ -0,0 +1,245 @@ +# Python Pipeline 使用案例 + +Python Pipeline 使用案例部署步骤可分为下载模型、配置、编写代码、推理测试4个步骤。 + +所有Pipeline示例在[examples/Pipeline/](../../examples/Pipeline) 目录下,目前有7种类型模型示例: +- [PaddleClas](../../examples/Pipeline/PaddleClas) +- [Detection](../../examples/Pipeline/PaddleDetection) +- [bert](../../examples/Pipeline/PaddleNLP/bert) +- [imagenet](../../examples/Pipeline/PaddleClas/imagenet) +- [imdb_model_ensemble](../../examples/Pipeline/imdb_model_ensemble) +- [ocr](../../examples/Pipeline/PaddleOCR/ocr) +- [simple_web_service](../../examples/Pipeline/simple_web_service) + +以 imdb_model_ensemble 为例来展示如何使用 Pipeline Serving,相关代码在 `Serving/examples/Pipeline/imdb_model_ensemble` 文件夹下可以找到,例子中的 Server 端结构如下图所示: + +
+ +
+ +** 部署需要的文件 ** +需要五类文件,其中模型文件、配置文件、服务端代码是构建Pipeline服务必备的三个文件。测试客户端和测试数据集为测试准备 +- 模型文件 +- 配置文件(config.yml) + - 服务级别:服务端口、gRPC线程数、服务超时、重试次数等 + - DAG级别:资源类型、开启Trace、性能profile + - OP级别:模型路径、并发度、推理方式、计算硬件、推理超时、自动批量等 +- 服务端(web_server.py) + - 服务级别:定义服务名称、读取配置文件、启动服务 + - DAG级别:指定多OP之间的拓扑关系 + - OP级别:重写OP前后处理 +- 测试客户端 + - 正确性校验 + - 压力测试 +- 测试数据集 + - 图片、文本、语音等 + + +## 获取模型 + +示例中通过`get_data.sh`获取模型文件,示例中的模型文件已保存Feed/Fetch Var参数,如没有保存请跳转到[保存Serving部署参数]()步骤。 +```shell +cd Serving/examples/Pipeline/imdb_model_ensemble +sh get_data.sh +``` + +## 创建config.yaml +本示例采用了brpc的client连接类型,还可以选择grpc或local_predictor。 +```yaml +#rpc端口, rpc_port和http_port不允许同时为空。当rpc_port为空且http_port不为空时,会自动将rpc_port设置为http_port+1 +rpc_port: 18070 + +#http端口, rpc_port和http_port不允许同时为空。当rpc_port可用且http_port为空时,不自动生成http_port +http_port: 18071 + +#worker_num, 最大并发数。当build_dag_each_worker=True时, 框架会创建worker_num个进程,每个进程内构建grpcSever和DAG +#当build_dag_each_worker=False时,框架会设置主线程grpc线程池的max_workers=worker_num +worker_num: 4 + +#build_dag_each_worker, False,框架在进程内创建一条DAG;True,框架会每个进程内创建多个独立的DAG +build_dag_each_worker: False + +dag: + #op资源类型, True, 为线程模型;False,为进程模型 + is_thread_op: True + + #重试次数 + retry: 1 + + #使用性能分析, True,生成Timeline性能数据,对性能有一定影响;False为不使用 + use_profile: False + + #channel的最大长度,默认为0 + channel_size: 0 + + #tracer, 跟踪框架吞吐,每个OP和channel的工作情况。无tracer时不生成数据 + tracer: + #每次trace的时间间隔,单位秒/s + interval_s: 10 +op: + bow: + # 并发数,is_thread_op=True时,为线程并发;否则为进程并发 + concurrency: 1 + + # client连接类型,brpc, grpc和local_predictor + client_type: brpc + + # Serving交互重试次数,默认不重试 + retry: 1 + + # Serving交互超时时间, 单位ms + timeout: 3000 + + # Serving IPs + server_endpoints: ["127.0.0.1:9393"] + + # bow模型client端配置 + client_config: "imdb_bow_client_conf/serving_client_conf.prototxt" + + # Fetch结果列表,以client_config中fetch_var的alias_name为准 + fetch_list: ["prediction"] + + # 批量查询Serving的数量, 默认1。batch_size>1要设置auto_batching_timeout,否则不足batch_size时会阻塞 + batch_size: 2 + + # 批量查询超时,与batch_size配合使用 + auto_batching_timeout: 2000 + cnn: + # 并发数,is_thread_op=True时,为线程并发;否则为进程并发 + concurrency: 1 + + # client连接类型,brpc + client_type: brpc + + # Serving交互重试次数,默认不重试 + retry: 1 + + # 预测超时时间, 单位ms + timeout: 3000 + + # Serving IPs + server_endpoints: ["127.0.0.1:9292"] + + # cnn模型client端配置 + client_config: "imdb_cnn_client_conf/serving_client_conf.prototxt" + + # Fetch结果列表,以client_config中fetch_var的alias_name为准 + fetch_list: ["prediction"] + + # 批量查询Serving的数量, 默认1。 + batch_size: 2 + + # 批量查询超时,与batch_size配合使用 + auto_batching_timeout: 2000 + combine: + # 并发数,is_thread_op=True时,为线程并发;否则为进程并发 + concurrency: 1 + + # Serving交互重试次数,默认不重试 + retry: 1 + + # 预测超时时间, 单位ms + timeout: 3000 + + # 批量查询Serving的数量, 默认1。 + batch_size: 2 + + # 批量查询超时,与batch_size配合使用 + auto_batching_timeout: 2000 +``` + +## 编写 Server 代码 + +代码示例中,重点留意3个自定义Op的preprocess、postprocess处理,以及Combin Op初始化列表input_ops=[bow_op, cnn_op],设置Combin Op的前置OP列表。 + +```python +from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp +from paddle_serving_server.pipeline import PipelineServer +from paddle_serving_server.pipeline.proto import pipeline_service_pb2 +from paddle_serving_server.pipeline.channel import ChannelDataEcode +import numpy as np +from paddle_serving_app.reader import IMDBDataset + +class ImdbRequestOp(RequestOp): + def init_op(self): + self.imdb_dataset = IMDBDataset() + self.imdb_dataset.load_resource('imdb.vocab') + + def unpack_request_package(self, request): + dictdata = {} + for idx, key in enumerate(request.key): + if key != "words": + continue + words = request.value[idx] + word_ids, _ = self.imdb_dataset.get_words_and_label(words) + dictdata[key] = np.array(word_ids) + return dictdata + + +class CombineOp(Op): + def preprocess(self, input_data): + combined_prediction = 0 + for op_name, data in input_data.items(): + combined_prediction += data["prediction"] + data = {"prediction": combined_prediction / 2} + return data + + +read_op = ImdbRequestOp() +bow_op = Op(name="bow", + input_ops=[read_op], + server_endpoints=["127.0.0.1:9393"], + fetch_list=["prediction"], + client_config="imdb_bow_client_conf/serving_client_conf.prototxt", + concurrency=1, + timeout=-1, + retry=1) +cnn_op = Op(name="cnn", + input_ops=[read_op], + server_endpoints=["127.0.0.1:9292"], + fetch_list=["prediction"], + client_config="imdb_cnn_client_conf/serving_client_conf.prototxt", + concurrency=1, + timeout=-1, + retry=1) +combine_op = CombineOp( + name="combine", + input_ops=[bow_op, cnn_op], + concurrency=5, + timeout=-1, + retry=1) + +# use default ResponseOp implementation +response_op = ResponseOp(input_ops=[combine_op]) + +server = PipelineServer() +server.set_response_op(response_op) +server.prepare_server('config.yml') +server.run_server() +``` + +## 启动服务验证 + +```python +from paddle_serving_client.pipeline import PipelineClient +import numpy as np + +client = PipelineClient() +client.connect(['127.0.0.1:18080']) + +words = 'i am very sad | 0' + +futures = [] +for i in range(3): + futures.append( + client.predict( + feed_dict={"words": words}, + fetch=["prediction"], + asyn=True)) + +for f in futures: + res = f.result() + if res["ecode"] != 0: + print(res) + exit(1) +``` diff --git a/doc/Offical_Docs/7-3_Python_Pipeline_Senior_CN.md b/doc/Offical_Docs/7-3_Python_Pipeline_Senior_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..8678d64a762177aebc27045390e953d53054baf2 --- /dev/null +++ b/doc/Offical_Docs/7-3_Python_Pipeline_Senior_CN.md @@ -0,0 +1,96 @@ +# Python Pipeline 高阶用法 + +高阶用法在复杂场景中使用,实现更多自定义能力,包括 DAG 跳过某个OP运行、自定义数据传输结构以及多卡推理等。 + +## DAG 跳过某个OP运行 + +为 DAG 图中跳过某个 OP 运行,实际做法是在跳过此 OP 的 process 阶段,只要在 preprocess 做好判断,跳过 process 阶段,在和 postprocess 后直接返回即可。 +preprocess 返回结果列表的第二个结果是 `is_skip_process=True` 表示是否跳过当前 OP 的 process 阶段,直接进入 postprocess 处理。 + +```python +def preprocess(self, input_dicts, data_id, log_id): + """ + In preprocess stage, assembling data for process stage. users can + override this function for model feed features. + Args: + input_dicts: input data to be preprocessed + data_id: inner unique id + log_id: global unique id for RTT + Return: + input_dict: data for process stage + is_skip_process: skip process stage or not, False default + prod_errcode: None default, otherwise, product errores occured. + It is handled in the same way as exception. + prod_errinfo: "" default + """ + # multiple previous Op + if len(input_dicts) != 1: + _LOGGER.critical( + self._log( + "Failed to run preprocess: this Op has multiple previous " + "inputs. Please override this func.")) + os._exit(-1) + (_, input_dict), = input_dicts.items() + return input_dict, False, None, "" + +``` + +## 自定义 proto 中 Request 和 Response 结构 + +当默认 proto 结构不满足业务需求时,同时下面2个文件的 proto 的 Request 和 Response message 结构,保持一致。 + +> pipeline/gateway/proto/gateway.proto + +> pipeline/proto/pipeline_service.proto + +再重新编译 Serving Server。 + + +## 自定义 URL +grpc gateway 处理 post 请求,默认 `method` 是 `prediction`,例如:127.0.0.1:8080/ocr/prediction。用户可自定义 name 和 method,对于已有 url 的服务可无缝切换。 + +```proto +service PipelineService { + rpc inference(Request) returns (Response) { + option (google.api.http) = { + post : "/{name=*}/{method=*}" + body : "*" + }; + } +}; +``` + +## 批量推理 +Pipeline 支持批量推理,通过增大 batch size 可以提高 GPU 利用率。Python Pipeline 支持3种 batch 形式以及适用的场景如下: +- 场景1:一个推理请求包含批量数据(batch) + - 单条数据定长,批量变长,数据转成BCHW格式 + - 单条数据变长,前处理中将单条数据做 padding 转成定长 +- 场景2:一个推理请求的批量数据拆分成多个小块推理(mini-batch) + - 由于 padding 会按最长对齐,当一批数据中有个"极大"尺寸数据时会导致推理变慢 + - 指定一个块大小,从而缩小"极大"尺寸数据的作用范围 +- 场景3:合并多个请求数据批量推理(auto-batching) + - 推理耗时明显长于前后处理,合并多个请求数据推理一次会提高吞吐和GPU利用率 + - 要求多个请求数据的 shape 一致 + +| 接口 | 说明 | +| :------------------------------------------: | :-----------------------------------------: | +| batch | client 发送批量数据,client.predict 的 batch=True | +| mini-batch | preprocess 按 list 类型返回,参考 OCR 示例 RecOp的preprocess| +| auto-batching | config.yml 中 OP 级别设置 batch_size 和 auto_batching_timeout | + + +### 4.6 单机多卡 +单机多卡推理,M 个 OP 进程与 N 个 GPU 卡绑定,在 `config.yml` 中配置3个参数有关系,首先选择进程模式、并发数即进程数,devices 是 GPU 卡 ID。绑定方法是进程启动时遍历 GPU 卡 ID,例如启动7个 OP 进程 `config.yml` 设置 devices:0,1,2,那么第1,4,7个启动的进程与0卡绑定,第2,4个启动的进程与1卡绑定,3,6进程与卡2绑定。 +- 进程ID: 0 绑定 GPU 卡0 +- 进程ID: 1 绑定 GPU 卡1 +- 进程ID: 2 绑定 GPU 卡2 +- 进程ID: 3 绑定 GPU 卡0 +- 进程ID: 4 绑定 GPU 卡1 +- 进程ID: 5 绑定 GPU 卡2 +- 进程ID: 6 绑定 GPU 卡0 + +`config.yml` 中硬件配置: +``` +#计算硬件 ID,当 devices 为""或不写时为 CPU 预测;当 devices 为"0", "0,1,2"时为 GPU 预测,表示使用的 GPU 卡 +devices: "0,1,2" +``` diff --git a/doc/Offical_Docs/7-4_Python_Pipeline_Optimize_CN.md b/doc/Offical_Docs/7-4_Python_Pipeline_Optimize_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..bc49f23cb099da9bbada7e394c0b495ee1e12b36 --- /dev/null +++ b/doc/Offical_Docs/7-4_Python_Pipeline_Optimize_CN.md @@ -0,0 +1,79 @@ +# Python Pipeline 优化指南 + + +## 如何通过 Timeline 工具进行优化 + +为了更好地对性能进行优化,Python Pipeline 提供了 Timeline 工具,对整个服务的各个阶段时间进行打点。 + +## 在 Server 端输出 Profile 信息 + +Server 端用 yaml 中的 `use_profile` 字段进行控制: + +```yaml +dag: + use_profile: true +``` + +开启该功能后,Server 端在预测的过程中会将对应的日志信息打印到标准输出,为了更直观地展现各阶段的耗时,提供 Analyst 模块对日志文件做进一步的分析处理。 + +使用时先将 Server 的输出保存到文件,以 `profile.txt` 为例,脚本将日志中的时间打点信息转换成 json 格式保存到 `trace` 文件,`trace` 文件可以通过 chrome 浏览器的 tracing 功能进行可视化。 + +```python +from paddle_serving_server.pipeline import Analyst +import json +import sys + +if __name__ == "__main__": + log_filename = "profile.txt" + trace_filename = "trace" + analyst = Analyst(log_filename) + analyst.save_trace(trace_filename) +``` + +具体操作:打开 chrome 浏览器,在地址栏输入 `chrome://tracing/` ,跳转至 tracing 页面,点击 load 按钮,打开保存的 `trace` 文件,即可将预测服务的各阶段时间信息可视化。 + +## 在 Client 端输出 Profile 信息 + +Client 端在 `predict` 接口设置 `profile=True`,即可开启 Profile 功能。 + +开启该功能后,Client 端在预测的过程中会将该次预测对应的日志信息打印到标准输出,后续分析处理同 Server。 + +## 分析方法 +根据 `pipeline.tracer` 日志中的各个阶段耗时,按以下公式逐步分析出主要耗时在哪个阶段。 +``` +单 OP 耗时: +op_cost = process(pre + mid + post) + +OP 期望并发数: +op_concurrency = 单OP耗时(s) * 期望QPS + +服务吞吐量: +service_throughput = 1 / 最慢OP的耗时 * 并发数 + +服务平响: +service_avg_cost = ∑op_concurrency 【关键路径】 + +Channel 堆积: +channel_acc_size = QPS(down - up) * time + +批量预测平均耗时: +avg_batch_cost = (N * pre + mid + post) / N +``` + +## 优化思路 + +根据长耗时在不同阶段,采用不同的优化方法. +- OP 推理阶段(mid-process): + - 增加 OP 并发度 + - 开启 auto-batching (前提是多个请求的 shape 一致) + - 若批量数据中某条数据的 shape 很大,padding 很大导致推理很慢,可使用 mini-batch + - 开启 TensorRT/MKL-DNN 优化 + - 开启低精度推理 +- OP 前处理阶段(pre-process): + - 增加 OP 并发度 + - 优化前处理逻辑 +- in/out 耗时长(channel 堆积>5) + - 检查 channel 传递的数据大小和延迟 + - 优化传入数据,不传递数据或压缩后再传入 + - 增加 OP 并发度 + - 减少上游 OP 并发度 diff --git a/examples/Pipeline/PaddleNLP/bert/README.md b/examples/Pipeline/PaddleNLP/bert/README.md index c396b77c9d2b9198d0474540872cb1c4dcdce5b1..0fb8739b430f49f8bc066eb9a357ee92850b89d3 100644 --- a/examples/Pipeline/PaddleNLP/bert/README.md +++ b/examples/Pipeline/PaddleNLP/bert/README.md @@ -1,6 +1,6 @@ -# Imagenet Pipeline WebService +# Bert Pipeline WebService -This document will takes Imagenet service as an example to introduce how to use Pipeline WebService. +This document will takes Bert service as an example to introduce how to use Pipeline WebService. ## Get model ``` diff --git a/examples/Pipeline/PaddleNLP/bert/README_CN.md b/examples/Pipeline/PaddleNLP/bert/README_CN.md index 841abdadf5a3848fcf1e042d8e73c051610eefaa..eb93951d50ffb0c3bd899d816294ffde66e646a9 100644 --- a/examples/Pipeline/PaddleNLP/bert/README_CN.md +++ b/examples/Pipeline/PaddleNLP/bert/README_CN.md @@ -1,6 +1,6 @@ -# Imagenet Pipeline WebService +# Bert Pipeline WebService -这里以 Imagenet 服务为例来介绍 Pipeline WebService 的使用。 +这里以 Bert 服务为例来介绍 Pipeline WebService 的使用。 ## 获取模型 ``` diff --git a/tools/Dockerfile.cuda10.1-cudnn7-gcc54.devel b/tools/Dockerfile.cuda10.1-cudnn7-gcc54.devel index 7c2d19dc1a303cff2fb0cf16e857d0652be89e0b..6a162e89fc997239d57709f4949ed52d174266ec 100644 --- a/tools/Dockerfile.cuda10.1-cudnn7-gcc54.devel +++ b/tools/Dockerfile.cuda10.1-cudnn7-gcc54.devel @@ -84,7 +84,7 @@ RUN ln -sf /usr/local/bin/python3.6 /usr/local/bin/python3 && ln -sf /usr/local/ RUN rm -r /root/python_build # Install Go and glide -RUN wget -qO- https://dl.google.com/go/go1.14.linux-amd64.tar.gz | \ +RUN wget -qO- https://paddle-ci.cdn.bcebos.com/go1.17.2.linux-amd64.tar.gz | \ tar -xz -C /usr/local && \ mkdir /root/go && \ mkdir /root/go/bin && \ diff --git a/tools/Dockerfile.java b/tools/Dockerfile.java index 661943ed033c15b8a8a4084a0585411db200a361..2cb085c2ead72ddace34428caeefc22385a89246 100644 --- a/tools/Dockerfile.java +++ b/tools/Dockerfile.java @@ -1,7 +1,7 @@ # A image for building paddle binaries # # Use cuda devel base image for both cpu and gpu environment # # When you modify it, please be aware of cudnn-runtime version -FROM hub.baidubce.com/paddlepaddle/serving:latest-cuda10.2-cudnn8-devel +FROM registry.baidubce.com/paddlepaddle/serving:0.8.0-cuda10.2-cudnn8-devel MAINTAINER PaddlePaddle Authors diff --git a/tools/Dockerfile.runtime_template b/tools/Dockerfile.runtime_template index b900e772f07eae981f3d7f4dc46734a7a79939de..a72fd72acdcd1e2034b390fa8db681df344e1549 100644 --- a/tools/Dockerfile.runtime_template +++ b/tools/Dockerfile.runtime_template @@ -40,7 +40,7 @@ WORKDIR /home RUN bash /build_scripts/install_trt.sh <> && rm -rf /build_scripts # install go -RUN wget -qO- https://dl.google.com/go/go1.14.linux-amd64.tar.gz | \ +RUN wget -qO- https://paddle-ci.cdn.bcebos.com/go1.17.2.linux-amd64.tar.gz | \ tar -xz -C /usr/local && \ mkdir /root/go && \ mkdir /root/go/bin && \ diff --git a/tools/generate_k8s_yamls.sh b/tools/generate_k8s_yamls.sh index c1b542ef5146739fde75d2e80e7898a2b832512e..892c03bf02e2f5d96b9ba69b637359f2bc362194 100644 --- a/tools/generate_k8s_yamls.sh +++ b/tools/generate_k8s_yamls.sh @@ -12,6 +12,7 @@ function usage echo " --workdir : workdir in image"; echo " --command : command to launch serving" echo " --port : serving port" + echo " --pod_num : number of pod replicas" echo " -h | --help : helper"; } @@ -20,6 +21,9 @@ function parse_args # positional args args=() + # default + pod_num=1 + # named args while [ "$1" != "" ]; do case "$1" in @@ -28,6 +32,7 @@ function parse_args --workdir ) workdir="$2"; shift;; --command ) start_command="$2"; shift;; --port ) port="$2"; shift;; + --pod_num ) pod_num="$2"; shift;; -h | --help ) usage; exit;; # quit and show usage * ) args+=("$1") # if no match, add it to the positional args esac @@ -41,7 +46,7 @@ function parse_args positional_2="${args[1]}" # validate required args - if [[ -z "${app_name}" || -z "${image_name}" || -z "${workdir}" || -z "${start_command}" || -z "${port}" ]]; then + if [[ -z "${app_name}" || -z "${image_name}" || -z "${workdir}" || -z "${start_command}" || -z "${port}" || -z "${pod_num}"]]; then echo "Invalid arguments. check your params again." usage exit; @@ -59,6 +64,7 @@ function run echo "named arg: workdir: $workdir" echo "named arg: command: $start_command" echo "named arg: port: $port" + echo "named arg: pod_num: $pod_num" sed -e "s/<< APP_NAME >>/$app_name/g" -e "s/<< IMAGE_NAME >>/$(echo $image_name | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')/g" -e "s/<< WORKDIR >>/$(echo $workdir | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')/g" -e "s/<< COMMAND >>/\"$(echo $start_command | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')\"/g" -e "s/<< PORT >>/$port/g" tools/k8s_serving.yaml_template > k8s_serving.yaml sed -e "s/<< APP_NAME >>/$app_name/g" -e "s/<< IMAGE_NAME >>/$(echo $image_name | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')/g" -e "s/<< WORKDIR >>/$(echo $workdir | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')/g" -e "s/<< COMMAND >>/\"$(echo $start_command | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')\"/g" -e "s/<< PORT >>/$port/g" tools/k8s_ingress.yaml_template > k8s_ingress.yaml diff --git a/tools/generate_runtime_docker.sh b/tools/generate_runtime_docker.sh index 1e58527795aeea6b156277a12af2ea36a2086724..e2a2262267565ef52cb4475d2ccd584dc0414bad 100644 --- a/tools/generate_runtime_docker.sh +++ b/tools/generate_runtime_docker.sh @@ -78,7 +78,8 @@ function run echo "named arg: image_name: $image_name" sed -e "s/<>/$base_image/g" -e "s/<>/$python/g" -e "s/<>/$env/g" -e "s/<>/$serving/g" -e "s/<>/$paddle/g" tools/Dockerfile.runtime_template > Dockerfile.tmp - docker build --network=host --build-arg ftp_proxy=http://172.19.57.45:3128 --build-arg https_proxy=http://172.19.57.45:3128 --build-arg http_proxy=http://172.19.57.45:3128 --build-arg HTTP_PROXY=http://172.19.57.45:3128 --build-arg HTTPS_PROXY=http://172.19.57.45:3128 -t $image_name -f Dockerfile.tmp . + #docker build --network=host --build-arg ftp_proxy=http://172.19.57.45:3128 --build-arg https_proxy=http://172.19.57.45:3128 --build-arg http_proxy=http://172.19.57.45:3128 --build-arg HTTP_PROXY=http://172.19.57.45:3128 --build-arg HTTPS_PROXY=http://172.19.57.45:3128 -t $image_name -f Dockerfile.tmp . + docker build --network=host -t $image_name -f Dockerfile.tmp . } run "$@"; diff --git a/tools/k8s_serving.yaml_template b/tools/k8s_serving.yaml_template index b66d929bf5e3856c50ba4871cb02a5192a26b6ff..dfe2f3ee670adb88e7ffffbee077f3a1f42a64b2 100644 --- a/tools/k8s_serving.yaml_template +++ b/tools/k8s_serving.yaml_template @@ -20,7 +20,7 @@ metadata: app: << APP_NAME >> name: << APP_NAME >> spec: - replicas: 1 + replicas: << POD_NUM >> selector: matchLabels: app: << APP_NAME >> diff --git a/tools/paddle_env_install.sh b/tools/paddle_env_install.sh index 3f062027b427daaf3cc64612ab5982bdc2c1374c..5dc776f50bb1cca48c3c6352f1a6baf4c075d16d 100644 --- a/tools/paddle_env_install.sh +++ b/tools/paddle_env_install.sh @@ -21,7 +21,7 @@ function env_install() { apt install -y libcurl4-openssl-dev libbz2-dev wget https://paddle-serving.bj.bcebos.com/others/centos_ssl.tar && tar xf centos_ssl.tar && rm -rf centos_ssl.tar && mv libcrypto.so.1.0.2k /usr/lib/libcrypto.so.1.0.2k && mv libssl.so.1.0.2k /usr/lib/libssl.so.1.0.2k && ln -sf /usr/lib/libcrypto.so.1.0.2k /usr/lib/libcrypto.so.10 && ln -sf /usr/lib/libssl.so.1.0.2k /usr/lib/libssl.so.10 && ln -sf /usr/lib/libcrypto.so.10 /usr/lib/libcrypto.so && ln -sf /usr/lib/libssl.so.10 /usr/lib/libssl.so - rm -rf /usr/local/go && wget -qO- https://paddle-ci.gz.bcebos.com/go1.15.12.linux-amd64.tar.gz | \ + rm -rf /usr/local/go && wget -qO- https://paddle-ci.cdn.bcebos.com/go1.17.2.linux-amd64.tar.gz | \ tar -xz -C /usr/local && \ mkdir /root/go && \ mkdir /root/go/bin && \