diff --git a/doc/Quick_Start_CN.md b/doc/Quick_Start_CN.md index d211fe13a626fd24da8bd45718e44827fcecf36a..3b203b6032ea0b4d652563e70a5c134535e7dc24 100644 --- a/doc/Quick_Start_CN.md +++ b/doc/Quick_Start_CN.md @@ -123,3 +123,13 @@ python3 pipeline_rpc_client.py ``` {'err_no': 0, 'err_msg': '', 'key': ['res'], 'value': ["['土地整治与土壤修复研究中心', '华南农业大学1素图']"]} ``` + +

关闭Serving/Pipeline服务

+ +**方式一** :Ctrl+C关停服务 + +**方式二** :在启动Serving/Pipeline服务路径或者环境变量SERVING_HOME路径下(该路径下存在文件ProcessInfo.json) + +``` +python3 -m paddle_serving_server.serve stop +``` diff --git a/doc/Quick_Start_EN.md b/doc/Quick_Start_EN.md index 65352f9fa11e36bcdfc968269477dd974f7a57c3..f5bafe3cffd736142d48f51b9dcf7e98d506a057 100644 --- a/doc/Quick_Start_EN.md +++ b/doc/Quick_Start_EN.md @@ -94,3 +94,13 @@ output ``` {'err_no': 0, 'err_msg': '', 'key': ['res'], 'value': ["['土地整治与土壤修复研究中心', '华南农业大学1素图']"]} ``` + +

Stop Serving/Pipeline service

