diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py index b7c229ea6f85794849e156524616647f4bc121ec..c23645ebe8e5b89ace060f0b15f7d7ca59d54538 100644 --- a/python/pipeline/channel.py +++ b/python/pipeline/channel.py @@ -29,36 +29,11 @@ import enum import os import copy import time +from .error_catch import ErrorCatch, CustomExceptioni, ProductErrCode +from .error_catch import CustomExceptionCode as ChannelDataErrcode _LOGGER = logging.getLogger(__name__) - -class ChannelDataErrcode(enum.Enum): - """ - ChannelData error code - """ - OK = 0 - TIMEOUT = 1 - NOT_IMPLEMENTED = 2 - TYPE_ERROR = 3 - RPC_PACKAGE_ERROR = 4 - CLIENT_ERROR = 5 - CLOSED_ERROR = 6 - NO_SERVICE = 7 - UNKNOW = 8 - INPUT_PARAMS_ERROR = 9 - - PRODUCT_ERROR = 100 - - -class ProductErrCode(enum.Enum): - """ - ProductErrCode is a base class for recording business error code. - product developers inherit this class and extend more error codes. - """ - pass - - class ChannelDataType(enum.Enum): """ Channel data type diff --git a/python/pipeline/error_catch.py b/python/pipeline/error_catch.py index 6d51994b0c7efdc94289dbcfc6ff335437e22162..75b4538e9dc6167b11bfc45ddd34243d93a5e5af 100644 --- a/python/pipeline/error_catch.py +++ b/python/pipeline/error_catch.py @@ -10,18 +10,48 @@ import traceback import functools import re from .proto import pipeline_service_pb2_grpc, pipeline_service_pb2 +from .util import ThreadIdGenerator _LOGGER = logging.getLogger(__name__) class CustomExceptionCode(enum.Enum): """ Add new Exception + + 0 Success + 50 ~ 99 Product error + 3000 ~ 3999 Internal service error + 4000 ~ 4999 Conf error + 5000 ~ 5999 User input error + 6000 ~ 6999 Timeout error + 7000 ~ 7999 Type Check error + 8000 ~ 8999 Internal communication error + 9000 ~ 9999 Inference error + 10000 Other error + """ + OK = 0 + PRODUCT_ERROR = 50 + + NOT_IMPLEMENTED = 3000 + CLOSED_ERROR = 3001 + NO_SERVICE = 3002 + INIT_ERROR = 3003 + CONF_ERROR = 4000 + INPUT_PARAMS_ERROR = 5000 + TIMEOUT = 6000 + TYPE_ERROR = 7000 + RPC_PACKAGE_ERROR = 8000 + CLIENT_ERROR = 9000 + UNKNOW = 10000 + + +class ProductErrCode(enum.Enum): """ - INTERNAL_EXCEPTION = 500 - TYPE_EXCEPTION = 501 - TIMEOUT_EXCEPTION = 502 - CONF_EXCEPTION = 503 - PARAMETER_INVALID = 504 + ProductErrCode is a base class for recording business error code. + product developers inherit this class and extend more error codes. + """ + pass + class CustomException(Exception): def __init__(self, exceptionCode, errorMsg, isSendToUser=False): @@ -35,42 +65,48 @@ class CustomException(Exception): def __str__(self): return self.error_info + + class ErrorCatch(): + def __init__(self): + self._id_generator = ThreadIdGenerator( + max_id=1000000000000000000, + base_counter=0, + step=1) + def __call__(self, func): if inspect.isfunction(func) or inspect.ismethod(func): @functools.wraps(func) def wrapper(*args, **kw): try: res = func(*args, **kw) - except CustomException as e: + except CustomException as e: + log_id = self._id_generator.next() resp = pipeline_service_pb2.Response() - _LOGGER.error("{}\tFunctionName: {}{}".format(traceback.format_exc(), func.__name__, args)) + _LOGGER.error("\nLog_id: {}\n{}Classname: {}\nFunctionName:{}\nArgs:{}".format(log_id, traceback.format_exc(), func.__qualname__, func.__name__, args)) split_list = re.split("\n|\t|:", str(e)) resp.err_no = int(split_list[3]) - resp.err_msg = "{}\n\tClassName: {}, FunctionName: {}, ErrNo: {}".format(str(e), func.__class__ ,func.__name__, resp.err_no) - is_send_to_user = split_list[-1] - if bool(is_send_to_user) is True: + resp.err_msg = "Log_id: {} ErrNo: {} Error_msg: {} ClassName: {} FunctionName: {}".format(log_id, resp.err_no, split_list[9], func.__qualname__ ,func.__name__ ) + is_send_to_user = split_list[-1].replace(" ", "") + if is_send_to_user == "True": return (None, resp) - # self.record_error_info(error_code, error_info) else: - raise("init server error occur") + raise SystemExit("init server error occur") except Exception as e: + log_id = self._id_generator.next() resp = pipeline_service_pb2.Response() - _LOGGER.error("{}\tFunctionName: {}{}".format(traceback.format_exc(), func.__name__, args)) - resp.err_no = 404 - resp.err_msg = "{}\n\tClassName: {} FunctionName: {}, ErrNo: {}".format(str(e), func.__class__ ,func.__name__, resp.err_no) + _LOGGER.error("\nLog_id: {}\n{}Classname: {}\nFunctionName: {}\nArgs: {}".format(log_id, traceback.format_exc(), func.__qualname__, func.__name__, args)) + resp.err_no = CustomExceptionCode.UNKNOW.value + resp.err_msg = "Log_id: {} ErrNo: {} Error_msg: {} ClassName: {} FunctionName: {}".format(log_id, resp.err_no, str(e).replace("\'", ""), func.__qualname__ ,func.__name__ ) return (None, resp) # other exception won't be sent to users. else: resp = pipeline_service_pb2.Response() - resp.err_no = 200 + resp.err_no = CustomExceptionCode.OK.value resp.err_msg = "" return (res, resp) return wrapper - - def record_error_info(self, error_code, error_info): - ExceptionSingleton.set_exception_response(error_code, error_info) def ParamChecker(function): @functools.wraps(function) @@ -95,13 +131,13 @@ def ParamChecker(function): # if there are invalid arguments, raise the error. if len(invalid_argument_list) > 0: - raise CustomException(CustomExceptionCode.PARAMETER_INVALID, "invalid arg list: {}".format(invalid_argument_list)) + raise CustomException(CustomExceptionCode.INPUT_PARAMS_ERROR, "invalid arg list: {}".format(invalid_argument_list)) # check the result. result = function(*args, **kwargs) checker = inspect.signature(function).return_annotation if not check('return', result, checker, function): - raise CustomException(CustomExceptionCode.PARAMETER_INVALID, "invalid return type") + raise CustomException(CustomExceptionCode.INPUT_PARAMS_ERROR, "invalid return type") # return the result. return result diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index 63bdd8833985399287e47e0a911a50a3363c232c..a093cb6067dad299739815190ffc682c73e024b7 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -1094,7 +1094,7 @@ class Op(object): postped_data, prod_errcode, prod_errinfo = self.postprocess(parsed_data_dict[data_id], midped_data, data_id, logid_dict.get(data_id)) if not isinstance(postped_data, dict): - raise CustomException(CustomExceptionCode.TYPE_EXCEPTION, "postprocess should return dict", True) + raise CustomException(CustomExceptionCode.TYPE_ERROR, "postprocess should return dict", True) return postped_data, prod_errcode, prod_errinfo for data_id, midped_data in midped_data_dict.items(): diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py index 38bc7c6ac786a5cc957b096aa19b338e6305e833..452bb36853f95ed9a63527fe7cc7701ca436efc6 100644 --- a/python/pipeline/pipeline_server.py +++ b/python/pipeline/pipeline_server.py @@ -53,8 +53,8 @@ class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer): super(PipelineServicer, self).__init__() init_res = init_helper(self, name, response_op, dag_conf, worker_idx) - if init_res[1].err_no is not 200: - raise CustomException(CustomExceptionCode.INTERNAL_EXCEPTION, "pipeline server init error") + if init_res[1].err_no != CustomExceptionCode.OK.value : + raise CustomException(CustomExceptionCode.INIT_ERROR, "pipeline server init error") _LOGGER.info("[PipelineServicer] succ init") def inference(self, request, context):