diff --git a/doc/PIPELINE_SERVING.md b/doc/PIPELINE_SERVING.md
index b688fc477ae5c787c1617d4e6db820ceba2845cf..e8733e3de62a43d27dd11b63dc76277313fe3d81 100644
--- a/doc/PIPELINE_SERVING.md
+++ b/doc/PIPELINE_SERVING.md
@@ -47,6 +47,7 @@ The graph execution engine consists of OPs and Channels, and the connected OPs s
 
 
+
 ### Extreme Case Consideration
 
 - Request timeout
 
diff --git a/doc/PIPELINE_SERVING_CN.md b/doc/PIPELINE_SERVING_CN.md
index 03779bd916cd5b39b03f95577ed9d72dc2758138..9b601675abe3b8d50f252fce9a9be86f5e042350 100644
--- a/doc/PIPELINE_SERVING_CN.md
+++ b/doc/PIPELINE_SERVING_CN.md
@@ -6,6 +6,7 @@ Paddle Serving 通常用于单模型的一键部署，但端到端的深度学
 
 Paddle Serving 提供了用户友好的多模型组合服务编程框架，Pipeline Serving，旨在降低编程门槛，提高资源使用率(尤其是GPU设备)，提升整体的预估效率。
 
+
 ## 整体架构设计
 
 Server端基于 gRPC 和图执行引擎构建，两者的关系如下图所示。
diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py
index 3d47dd686c2c56ab4d47393e221409c9ccafb99f..b18b5ed8c43312481384913109be2830ad1eeb0f 100644
--- a/python/pipeline/operator.py
+++ b/python/pipeline/operator.py
@@ -364,9 +364,9 @@ class Op(object):
                     input_offset.append(offset)
             else:
                 _LOGGER.critical(
-                     "{} Failed to process: expect input type is dict(sample"
-                     " input) or list(batch input), but get {}".format(
-                         op_info_prefix, type(one_input)))
+                    "{} Failed to process: expect input type is dict(sample"
+                    " input) or list(batch input), but get {}".format(
+                        op_info_prefix, type(one_input)))
                 os._exit(-1)
 
         midped_batch = None
@@ -434,10 +434,10 @@
                             typical_logid, op_info_prefix, name))
                         lod_var_names.add(name)
                         lod_offset_names.add(lod_offset_name)
-
+
             for idx, data_id in enumerate(data_ids):
                 midped_data_dict[data_id] = {}
-
+
             for name, value in midped_batch.items():
                 if name in lod_offset_names:
                     continue
@@ -450,7 +450,8 @@
                         data_offset_right = input_offset[idx + 1]
                         lod_offset_left = lod_offset[data_offset_left]
                         lod_offset_right = lod_offset[data_offset_right]
-                        midped_data_dict[data_id][name] = value[lod_offset_left:lod_offset_right]
+                        midped_data_dict[data_id][name] = value[
+                            lod_offset_left:lod_offset_right]
                         midped_data_dict[data_id][lod_offset_name] = \
                                 lod_offset[data_offset_left:data_offset_right + 1] - lod_offset[data_offset_left]
                     else:
diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py
index ca8954aac83243b90981c5fbe52c32527e226c8c..e8229e810308b10d35f903a8415d898177bc2239 100644
--- a/python/pipeline/pipeline_server.py
+++ b/python/pipeline/pipeline_server.py
@@ -117,7 +117,8 @@ class PipelineServer(object):
         server = grpc.server(
             futures.ThreadPoolExecutor(max_workers=self._worker_num),
             options=[('grpc.max_send_message_length', 256 * 1024 * 1024),
-                     ('grpc.max_receive_message_length', 256 * 1024 * 1024)])
+                     ('grpc.max_receive_message_length', 256 * 1024 * 1024)
+                     ])
         pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
             PipelineServicer(self._response_op, self._conf), server)
         server.add_insecure_port('[::]:{}'.format(self._port))
@@ -126,8 +127,8 @@
     def _run_server_func(self, bind_address, response_op, dag_conf,
                          worker_idx):
         options = [('grpc.so_reuseport', 1),
-                    ('grpc.max_send_message_length', 256 * 1024 * 1024),
-                    ('grpc.max_send_message_length', 256 * 1024 * 1024)]
+                   ('grpc.max_send_message_length', 256 * 1024 * 1024),
+                   ('grpc.max_send_message_length', 256 * 1024 * 1024)]
         server = grpc.server(
             futures.ThreadPoolExecutor(
                 max_workers=1, ), options=options)
diff --git a/python/pipeline/profiler.py b/python/pipeline/profiler.py
index cfc56ca2bf4b4cdb49f12531321317932029eae5..b83bdd1dc8c5c948353c8ee95f51fe325e38dbfc 100644
--- a/python/pipeline/profiler.py
+++ b/python/pipeline/profiler.py
@@ -89,7 +89,7 @@ class PerformanceTracer(object):
                     if not succ:
                         err_count += 1
                         err_request.append(req_id)
-
+
                     if name not in op_cost:
                         op_cost[name] = {}
 
@@ -130,7 +130,8 @@
             _LOGGER.info("\tQuery count[{}]".format(tot))
             _LOGGER.info("\tQPS[{} q/s]".format(qps))
             _LOGGER.info("\tSucc[{}]".format(1 - 1.0 * err_count / tot))
-            _LOGGER.info("\tError req[{}]".format(", ".join([str(x) for x in err_request)]))
+            _LOGGER.info("\tError req[{}]".format(", ".join(
+                [str(x) for x in err_request])))
             _LOGGER.info("\tLatency:")
             _LOGGER.info("\t\tave[{} ms]".format(ave_cost))
             for latency in latencys: