Commit 6895a070 authored by F felixhjh

add error_catch; param check is not yet complete

Parent 80aa6381
@@ -26,7 +26,7 @@ import os
import logging
import collections
import json
from .error_catch import ErrorCatch, CustomException, CustomExceptionCode, ParamChecker, ParamVerify
from .operator import Op, RequestOp, ResponseOp, VirtualOp
from .channel import (ThreadChannel, ProcessChannel, ChannelData,
ChannelDataErrcode, ChannelDataType, ChannelStopError,
@@ -42,7 +42,6 @@ class DAGExecutor(object):
"""
DAG Executor, the service entrance of DAG.
"""
def __init__(self, response_op, server_conf, worker_idx):
"""
Initialize DAGExecutor.
@@ -114,6 +113,7 @@ class DAGExecutor(object):
self._client_profile_key = "pipeline.profile"
self._client_profile_value = "1"
@ErrorCatch
def start(self):
"""
Starting one thread for receiving data from the last channel background.
@@ -479,20 +479,35 @@ class DAG(object):
"""
Directed Acyclic Graph(DAG) engine, builds one DAG topology.
"""
def __init__(self, request_name, response_op, use_profile, is_thread_op,
channel_size, build_dag_each_worker, tracer,
channel_recv_frist_arrive):
self._request_name = request_name
self._response_op = response_op
self._use_profile = use_profile
self._is_thread_op = is_thread_op
self._channel_size = channel_size
self._build_dag_each_worker = build_dag_each_worker
self._tracer = tracer
self._channel_recv_frist_arrive = channel_recv_frist_arrive
if not self._is_thread_op:
self._manager = PipelineProcSyncManager()
        _LOGGER.info("{}, {}, {}, {}, {}, {}, {}, {}".format(request_name, response_op, use_profile, is_thread_op,
channel_size, build_dag_each_worker, tracer,
channel_recv_frist_arrive))
@ErrorCatch
@ParamChecker
def init_helper(self, request_name: str,
response_op,
use_profile: [bool, None],
is_thread_op: bool,
channel_size,
build_dag_each_worker: [bool, None],
tracer: [dict, None],
channel_recv_frist_arrive):
self._request_name = request_name
self._response_op = response_op
self._use_profile = use_profile
self._is_thread_op = is_thread_op
self._channel_size = channel_size
self._build_dag_each_worker = build_dag_each_worker
self._tracer = tracer
self._channel_recv_frist_arrive = channel_recv_frist_arrive
if not self._is_thread_op:
self._manager = PipelineProcSyncManager()
init_helper(self, request_name, response_op, use_profile, is_thread_op,
channel_size, build_dag_each_worker, tracer,
channel_recv_frist_arrive)
_LOGGER.info("[DAG] Succ init")
@staticmethod
......
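Note: with this change, DAG.__init__ no longer assigns its fields directly; it calls the @ErrorCatch/@ParamChecker-decorated init_helper, which returns a (result, response) pair instead of raising into the caller. The sketch below is a hypothetical, self-contained stand-in for that contract (FakeResponse and error_catch are illustrative substitutes for pipeline_service_pb2.Response and ErrorCatch, not the real implementations):

```python
import functools

class FakeResponse:  # illustrative stand-in for pipeline_service_pb2.Response
    def __init__(self, err_no=200, err_msg=""):
        self.err_no = err_no
        self.err_msg = err_msg

def error_catch(func):  # simplified stand-in for the ErrorCatch decorator
    @functools.wraps(func)
    def wrapper(*args, **kw):
        try:
            # success: (result, resp) with err_no == 200
            return func(*args, **kw), FakeResponse()
        except Exception as e:
            # failure: (None, resp) carrying a non-200 code and the message
            return None, FakeResponse(500, str(e))
    return wrapper

@error_catch
def init_helper(conf):
    return {"channel_size": conf["channel_size"]}

res, resp = init_helper({"channel_size": 0})
if resp.err_no == 200:
    print("init ok:", res)
else:
    print("init failed:", resp.err_msg)
```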
@@ -3,7 +3,7 @@ import enum
import os
import logging
import traceback
from paddle_serving_server.pipeline import ResponseOp
#from paddle_serving_server.pipeline import ResponseOp
import threading
import inspect
import traceback
@@ -13,43 +13,22 @@ from .proto import pipeline_service_pb2_grpc, pipeline_service_pb2
_LOGGER = logging.getLogger(__name__)
class Singleton(object):
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
if not hasattr(Singleton, "_instance"):
with Singleton._lock:
if not hasattr(Singleton, "_instance"):
Singleton._instance = super(Singleton, cls).__new__(cls, *args, **kwargs)
return Singleton._instance
def set_exception_response(self, error_code, error_info):
self.resp = pipeline_service_pb2.Response()
self.resp.err_no = error_code
self.resp.err_msg = error_info.replace("\n", " ").replace("\t", " ")[2:]
def get_exception_response(self):
if hasattr(self, "resp"):
return self.resp
else:
return None
class CustomExceptionCode(enum.Enum):
"""
    Custom exception codes. Extend this enum to add new error types.
"""
INTERNAL_EXCEPTION = 0
TYPE_EXCEPTION = 1
TIMEOUT_EXCEPTION = 2
CONF_EXCEPTION = 3
PARAMETER_INVALID = 4
INTERNAL_EXCEPTION = 500
TYPE_EXCEPTION = 501
TIMEOUT_EXCEPTION = 502
CONF_EXCEPTION = 503
PARAMETER_INVALID = 504
class CustomException(Exception):
def __init__(self, exceptionCode, errorMsg, isSendToUser):
def __init__(self, exceptionCode, errorMsg, isSendToUser=False):
super().__init__(self)
self.error_info = "\n\texception_code: {}\n"\
"\texception_type: {}\n"\
"\terror_msg: {}"\
"\terror_msg: {}\n"\
"\tis_send_to_user: {}".format(exceptionCode.value,
CustomExceptionCode(exceptionCode).name, errorMsg, isSendToUser)
@@ -62,22 +41,103 @@ class ErrorCatch():
@functools.wraps(func)
def wrapper(*args, **kw):
try:
func(*args, **kw)
res = func(*args, **kw)
except CustomException as e:
resp = pipeline_service_pb2.Response()
_LOGGER.error("{}\tFunctionName: {}{}".format(traceback.format_exc(), func.__name__, args))
split_list = re.split("\n|\t|:", str(e))
error_code = int(split_list[3])
error_info = "{}\n\tClassName: {} FunctionName: {}".format(str(e), func.__class__ ,func.__name__)
resp.err_no = int(split_list[3])
resp.err_msg = "{}\n\tClassName: {}, FunctionName: {}, ErrNo: {}".format(str(e), func.__class__ ,func.__name__, resp.err_no)
is_send_to_user = split_list[-1]
if is_send_to_user == True:
self.record_error_info(error_code, error_info)
                if is_send_to_user.strip() == "True":  # split_list[-1] is the string " True" / " False"
return (None, resp)
# self.record_error_info(error_code, error_info)
else:
raise("server error occur")
raise("init server error occur")
except Exception as e:
resp = pipeline_service_pb2.Response()
_LOGGER.error("{}\tFunctionName: {}{}".format(traceback.format_exc(), func.__name__, args))
resp.err_no = 404
resp.err_msg = "{}\n\tClassName: {} FunctionName: {}, ErrNo: {}".format(str(e), func.__class__ ,func.__name__, resp.err_no)
return (None, resp)
                # Other exceptions are not sent to users.
else:
resp = pipeline_service_pb2.Response()
resp.err_no = 200
resp.err_msg = ""
return (res, resp)
return wrapper
def record_error_info(self, error_code, error_info):
ExceptionSingleton.set_exception_response(error_code, error_info)
def ParamChecker(function):
@functools.wraps(function)
def wrapper(*args, **kwargs):
# fetch the argument name list.
parameters = inspect.signature(function).parameters
argument_list = list(parameters.keys())
# fetch the argument checker list.
checker_list = [parameters[argument].annotation for argument in argument_list]
# fetch the value list.
value_list = [inspect.getcallargs(function, *args, **kwargs)[argument] for argument in inspect.getfullargspec(function).args]
# initialize the result dictionary, where key is argument, value is the checker result.
result_dictionary = dict()
for argument, value, checker in zip(argument_list, value_list, checker_list):
result_dictionary[argument] = check(argument, value, checker, function)
# fetch the invalid argument list.
invalid_argument_list = [key for key in argument_list if not result_dictionary[key]]
# if there are invalid arguments, raise the error.
if len(invalid_argument_list) > 0:
raise CustomException(CustomExceptionCode.PARAMETER_INVALID, "invalid arg list: {}".format(invalid_argument_list))
# check the result.
result = function(*args, **kwargs)
checker = inspect.signature(function).return_annotation
if not check('return', result, checker, function):
raise CustomException(CustomExceptionCode.PARAMETER_INVALID, "invalid return type")
# return the result.
return result
return wrapper
def check(name, value, checker, function):
if isinstance(checker, (tuple, list, set)):
return True in [check(name, value, sub_checker, function) for sub_checker in checker]
elif checker is inspect._empty:
return True
elif checker is None:
return value is None
elif isinstance(checker, type):
return isinstance(value, checker)
elif callable(checker):
result = checker(value)
return result
class ParamVerify(object):
@staticmethod
def int_check(c, lower_bound=None, upper_bound=None):
if not isinstance(c, int):
return False
if isinstance(lower_bound, int) and isinstance(upper_bound, int):
return c >= lower_bound and c <= upper_bound
return True
@staticmethod
def file_check(f):
if not isinstance(f, str):
return False
        return os.path.exists(f)
ErrorCatch = ErrorCatch()
ExceptionSingleton = Singleton()
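A note on the list-valued annotations used with @ParamChecker (for example use_profile: [bool, None]): check() treats a list, tuple, or set annotation as a union and passes if any member matches. A minimal, self-contained sketch of that rule, mirroring check() above with simplified argument names:

```python
import inspect

def check_value(value, checker):  # mirrors check() above
    if isinstance(checker, (tuple, list, set)):
        # union semantics: pass if any sub-checker accepts the value
        return any(check_value(value, sub) for sub in checker)
    if checker is inspect._empty:
        return True  # unannotated parameters are never rejected
    if checker is None:
        return value is None
    if isinstance(checker, type):
        return isinstance(value, checker)
    if callable(checker):
        return bool(checker(value))  # e.g. a ParamVerify-style validator
    return False

assert check_value(True, [bool, None])       # matches the bool member
assert check_value(None, [bool, None])       # matches the None member
assert not check_value("yes", [bool, None])  # matches neither member
```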
@@ -15,6 +15,7 @@
import os
import logging
import multiprocessing
from .error_catch import ErrorCatch, CustomException, CustomExceptionCode
#from paddle_serving_server import OpMaker, OpSeqMaker
#from paddle_serving_server import Server as GpuServer
#from paddle_serving_server import Server as CpuServer
......
@@ -34,6 +34,7 @@ elif sys.version_info.major == 3:
else:
raise Exception("Error Python version")
from .error_catch import ErrorCatch, CustomException, CustomExceptionCode
from .proto import pipeline_service_pb2
from .channel import (ThreadChannel, ProcessChannel, ChannelDataErrcode,
ChannelData, ChannelDataType, ChannelStopError,
@@ -801,46 +802,39 @@ class Op(object):
preped_data_dict = collections.OrderedDict()
err_channeldata_dict = collections.OrderedDict()
skip_process_dict = {}
@ErrorCatch
def preprocess_help(self, parsed_data, data_id, logid_dict):
preped_data, is_skip_process, prod_errcode, prod_errinfo = self.preprocess(
parsed_data, data_id, logid_dict.get(data_id))
return preped_data, is_skip_process, prod_errcode, prod_errinfo
for data_id, parsed_data in parsed_data_dict.items():
preped_data, error_channeldata = None, None
is_skip_process = False
prod_errcode, prod_errinfo = None, None
log_id = logid_dict.get(data_id)
try:
preped_data, is_skip_process, prod_errcode, prod_errinfo = self.preprocess(
parsed_data, data_id, logid_dict.get(data_id))
# Set skip_process_dict
process_res, resp = preprocess_help(self, parsed_data, data_id, logid_dict)
            if resp.err_no == 200:
preped_data, is_skip_process, prod_errcode, prod_errinfo = process_res
if is_skip_process is True:
skip_process_dict[data_id] = True
except TypeError as e:
# Error type in channeldata.datatype
error_info = "(data_id={} log_id={}) {} Failed to preprocess: {}".format(
data_id, log_id, op_info_prefix, e)
_LOGGER.error(error_info, exc_info=True)
error_channeldata = ChannelData(
error_code=ChannelDataErrcode.TYPE_ERROR.value,
error_info=error_info,
data_id=data_id,
log_id=log_id)
except Exception as e:
error_info = "(data_id={} log_id={}) {} Failed to preprocess: {}".format(
data_id, log_id, op_info_prefix, e)
_LOGGER.error(error_info, exc_info=True)
error_channeldata = ChannelData(
error_code=ChannelDataErrcode.UNKNOW.value,
error_info=error_info,
data_id=data_id,
log_id=log_id)
if prod_errcode is not None:
                # product errors occurred
if prod_errcode is not None:
_LOGGER.error("data_id: {} return product error. Product ErrNo:{}, Product ErrMsg: {}".format(data_id, prod_errcode, prod_errinfo))
error_channeldata = ChannelData(
error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
error_info="",
prod_error_code=prod_errcode,
prod_error_info=prod_errinfo,
data_id=data_id,
log_id=log_id)
else:
error_channeldata = ChannelData(
error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
error_info="",
prod_error_code=prod_errcode,
prod_error_info=prod_errinfo,
data_id=data_id,
log_id=log_id)
error_code=resp.err_no,
error_info=resp.err_msg,
data_id=data_id,
log_id=log_id)
skip_process_dict[data_id] = True
if error_channeldata is not None:
err_channeldata_dict[data_id] = error_channeldata
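For context, preprocess_help assumes the user-defined Op.preprocess follows a 4-tuple return contract. A hypothetical example op (the class and field names are illustrative, not part of this commit):

```python
class MyPreprocessOp:  # hypothetical; a real op subclasses pipeline's Op
    def preprocess(self, input_dicts, data_id, log_id):
        # contract unpacked by preprocess_help:
        # (preped_data, is_skip_process, prod_errcode, prod_errinfo)
        (_, input_dict), = input_dicts.items()  # assumes one upstream op
        if "image" not in input_dict:
            # a non-None prod_errcode routes into the PRODUCT_ERROR branch above
            return input_dict, False, 1001, "missing field: image"
        return {"image": input_dict["image"]}, False, None, None
```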
@@ -1019,7 +1013,7 @@ class Op(object):
# 2 kinds of errors
if error_code != ChannelDataErrcode.OK.value or midped_batch is None:
error_info = "(log_id={}) {} failed to predict.".format(
error_info = "(log_id={}) {} failed to predict. Please check the input dict and checkout PipelineServingLogs/pipeline.log for more details.".format(
typical_logid, self.name)
_LOGGER.error(error_info)
for data_id in data_ids:
@@ -1095,68 +1089,57 @@ class Op(object):
_LOGGER.debug("{} Running postprocess".format(op_info_prefix))
postped_data_dict = collections.OrderedDict()
err_channeldata_dict = collections.OrderedDict()
@ErrorCatch
def postprocess_help(self, parsed_data_dict, midped_data, data_id, logid_dict):
postped_data, prod_errcode, prod_errinfo = self.postprocess(parsed_data_dict[data_id],
midped_data, data_id, logid_dict.get(data_id))
if not isinstance(postped_data, dict):
raise CustomException(CustomExceptionCode.TYPE_EXCEPTION, "postprocess should return dict", True)
return postped_data, prod_errcode, prod_errinfo
for data_id, midped_data in midped_data_dict.items():
log_id = logid_dict.get(data_id)
postped_data, err_channeldata = None, None
prod_errcode, prod_errinfo = None, None
try:
postped_data, prod_errcode, prod_errinfo = self.postprocess(
parsed_data_dict[data_id], midped_data, data_id,
logid_dict.get(data_id))
except Exception as e:
error_info = "(data_id={} log_id={}) {} Failed to postprocess: {}".format(
data_id, log_id, op_info_prefix, e)
_LOGGER.error(error_info, exc_info=True)
err_channeldata = ChannelData(
error_code=ChannelDataErrcode.UNKNOW.value,
error_info=error_info,
data_id=data_id,
log_id=log_id)
if prod_errcode is not None:
                # product errors occurred
post_res, resp = postprocess_help(self, parsed_data_dict, midped_data, data_id, logid_dict)
            if resp.err_no == 200:
postped_data, prod_errcode, prod_errinfo = post_res
if prod_errcode is not None:
                    # product errors occurred
err_channeldata = ChannelData(
error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
error_info="",
prod_error_code=prod_errcode,
prod_error_info=prod_errinfo,
data_id=data_id,
log_id=log_id)
else:
err_channeldata = ChannelData(
error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
error_info="",
prod_error_code=prod_errcode,
prod_error_info=prod_errinfo,
error_code=resp.err_no,
error_info=resp.err_msg,
data_id=data_id,
log_id=log_id)
if err_channeldata is not None:
err_channeldata_dict[data_id] = err_channeldata
continue
else:
if not isinstance(postped_data, dict):
error_info = "(log_id={} log_id={}) {} Failed to postprocess: " \
"output of postprocess funticon must be " \
"dict type, but get {}".format(
data_id, log_id, op_info_prefix,
type(postped_data))
_LOGGER.error(error_info)
err_channeldata = ChannelData(
error_code=ChannelDataErrcode.UNKNOW.value,
error_info=error_info,
data_id=data_id,
log_id=log_id)
err_channeldata_dict[data_id] = err_channeldata
continue
output_data = None
err, _ = ChannelData.check_npdata(postped_data)
if err == 0:
output_data = ChannelData(
ChannelDataType.CHANNEL_NPDATA.value,
npdata=postped_data,
data_id=data_id,
log_id=log_id)
else:
output_data = ChannelData(
ChannelDataType.DICT.value,
dictdata=postped_data,
data_id=data_id,
log_id=log_id)
postped_data_dict[data_id] = output_data
output_data = None
err, _ = ChannelData.check_npdata(postped_data)
if err == 0:
output_data = ChannelData(
ChannelDataType.CHANNEL_NPDATA.value,
npdata=postped_data,
data_id=data_id,
log_id=log_id)
else:
output_data = ChannelData(
ChannelDataType.DICT.value,
dictdata=postped_data,
data_id=data_id,
log_id=log_id)
postped_data_dict[data_id] = output_data
_LOGGER.debug("{} Succ postprocess".format(op_info_prefix))
return postped_data_dict, err_channeldata_dict
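The unchanged tail of this hunk routes the postprocess output by payload type via ChannelData.check_npdata. A simplified, hypothetical stand-in for that branch (assuming check_npdata succeeds only for dicts of numpy arrays):

```python
import numpy as np

def choose_channel_type(postped_data):
    # dicts of numpy arrays travel as CHANNEL_NPDATA; anything else
    # falls back to the generic DICT channel type
    if all(isinstance(v, np.ndarray) for v in postped_data.values()):
        return "CHANNEL_NPDATA"
    return "DICT"

assert choose_channel_type({"score": np.zeros((1, 2))}) == "CHANNEL_NPDATA"
assert choose_channel_type({"label": "cat"}) == "DICT"
```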
@@ -1507,26 +1490,29 @@ class Op(object):
Returns:
TimeProfiler
"""
if is_thread_op:
with self._for_init_op_lock:
if not self._succ_init_op:
# for the threaded version of Op, each thread cannot get its concurrency_idx
self.concurrency_idx = None
# init client
self.client = self.init_client(self._client_config,
@ErrorCatch
def init_helper(self, is_thread_op, concurrency_idx):
if is_thread_op:
with self._for_init_op_lock:
if not self._succ_init_op:
# for the threaded version of Op, each thread cannot get its concurrency_idx
self.concurrency_idx = None
# init client
self.client = self.init_client(self._client_config,
self._server_endpoints)
# user defined
self.init_op()
self._succ_init_op = True
self._succ_close_op = False
else:
self.concurrency_idx = concurrency_idx
# init client
self.client = self.init_client(self._client_config,
# user defined
self.init_op()
self._succ_init_op = True
self._succ_close_op = False
else:
self.concurrency_idx = concurrency_idx
# init client
self.client = self.init_client(self._client_config,
self._server_endpoints)
# user defined
self.init_op()
# user defined
self.init_op()
init_helper(self, is_thread_op, concurrency_idx)
# use a separate TimeProfiler per thread or process
profiler = TimeProfiler()
profiler.enable(True)
......
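The relocated init_helper keeps the original thread-op semantics: when is_thread_op is set, worker threads share one client and one user-defined init_op() call, guarded by _for_init_op_lock. A minimal sketch of that once-only locking pattern (a stand-in class, not the real Op):

```python
import threading

class ThreadedInitSketch:  # hypothetical stand-in for the threaded-Op case
    def __init__(self):
        self._for_init_op_lock = threading.Lock()
        self._succ_init_op = False

    def ensure_init(self):
        with self._for_init_op_lock:
            if not self._succ_init_op:
                # init_client() and the user-defined init_op() run exactly
                # once, in the first worker thread to reach this point
                self._succ_init_op = True
```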
@@ -24,7 +24,7 @@ import yaml
import io
import time
import os
from .error_catch import ErrorCatch, CustomException, CustomExceptionCode, ParamChecker, ParamVerify
from .proto import pipeline_service_pb2_grpc, pipeline_service_pb2
from . import operator
from . import dag
@@ -40,14 +40,21 @@ class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
"""
Pipeline Servicer entrance.
"""
def __init__(self, name, response_op, dag_conf, worker_idx=-1):
super(PipelineServicer, self).__init__()
self._name = name
# init dag executor
self._dag_executor = dag.DAGExecutor(response_op, dag_conf, worker_idx)
self._dag_executor.start()
@ErrorCatch
@ParamChecker
def init_helper(self, name, response_op,
dag_conf: dict,
worker_idx=-1):
self._name = name
self._dag_executor = dag.DAGExecutor(response_op, dag_conf, worker_idx)
self._dag_executor.start()
super(PipelineServicer, self).__init__()
init_res = init_helper(self, name, response_op, dag_conf, worker_idx)
        if init_res[1].err_no != 200:
raise CustomException(CustomExceptionCode.INTERNAL_EXCEPTION, "pipeline server init error")
_LOGGER.info("[PipelineServicer] succ init")
def inference(self, request, context):
......