+ +**Method one** :Ctrl+C to quit + +**Method Two** :In the path where starting the Serving/Pipeline service or the path which environment variable SERVING_HOME set (the file named ProcessInfo.json exists in this path) + +``` +python3 -m paddle_serving_server.serve stop +``` diff --git a/doc/Serving_Configure_CN.md b/doc/Serving_Configure_CN.md index 312b052bd1ffe38e6c9f9af40cee21c0519a0a65..011fa84a024f195ac4063c6a45ece823c31ae0f4 100644 --- a/doc/Serving_Configure_CN.md +++ b/doc/Serving_Configure_CN.md @@ -59,7 +59,7 @@ fetch_var { ## C++ Serving -### 1.快速启动 +### 1.快速启动与关闭 可以通过配置模型及端口号快速启动服务,启动命令如下: @@ -107,6 +107,11 @@ python3 -m paddle_serving_server.serve --model serving_model --thread 10 --port ```BASH python3 -m paddle_serving_server.serve --model serving_model_1 serving_model_2 --thread 10 --port 9292 ``` +#### 当您想要关闭Serving服务时(在Serving启动目录或环境变量SERVING_HOME路径下,执行以下命令). +```BASH +python3 -m paddle_serving_server.serve stop +``` +stop参数发送SIGINT至C++ Serving,若改成kill则发送SIGKILL信号至C++ Serving ### 2.自定义配置启动 @@ -312,7 +317,20 @@ fetch_var { ``` ## Python Pipeline +### 快速启动与关闭 +Python Pipeline启动命令如下: + +```BASH +python3 web_service.py +``` + +当您想要关闭Serving服务时(在Pipeline启动目录下或环境变量SERVING_HOME路径下,执行以下命令): +```BASH +python3 -m paddle_serving_server.serve stop +``` +stop参数发送SIGINT至Pipeline Serving,若改成kill则发送SIGKILL信号至Pipeline Serving +### 配置文件 Python Pipeline提供了用户友好的多模型组合服务编程框架,适用于多模型组合应用的场景。 其配置文件为YAML格式,一般默认为config.yaml。示例如下: ```YAML diff --git a/doc/Serving_Configure_EN.md b/doc/Serving_Configure_EN.md index 7fe011449a1935fd6fa43fe2fea8d9008b9fab35..4e19971fd4a7c0f737265fdb064679d66fc9e64f 100644 --- a/doc/Serving_Configure_EN.md +++ b/doc/Serving_Configure_EN.md @@ -107,7 +107,7 @@ python3 -m paddle_serving_server.serve --model serving_model --thread 10 --port ```BASH python3 -m paddle_serving_server.serve --model serving_model_1 serving_model_2 --thread 10 --port 9292 ``` -#### Stop Serving. +#### Stop Serving(execute the following command in the directory where start serving or the path which environment variable SERVING_HOME set). ```BASH python3 -m paddle_serving_server.serve stop ``` @@ -325,9 +325,9 @@ fetch_var { Example starting Pipeline Serving: ```BASH -python3 -m paddle_serving_server.serve --model serving_model --port 9393 +python3 web_service.py ``` -### Stop Serving. +### Stop Serving(execute the following command in the directory where start Pipeline serving or the path which environment variable SERVING_HOME set). ```BASH python3 -m paddle_serving_server.serve stop ``` diff --git a/doc/images/wechat_group_1.jpeg b/doc/images/wechat_group_1.jpeg index dd5c55e04d60f271c0d9d7e3bc9ee12ae92ea149..518d0d42d2faa90df5253b853917666ba87c33ea 100644 Binary files a/doc/images/wechat_group_1.jpeg and b/doc/images/wechat_group_1.jpeg differ diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py index 3f2d104e959afd34cb10f90205afaac71ebba6ae..1275bc704884f6d497da446d03914e24f62fd8ea 100644 --- a/python/pipeline/dag.py +++ b/python/pipeline/dag.py @@ -29,8 +29,9 @@ import json from .error_catch import ErrorCatch, CustomException, CustomExceptionCode, ParamChecker, ParamVerify from .operator import Op, RequestOp, ResponseOp, VirtualOp from .channel import (ThreadChannel, ProcessChannel, ChannelData, - ChannelDataErrcode, ChannelDataType, ChannelStopError, - ProductErrCode) + ChannelDataType, ChannelStopError) +from .error_catch import ProductErrCode +from .error_catch import CustomExceptionCode as ChannelDataErrcode from .profiler import TimeProfiler, PerformanceTracer from .util import NameGenerator, ThreadIdGenerator, PipelineProcSyncManager from .proto import pipeline_service_pb2 @@ -508,6 +509,7 @@ class DAG(object): init_helper(self, request_name, response_op, use_profile, is_thread_op, channel_size, build_dag_each_worker, tracer, channel_recv_frist_arrive) + print("[DAG] Succ init") _LOGGER.info("[DAG] Succ init") @staticmethod diff --git a/python/pipeline/error_catch.py b/python/pipeline/error_catch.py index ef70f895a151d48bceb966da6a0c6369ad4cdd18..205026bcb9022f34f7b6ac7875df217456baf8eb 100644 --- a/python/pipeline/error_catch.py +++ b/python/pipeline/error_catch.py @@ -48,13 +48,24 @@ class CustomExceptionCode(enum.Enum): class ProductErrCode(enum.Enum): """ - ProductErrCode is a base class for recording business error code. - product developers inherit this class and extend more error codes. + ProductErrCode is to record business error codes. + the ProductErrCode number ranges from 51 to 99 + product developers can directly add error code into this class. """ pass class CustomException(Exception): + """ + An self-defined exception class + + Usage : raise CustomException(CustomExceptionCode.exceptionCode, errorMsg, isSendToUser=False) + Args : + exceptionCode : CustomExceptionCode or ProductErrCode + errorMsg : string message you want to describe the error + isSendToUser : whether send to user or just record in errorlog + Return : An string of error_info + """ def __init__(self, exceptionCode, errorMsg, isSendToUser=False): super().__init__(self) self.error_info = "\n\texception_code: {}\n"\ @@ -69,12 +80,15 @@ class CustomException(Exception): class ErrorCatch(): - def __init__(self): - self._id_generator = ThreadIdGenerator( - max_id=1000000000000000000, - base_counter=0, - step=1) + """ + An decorator class to catch error for method or function. + Usage : @ErrorCatch + Args : None + Returns: tuple(res, response) + res is the original funciton return + response includes erro_no and erro_msg + """ def __call__(self, func): if inspect.isfunction(func) or inspect.ismethod(func): @functools.wraps(func) @@ -82,27 +96,36 @@ class ErrorCatch(): try: res = func(*args, **kw) except CustomException as e: - log_id = self._id_generator.next() + if "log_id" in kw.keys(): + log_id = kw["log_id"] + elif "logid_dict" in kw.keys() and "data_id" in kw.keys(): + log_id = kw["logid_dict"].get(kw["data_id"]) + else: + log_id = 0 resp = pipeline_service_pb2.Response() - _LOGGER.error("\nLog_id: {}\n{}Classname: {}\nFunctionName:{}".format(log_id, traceback.format_exc(), func.__qualname__, func.__name__)) + _LOGGER.error("\nLog_id: {}\n{}Classname: {}\nFunctionName: {}\nArgs: {}".format(log_id, traceback.format_exc(), func.__qualname__, func.__name__, args)) split_list = re.split("\n|\t|:", str(e)) resp.err_no = int(split_list[3]) - resp.err_msg = "Log_id: {} ErrNo: {} Error_msg: {} ClassName: {} FunctionName: {}".format(log_id, resp.err_no, split_list[9], func.__qualname__ ,func.__name__ ) + resp.err_msg = "Log_id: {} Raise_msg: {} ClassName: {} FunctionName: {}".format(log_id, split_list[9], func.__qualname__ ,func.__name__ ) is_send_to_user = split_list[-1].replace(" ", "") if is_send_to_user == "True": return (None, resp) else: print("Erro_Num: {} {}".format(resp.err_no, resp.err_msg)) - print("Init error occurs. For detailed information, Please look up log by log_id.") + print("Init error occurs. For detailed information. Please look up pipeline.log.wf in PipelineServingLogs by log_id.") kill_stop_process_by_pid("kill", os.getpgid(os.getpid())) except Exception as e: - log_id = self._id_generator.next() + if "log_id" in kw.keys(): + log_id = kw["log_id"] + elif "logid_dict" in kw.keys() and "data_id" in kw.keys(): + log_id = kw["logid_dict"].get(kw["data_id"]) + else: + log_id = 0 resp = pipeline_service_pb2.Response() _LOGGER.error("\nLog_id: {}\n{}Classname: {}\nFunctionName: {}".format(log_id, traceback.format_exc(), func.__qualname__, func.__name__)) resp.err_no = CustomExceptionCode.UNKNOW.value - resp.err_msg = "Log_id: {} ErrNo: {} Error_msg: {} ClassName: {} FunctionName: {}".format(log_id, resp.err_no, str(e).replace("\'", ""), func.__qualname__ ,func.__name__ ) + resp.err_msg = "Log_id: {} Raise_msg: {} ClassName: {} FunctionName: {}".format(log_id, str(e).replace("\'", ""), func.__qualname__ ,func.__name__ ) return (None, resp) - # other exception won't be sent to users. else: resp = pipeline_service_pb2.Response() resp.err_no = CustomExceptionCode.OK.value diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index 8fe73e430ff6991bb65a1f7cf651d2b8554612f8..55d190c39ffe63b6c68b388eeb4643d7b171c089 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -36,9 +36,10 @@ else: from .error_catch import ErrorCatch, CustomException, CustomExceptionCode from .proto import pipeline_service_pb2 -from .channel import (ThreadChannel, ProcessChannel, ChannelDataErrcode, - ChannelData, ChannelDataType, ChannelStopError, - ChannelTimeoutError, ProductErrCode) +from .channel import (ThreadChannel, ProcessChannel,ChannelData, + ChannelDataType, ChannelStopError, ChannelTimeoutError) +from .error_catch import ProductErrCode +from .error_catch import CustomExceptionCode as ChannelDataErrcode from .util import NameGenerator from .profiler import UnsafeTimeProfiler as TimeProfiler from . import local_service_handler @@ -828,7 +829,8 @@ class Op(object): is_skip_process = False prod_errcode, prod_errinfo = None, None log_id = logid_dict.get(data_id) - process_res, resp = preprocess_help(self, parsed_data, data_id, logid_dict) + process_res, resp = preprocess_help(self, parsed_data, data_id = data_id, + logid_dict = logid_dict) if resp.err_no == CustomExceptionCode.OK.value: preped_data, is_skip_process, prod_errcode, prod_errinfo = process_res if is_skip_process is True: @@ -1117,7 +1119,8 @@ class Op(object): postped_data, err_channeldata = None, None prod_errcode, prod_errinfo = None, None - post_res, resp = postprocess_help(self, parsed_data_dict, midped_data, data_id, logid_dict) + post_res, resp = postprocess_help(self, parsed_data_dict, midped_data, data_id + = data_id, logid_dict = logid_dict) if resp.err_no == CustomExceptionCode.OK.value: postped_data, prod_errcode, prod_errinfo = post_res if prod_errcode is not None: @@ -1528,6 +1531,7 @@ class Op(object): self.init_op() init_helper(self, is_thread_op, concurrency_idx) + print("[OP Object] init success") # use a separate TimeProfiler per thread or process profiler = TimeProfiler() profiler.enable(True) diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py index 452bb36853f95ed9a63527fe7cc7701ca436efc6..54a16c94882a903e37b090c2c02bb88cb8a5a4a2 100644 --- a/python/pipeline/pipeline_server.py +++ b/python/pipeline/pipeline_server.py @@ -55,6 +55,7 @@ class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer): init_res = init_helper(self, name, response_op, dag_conf, worker_idx) if init_res[1].err_no != CustomExceptionCode.OK.value : raise CustomException(CustomExceptionCode.INIT_ERROR, "pipeline server init error") + print("[PipelineServicer] succ init") _LOGGER.info("[PipelineServicer] succ init") def inference(self, request, context):