Commit 8ea6f038 authored by TeslaZhao

update FAQ & Pipeline

Parent 94da03bd
@@ -46,6 +46,10 @@ InvalidArgumentError: Device id must be less than GPU count, but received id is:
 **A:** Currently (0.4.0) only CentOS is supported; see the supported list [here](https://github.com/PaddlePaddle/Serving/blob/develop/doc/DOCKER_IMAGES.md)
+
+#### Q: With the Java client, mvn compile fails with "No compiler is provided in this environment. Perhaps you are running on a JRE rather than a JDK?"
+**A:** The JDK is not installed, or JAVA_HOME is misconfigured (it must point to a JDK path; a common mistake is pointing it at a JRE path; a correct example is JAVA_HOME="/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64/"). For JDK installation, see https://segmentfault.com/a/1190000015389941
+
 ## Prediction Issues
......
@@ -32,7 +32,10 @@ import copy

 _LOGGER = logging.getLogger(__name__)

-class ChannelDataEcode(enum.Enum):
+class ChannelDataErrcode(enum.Enum):
+    """
+    ChannelData error codes
+    """
     OK = 0
     TIMEOUT = 1
     NOT_IMPLEMENTED = 2
@@ -42,6 +45,15 @@ class ChannelDataEcode(enum.Enum):
     CLOSED_ERROR = 6
     NO_SERVICE = 7
     UNKNOW = 8
+    PRODUCT_ERROR = 9
+
+
+class ProductErrCode(enum.Enum):
+    """
+    ProductErrCode is a base class for recording business error codes.
+    Product developers inherit this class and extend more error codes.
+    """
+    pass
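Because `ProductErrCode` is declared with no members, `enum.Enum` allows it to be subclassed directly. A minimal sketch of how a product team might extend it (the import path, class name, and code values below are assumptions):

```python
from paddle_serving_server.pipeline.channel import ProductErrCode  # import path is an assumption


class MyProductErrCode(ProductErrCode):
    """Hypothetical business error codes for one product line."""
    SUCC = 0
    PARAM_ERROR = 1001
    MODEL_NOT_READY = 1002
```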
 class ChannelDataType(enum.Enum):
@@ -56,20 +68,23 @@ class ChannelData(object):
                  npdata=None,
                  dictdata=None,
                  data_id=None,
-                 ecode=None,
+                 log_id=None,
+                 error_code=None,
                  error_info=None,
+                 prod_error_code=None,
+                 prod_error_info=None,
                  client_need_profile=False):
         '''
         There are several ways to use it:

-        1. ChannelData(ChannelDataType.CHANNEL_NPDATA.value, npdata, data_id)
-        2. ChannelData(ChannelDataType.DICT.value, dictdata, data_id)
-        3. ChannelData(ecode, error_info, data_id)
+        1. ChannelData(ChannelDataType.CHANNEL_NPDATA.value, npdata, data_id, log_id)
+        2. ChannelData(ChannelDataType.DICT.value, dictdata, data_id, log_id)
+        3. ChannelData(error_code, error_info, prod_error_code, prod_error_info, data_id, log_id)

         Protobufs are not pickle-able:
         https://stackoverflow.com/questions/55344376/how-to-import-protobuf-module
         '''
-        if ecode is not None:
+        if error_code is not None or prod_error_code is not None:
             if data_id is None or error_info is None:
                 _LOGGER.critical("Failed to generate ChannelData: data_id"
                                  " and error_info cannot be None")
@@ -77,25 +92,30 @@ class ChannelData(object):
             datatype = ChannelDataType.ERROR.value
         else:
             if datatype == ChannelDataType.CHANNEL_NPDATA.value:
-                ecode, error_info = ChannelData.check_npdata(npdata)
-                if ecode != ChannelDataEcode.OK.value:
+                error_code, error_info = ChannelData.check_npdata(npdata)
+                if error_code != ChannelDataErrcode.OK.value:
                     datatype = ChannelDataType.ERROR.value
-                    _LOGGER.error("(logid={}) {}".format(data_id, error_info))
+                    _LOGGER.error("(data_id={} log_id={}) {}".format(
+                        data_id, log_id, error_info))
             elif datatype == ChannelDataType.DICT.value:
-                ecode, error_info = ChannelData.check_dictdata(dictdata)
-                if ecode != ChannelDataEcode.OK.value:
+                error_code, error_info = ChannelData.check_dictdata(dictdata)
+                if error_code != ChannelDataErrcode.OK.value:
                     datatype = ChannelDataType.ERROR.value
-                    _LOGGER.error("(logid={}) {}".format(data_id, error_info))
+                    _LOGGER.error("(data_id={} log_id={}) {}".format(
+                        data_id, log_id, error_info))
             else:
-                _LOGGER.critical("(logid={}) datatype not match".format(
-                    data_id))
+                _LOGGER.critical("(data_id={} log_id={}) datatype not match".
+                                 format(data_id, log_id))
                 os._exit(-1)
         self.datatype = datatype
         self.npdata = npdata
         self.dictdata = dictdata
         self.id = data_id
-        self.ecode = ecode
+        self.log_id = log_id
+        self.error_code = error_code
         self.error_info = error_info
+        self.prod_error_code = prod_error_code
+        self.prod_error_info = prod_error_info
         self.client_need_profile = client_need_profile
         self.profile_data_set = set()
@@ -106,67 +126,67 @@ class ChannelData(object):
     @staticmethod
     def check_dictdata(dictdata):
-        ecode = ChannelDataEcode.OK.value
+        error_code = ChannelDataErrcode.OK.value
         error_info = None
         if isinstance(dictdata, list):
             # batch data
             for sample in dictdata:
                 if not isinstance(sample, dict):
-                    ecode = ChannelDataEcode.TYPE_ERROR.value
+                    error_code = ChannelDataErrcode.TYPE_ERROR.value
                     error_info = "Failed to check data: the type of " \
                             "data must be dict, but get {}.".format(type(sample))
                     break
         elif not isinstance(dictdata, dict):
             # batch size = 1
-            ecode = ChannelDataEcode.TYPE_ERROR.value
+            error_code = ChannelDataErrcode.TYPE_ERROR.value
             error_info = "Failed to check data: the type of data must " \
                     "be dict, but get {}.".format(type(dictdata))
-        return ecode, error_info
+        return error_code, error_info

     @staticmethod
     def check_batch_npdata(batch):
-        ecode = ChannelDataEcode.OK.value
+        error_code = ChannelDataErrcode.OK.value
         error_info = None
         for npdata in batch:
-            ecode, error_info = ChannelData.check_npdata(npdata)
-            if ecode != ChannelDataEcode.OK.value:
+            error_code, error_info = ChannelData.check_npdata(npdata)
+            if error_code != ChannelDataErrcode.OK.value:
                 break
-        return ecode, error_info
+        return error_code, error_info

     @staticmethod
     def check_npdata(npdata):
-        ecode = ChannelDataEcode.OK.value
+        error_code = ChannelDataErrcode.OK.value
         error_info = None
         if isinstance(npdata, list):
             # batch data
             for sample in npdata:
                 if not isinstance(sample, dict):
-                    ecode = ChannelDataEcode.TYPE_ERROR.value
+                    error_code = ChannelDataErrcode.TYPE_ERROR.value
                     error_info = "Failed to check data: the " \
                             "value of data must be dict, but get {}.".format(
                                 type(sample))
                     break
                 for _, value in sample.items():
                     if not isinstance(value, np.ndarray):
-                        ecode = ChannelDataEcode.TYPE_ERROR.value
+                        error_code = ChannelDataErrcode.TYPE_ERROR.value
                         error_info = "Failed to check data: the" \
                                 " value of data must be np.ndarray, but get {}.".format(
                                     type(value))
-                        return ecode, error_info
+                        return error_code, error_info
         elif isinstance(npdata, dict):
             # batch_size = 1
             for _, value in npdata.items():
                 if not isinstance(value, np.ndarray):
-                    ecode = ChannelDataEcode.TYPE_ERROR.value
+                    error_code = ChannelDataErrcode.TYPE_ERROR.value
                     error_info = "Failed to check data: the value " \
                             "of data must be np.ndarray, but get {}.".format(
                                 type(value))
                     break
         else:
-            ecode = ChannelDataEcode.TYPE_ERROR.value
+            error_code = ChannelDataErrcode.TYPE_ERROR.value
             error_info = "Failed to check data: the value of data " \
                     "must be dict, but get {}.".format(type(npdata))
-        return ecode, error_info
+        return error_code, error_info

     def parse(self):
         feed = None
@@ -191,8 +211,9 @@ class ChannelData(object):
         return 1

     def __str__(self):
-        return "type[{}], ecode[{}], id[{}]".format(
-            ChannelDataType(self.datatype).name, self.ecode, self.id)
+        return "type[{}], error_code[{}], data_id[{}], log_id[{}]".format(
+            ChannelDataType(self.datatype).name, self.error_code, self.id,
+            self.log_id)
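Following the constructor docstring above, a minimal sketch of the two main construction styles with the new fields (values are illustrative; it assumes the names defined in this module are imported):

```python
import numpy as np

# 1. wrap numpy feature data for the next Op
ok_data = ChannelData(
    ChannelDataType.CHANNEL_NPDATA.value,
    npdata={"x": np.zeros((1, 13), dtype="float32")},
    data_id=0,
    log_id=10000)

# 3. wrap an error, optionally carrying the product error fields
err_data = ChannelData(
    error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
    error_info="",
    prod_error_code=1001,  # hypothetical business code
    prod_error_info="param error",
    data_id=0,
    log_id=10000)
```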
 class ProcessChannel(object):
@@ -289,14 +310,14 @@ class ProcessChannel(object):

     def push(self, channeldata, op_name=None):
         _LOGGER.debug(
-            self._log("(logid={}) Op({}) Pushing data".format(channeldata.id,
-                                                              op_name)))
+            self._log("(data_id={} log_id={}) Op({}) Pushing data".format(
+                channeldata.id, channeldata.log_id, op_name)))
         if len(self._producers) == 0:
             _LOGGER.critical(
                 self._log(
-                    "(logid={}) Op({}) Failed to push data: expected number"
-                    " of producers to be greater than 0, but the it is 0.".
-                    format(channeldata.id, op_name)))
+                    "(data_id={} log_id={}) Op({}) Failed to push data: expected number"
+                    " of producers to be greater than 0, but it is 0.".
+                    format(channeldata.id, channeldata.log_id, op_name)))
             os._exit(-1)
         elif len(self._producers) == 1:
             with self._cv:
@@ -310,19 +331,21 @@ class ProcessChannel(object):
                     raise ChannelStopError()
                 self._cv.notify_all()
             _LOGGER.debug(
-                self._log("(logid={}) Op({}) Pushed data into internal queue.".
-                          format(channeldata.id, op_name)))
+                self._log(
+                    "(data_id={} log_id={}) Op({}) Pushed data into internal queue.".
+                    format(channeldata.id, channeldata.log_id, op_name)))
             return True
         elif op_name is None:
             _LOGGER.critical(
                 self._log(
-                    "(logid={}) Op({}) Failed to push data: there are multiple "
-                    "producers, so op_name cannot be None.".format(
-                        channeldata.id, op_name)))
+                    "(data_id={} log_id={}) Op({}) Failed to push data: there are multiple "
+                    "producers, so op_name cannot be None.".format(
+                        channeldata.id, channeldata.log_id, op_name)))
             os._exit(-1)

         producer_num = len(self._producers)
         data_id = channeldata.id
+        log_id = channeldata.log_id
         put_data = None
         with self._cv:
             if data_id not in self._input_buf:
@@ -347,8 +370,8 @@ class ProcessChannel(object):
             if put_data is None:
                 _LOGGER.debug(
                     self._log(
-                        "(logid={}) Op({}) Pushed data into input_buffer.".
-                        format(data_id, op_name)))
+                        "(data_id={} log_id={}) Op({}) Pushed data into input_buffer.".
+                        format(data_id, log_id, op_name)))
             else:
                 while self._stop.value == 0:
                     try:
@@ -361,8 +384,8 @@ class ProcessChannel(object):
                 _LOGGER.debug(
                     self._log(
-                        "(logid={}) Op({}) Pushed data into internal_queue.".
-                        format(data_id, op_name)))
+                        "(data_id={} log_id={}) Op({}) Pushed data into internal_queue.".
+                        format(data_id, log_id, op_name)))
             self._cv.notify_all()
         return True
@@ -404,8 +427,8 @@ class ProcessChannel(object):
             if self._stop.value == 1:
                 raise ChannelStopError()
             _LOGGER.debug(
-                self._log("(logid={}) Op({}) Got data".format(resp.values()[0]
-                                                              .id, op_name)))
+                self._log("(data_id={} log_id={}) Op({}) Got data".format(
+                    resp.values()[0].id, resp.values()[0].log_id, op_name)))
             return resp
         elif op_name is None:
             _LOGGER.critical(
@@ -434,8 +457,9 @@ class ProcessChannel(object):
                         self._output_buf.append(channeldata)
                         _LOGGER.debug(
                             self._log(
-                                "(logid={}) Op({}) Pop ready item into output_buffer".
-                                format(channeldata.values()[0].id, op_name)))
+                                "(data_id={} log_id={}) Op({}) Pop ready item into output_buffer".
+                                format(channeldata.values()[0].id,
+                                       channeldata.values()[0].log_id, op_name)))
                         break
             except Queue.Empty:
                 if timeout is not None:
@@ -487,8 +511,9 @@ class ProcessChannel(object):
             self._cv.notify_all()

         _LOGGER.debug(
-            self._log("(logid={}) Op({}) Got data from output_buffer".format(
-                resp.values()[0].id, op_name)))
+            self._log(
+                "(data_id={} log_id={}) Op({}) Got data from output_buffer".
+                format(resp.values()[0].id, resp.values()[0].log_id, op_name)))
         return resp

     def stop(self):
@@ -586,14 +611,14 @@ class ThreadChannel(Queue.PriorityQueue):

     def push(self, channeldata, op_name=None):
         _LOGGER.debug(
-            self._log("(logid={}) Op({}) Pushing data".format(channeldata.id,
-                                                              op_name)))
+            self._log("(data_id={} log_id={}) Op({}) Pushing data".format(
+                channeldata.id, channeldata.log_id, op_name)))
         if len(self._producers) == 0:
             _LOGGER.critical(
                 self._log(
-                    "(logid={}) Op({}) Failed to push data: expected number of "
-                    "producers to be greater than 0, but the it is 0.".format(
-                        channeldata.id, op_name)))
+                    "(data_id={} log_id={}) Op({}) Failed to push data: expected number of "
+                    "producers to be greater than 0, but it is 0.".format(
+                        channeldata.id, channeldata.log_id, op_name)))
             os._exit(-1)
         elif len(self._producers) == 1:
             with self._cv:
@@ -607,19 +632,21 @@ class ThreadChannel(Queue.PriorityQueue):
                     raise ChannelStopError()
                 self._cv.notify_all()
             _LOGGER.debug(
-                self._log("(logid={}) Op({}) Pushed data into internal_queue.".
-                          format(channeldata.id, op_name)))
+                self._log(
+                    "(data_id={} log_id={}) Op({}) Pushed data into internal_queue.".
+                    format(channeldata.id, channeldata.log_id, op_name)))
             return True
         elif op_name is None:
             _LOGGER.critical(
                 self._log(
-                    "(logid={}) Op({}) Failed to push data: there are multiple"
-                    " producers, so op_name cannot be None.".format(
-                        channeldata.id, op_name)))
+                    "(data_id={} log_id={}) Op({}) Failed to push data: there are multiple"
+                    " producers, so op_name cannot be None.".format(
+                        channeldata.id, channeldata.log_id, op_name)))
             os._exit(-1)

         producer_num = len(self._producers)
         data_id = channeldata.id
+        log_id = channeldata.log_id
         put_data = None
         with self._cv:
             if data_id not in self._input_buf:
@@ -639,8 +666,8 @@ class ThreadChannel(Queue.PriorityQueue):
             if put_data is None:
                 _LOGGER.debug(
                     self._log(
-                        "(logid={}) Op({}) Pushed data into input_buffer.".
-                        format(data_id, op_name)))
+                        "(data_id={} log_id={}) Op({}) Pushed data into input_buffer.".
+                        format(data_id, log_id, op_name)))
             else:
                 while self._stop is False:
                     try:
@@ -653,8 +680,8 @@ class ThreadChannel(Queue.PriorityQueue):
                 _LOGGER.debug(
                     self._log(
-                        "(logid={}) Op({}) Pushed data into internal_queue.".
-                        format(data_id, op_name)))
+                        "(data_id={} log_id={}) Op({}) Pushed data into internal_queue.".
+                        format(data_id, log_id, op_name)))
             self._cv.notify_all()
         return True
@@ -697,8 +724,8 @@ class ThreadChannel(Queue.PriorityQueue):
             if self._stop:
                 raise ChannelStopError()
             _LOGGER.debug(
-                self._log("(logid={}) Op({}) Got data".format(resp.values()[0]
-                                                              .id, op_name)))
+                self._log("(data_id={} log_id={}) Op({}) Got data".format(
+                    resp.values()[0].id, resp.values()[0].log_id, op_name)))
             return resp
         elif op_name is None:
             _LOGGER.critical(
@@ -727,8 +754,9 @@ class ThreadChannel(Queue.PriorityQueue):
                         self._output_buf.append(channeldata)
                         _LOGGER.debug(
                             self._log(
-                                "(logid={}) Op({}) Pop ready item into output_buffer".
-                                format(channeldata.values()[0].id, op_name)))
+                                "(data_id={} log_id={}) Op({}) Pop ready item into output_buffer".
+                                format(channeldata.values()[0].id,
+                                       channeldata.values()[0].log_id, op_name)))
                         break
             except Queue.Empty:
                 if timeout is not None:
@@ -780,8 +808,9 @@ class ThreadChannel(Queue.PriorityQueue):
             self._cv.notify_all()

         _LOGGER.debug(
-            self._log("(logid={}) Op({}) Got data from output_buffer".format(
-                resp.values()[0].id, op_name)))
+            self._log(
+                "(data_id={} log_id={}) Op({}) Got data from output_buffer".
+                format(resp.values()[0].id, resp.values()[0].log_id, op_name)))
         return resp

     def stop(self):
......
@@ -25,10 +25,12 @@ else:
 import os
 import logging
 import collections
+import json

 from .operator import Op, RequestOp, ResponseOp, VirtualOp
 from .channel import (ThreadChannel, ProcessChannel, ChannelData,
-                      ChannelDataEcode, ChannelDataType, ChannelStopError)
+                      ChannelDataErrcode, ChannelDataType, ChannelStopError,
+                      ProductErrCode)
 from .profiler import TimeProfiler, PerformanceTracer
 from .util import NameGenerator, ThreadIdGenerator, PipelineProcSyncManager
 from .proto import pipeline_service_pb2
@@ -142,7 +144,7 @@ class DAGExecutor(object):
         with self._cv_for_cv_pool:
             for data_id, cv in self._cv_pool.items():
                 closed_errror_data = ChannelData(
-                    ecode=ChannelDataEcode.CLOSED_ERROR.value,
+                    error_code=ChannelDataErrcode.CLOSED_ERROR.value,
                     error_info="dag closed.",
                     data_id=data_id)
                 with cv:
@@ -194,25 +196,36 @@ class DAGExecutor(object):
     def _pack_channeldata(self, rpc_request, data_id):
         dictdata = None
+        log_id = None
         try:
-            dictdata = self._unpack_rpc_func(rpc_request)
+            dictdata, log_id, prod_errcode, prod_errinfo = self._unpack_rpc_func(
+                rpc_request)
         except Exception as e:
             _LOGGER.error(
                 "(logid={}) Failed to parse RPC request package: {}"
                 .format(data_id, e),
                 exc_info=True)
             return ChannelData(
-                ecode=ChannelDataEcode.RPC_PACKAGE_ERROR.value,
+                error_code=ChannelDataErrcode.RPC_PACKAGE_ERROR.value,
                 error_info="rpc package error: {}".format(e),
-                data_id=data_id)
+                data_id=data_id,
+                log_id=log_id)
         else:
-            # because unpack_rpc_func is rewritten by user, we need
-            # to look for client_profile_key field in rpc_request
+            # because unpack_rpc_func is rewritten by the user, we need to look
+            # for product_errcode in the returns, and for the client_profile_key
+            # field in rpc_request
+            if prod_errcode is not None:
+                # product errors occurred
+                return ChannelData(
+                    error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
+                    error_info="",
+                    prod_error_code=prod_errcode,
+                    prod_error_info=prod_errinfo,
+                    data_id=data_id,
+                    log_id=log_id)
+
             profile_value = None
-            for idx, key in enumerate(rpc_request.key):
-                if key == self._client_profile_key:
-                    profile_value = rpc_request.value[idx]
-                    break
+            profile_value = dictdata.get(self._client_profile_key)
             client_need_profile = (profile_value == self._client_profile_value)
             _LOGGER.debug("(logid={}) Need profile in client: {}".format(
                 data_id, client_need_profile))
@@ -220,6 +233,7 @@ class DAGExecutor(object):
             datatype=ChannelDataType.DICT.value,
             dictdata=dictdata,
             data_id=data_id,
+            log_id=log_id,
             client_need_profile=client_need_profile)

     def call(self, rpc_request):
@@ -253,7 +267,7 @@ class DAGExecutor(object):
                     self._cv_pool.pop(data_id)
                 return self._pack_for_rpc_resp(
                     ChannelData(
-                        ecode=ChannelDataEcode.CLOSED_ERROR.value,
+                        error_code=ChannelDataErrcode.CLOSED_ERROR.value,
                         error_info="dag closed.",
                         data_id=data_id))
@@ -261,13 +275,13 @@ class DAGExecutor(object):
             resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id,
                                                                        cond_v)

-            if resp_channeldata.ecode == ChannelDataEcode.OK.value:
+            if resp_channeldata.error_code == ChannelDataErrcode.OK.value:
                 _LOGGER.info("(logid={}) Succ predict".format(data_id))
                 break
             else:
                 _LOGGER.error("(logid={}) Failed to predict: {}"
                               .format(data_id, resp_channeldata.error_info))
-                if resp_channeldata.ecode != ChannelDataEcode.TIMEOUT.value:
+                if resp_channeldata.error_code != ChannelDataErrcode.TIMEOUT.value:
                     break

             if i + 1 < self._retry:
@@ -288,7 +302,8 @@ class DAGExecutor(object):
             trace_buffer.put({
                 "name": "DAG",
                 "id": data_id,
-                "succ": resp_channeldata.ecode == ChannelDataEcode.OK.value,
+                "succ":
+                resp_channeldata.error_code == ChannelDataErrcode.OK.value,
                 "actions": {
                     "call_{}".format(data_id): end_call - start_call,
                 },
@@ -317,8 +332,9 @@ class DAGExecutor(object):
                 .format(channeldata.id, e),
                 exc_info=True)
             resp = pipeline_service_pb2.Response()
-            resp.ecode = ChannelDataEcode.RPC_PACKAGE_ERROR.value
-            resp.error_info = "rpc package error: {}".format(e)
+            resp.err_no = ChannelDataErrcode.RPC_PACKAGE_ERROR.value
+            resp.err_msg = "rpc package error: {}".format(e)
+            resp.result = ""
             return resp
......
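With this change, `_pack_channeldata` expects the user-overridable unpack function to return a 4-tuple `(dictdata, log_id, prod_errcode, prod_errinfo)` instead of a single dict. A minimal sketch of a conforming function (the function name and the business code 1001 are hypothetical):

```python
def unpack_rpc_request(request):
    """Returns (dictdata, log_id, prod_errcode, prod_errinfo)."""
    dict_data = {"data": request.data}
    if request.data == "":
        # report a business error; the DAG wraps it as a PRODUCT_ERROR ChannelData
        return None, request.logid, 1001, "empty data field"
    return dict_data, request.logid, None, ""
```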
@@ -19,22 +19,27 @@ option go_package = ".;pipeline_serving";
 import "google/api/annotations.proto";

 message Response {
-  repeated string key = 1;
-  repeated string value = 2;
-  int32 ecode = 3;
-  string error_info = 4;
+  int32 err_no = 1;
+  string err_msg = 2;
+  string result = 3;
 };

 message Request {
-  repeated string key = 1;
-  repeated string value = 2;
-  string name = 3;
-}
+  string name = 1;
+  string method = 2;
+  string appid = 3;
+  int64 logid = 4;
+  string format = 5;
+  string from = 6;
+  string cmdid = 7;
+  string clientip = 8;
+  string data = 9;
+};

 service PipelineService {
   rpc inference(Request) returns (Response) {
     option (google.api.http) = {
-      post : "/{name=*}/prediction"
+      post : "/{name=*}/{method=*}"
       body : "*"
     };
   }
......
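With the route changed from `/{name=*}/prediction` to `/{name=*}/{method=*}`, an HTTP call through the gateway might look like the sketch below. The host, port, service name, and payload layout are all assumptions; the content of `data` is whatever the `RequestOp` expects:

```python
import json
import requests

body = {
    "name": "uci",            # must match the configured service name, if any
    "method": "prediction",   # now free-form, matched by /{name=*}/{method=*}
    "logid": 10000,
    "clientip": "127.0.0.1",
    "data": json.dumps({"x": "0.0137, -0.1136"}),
}
resp = requests.post("http://127.0.0.1:18080/uci/prediction",
                     data=json.dumps(body))
# expected shape: {"err_no": ..., "err_msg": ..., "result": "<base64 JSON>"}
print(resp.json())
```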
@@ -24,6 +24,8 @@ import os
 import sys
 import collections
 import numpy as np
+import json
+import base64  # needed by pack_response_package below
 from numpy import *
 if sys.version_info.major == 2:
     import Queue
@@ -33,9 +34,9 @@ else:
     raise Exception("Error Python version")

 from .proto import pipeline_service_pb2
-from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode,
+from .channel import (ThreadChannel, ProcessChannel, ChannelDataErrcode,
                       ChannelData, ChannelDataType, ChannelStopError,
-                      ChannelTimeoutError)
+                      ChannelTimeoutError, ProductErrCode)
 from .util import NameGenerator
 from .profiler import UnsafeTimeProfiler as TimeProfiler
 from . import local_rpc_service_handler
@@ -279,7 +280,23 @@ class Op(object):
     def _get_output_channels(self):
         return self._outputs

-    def preprocess(self, input_dicts):
+    def preprocess(self, input_dicts, data_id, log_id):
+        """
+        In the preprocess stage, assemble data for the process stage. Users can
+        override this function to build model feed features.
+        Args:
+            input_dicts: input data to be preprocessed
+            data_id: inner unique id
+            log_id: global unique id for RTT
+        Return:
+            input_dict: data for the process stage
+            is_skip_process: skip the process stage or not, False by default
+            prod_errcode: None by default; otherwise, a product error occurred.
+                It is handled in the same way as an exception.
+            prod_errinfo: "" by default
+        """
         # multiple previous Op
         if len(input_dicts) != 1:
             _LOGGER.critical(
@@ -289,9 +306,19 @@ class Op(object):
             os._exit(-1)

         (_, input_dict), = input_dicts.items()
-        return input_dict
+        return input_dict, False, None, ""

     def process(self, feed_batch, typical_logid):
+        """
+        In the process stage, send requests to the inference server or predict
+        locally. Users do not need to override this function.
+        Args:
+            feed_batch: data to be fed to the inference server
+            typical_logid: marks batch predicts
+        Returns:
+            call_result: predict result
+        """
         err, err_info = ChannelData.check_batch_npdata(feed_batch)
         if err != 0:
             _LOGGER.critical(
@@ -306,27 +333,54 @@ class Op(object):
             call_result.pop("serving_status_code")
         return call_result

-    def postprocess(self, input_dict, fetch_dict):
-        return fetch_dict
+    def postprocess(self, input_dict, fetch_dict, logid_dict):
+        """
+        In the postprocess stage, assemble data for the next op or for output.
+        Args:
+            input_dict: data returned by the preprocess stage
+            fetch_dict: data returned by the process stage
+            logid_dict: logid
+        Returns:
+            fetch_dict: returns fetch_dict by default
+            prod_errcode: None by default; otherwise, a product error occurred.
+                It is handled in the same way as an exception.
+            prod_errinfo: "" by default
+        """
+        return fetch_dict, None, ""
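Under the new signatures, a user Op overrides `preprocess` and `postprocess` roughly as sketched below. The Op name and field names are hypothetical; note that although the base-class parameter is named `logid_dict`, `_run_postprocess` passes in the single `log_id` of the current request:

```python
import numpy as np
from paddle_serving_server.pipeline import Op  # import path is an assumption


class UciOp(Op):  # hypothetical user Op
    def preprocess(self, input_dicts, data_id, log_id):
        (_, input_dict), = input_dicts.items()
        x = np.array(input_dict["x"], dtype="float32")  # field name "x" is an assumption
        # (feed data, is_skip_process, prod_errcode, prod_errinfo)
        return {"x": x}, False, None, ""

    def postprocess(self, input_dict, fetch_dict, logid_dict):
        fetch_dict["price"] = str(fetch_dict["price"])
        # (fetch_dict, prod_errcode, prod_errinfo)
        return fetch_dict, None, ""
```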
     def _parse_channeldata(self, channeldata_dict):
+        """
+        Parse one channeldata.
+        Args:
+            channeldata_dict: channel data to be parsed, dict type
+        Return:
+            data_id: created by dag._id_generator, unique
+            error_channeldata: error channeldata
+            parsed_data: np/dict data extracted from the channeldata
+            client_need_profile: need profile info
+            profile_set: profile info
+            log_id: logid for tracing a request
+        """
         data_id, error_channeldata = None, None
         client_need_profile, profile_set = False, set()
         parsed_data = {}

         key = list(channeldata_dict.keys())[0]
         data_id = channeldata_dict[key].id
+        log_id = channeldata_dict[key].log_id
         client_need_profile = channeldata_dict[key].client_need_profile

         for name, data in channeldata_dict.items():
-            if data.ecode != ChannelDataEcode.OK.value:
+            if data.error_code != ChannelDataErrcode.OK.value:
                 error_channeldata = data
                 break
             parsed_data[name] = data.parse()
             if client_need_profile:
                 profile_set |= data.profile_data_set

         return (data_id, error_channeldata, parsed_data, client_need_profile,
-                profile_set)
+                profile_set, log_id)

     def _push_to_output_channels(self,
                                  data,
@@ -335,6 +389,20 @@ class Op(object):
                                  profile_str=None,
                                  client_need_profile=False,
                                  profile_set=None):
+        """
+        Push data to the output channels; do not run the later stages
+        (preprocess, process, postprocess).
+        Args:
+            data: channeldata to be pushed
+            channels: output channels
+            name: op name
+            profile_str: one profile message
+            client_need_profile: False by default
+            profile_set: profile message collection
+        Returns:
+            None
+        """
         if name is None:
             name = self.name
@@ -384,52 +452,109 @@ class Op(object):
     def init_op(self):
         pass

-    def _run_preprocess(self, parsed_data_dict, op_info_prefix):
+    def _run_preprocess(self, parsed_data_dict, op_info_prefix, logid_dict):
+        """
+        Run the preprocess stage.
+        Args:
+            parsed_data_dict: data to be preprocessed
+            op_info_prefix: input op info
+            logid_dict: logid dict
+        Returns:
+            preped_data_dict: preprocessed data, to be processed
+            err_channeldata_dict: when exceptions occur, errors are put in it
+            skip_process_dict: skip the process stage or not
+        """
         _LOGGER.debug("{} Running preprocess".format(op_info_prefix))
         preped_data_dict = collections.OrderedDict()
         err_channeldata_dict = collections.OrderedDict()
+        skip_process_dict = {}
         for data_id, parsed_data in parsed_data_dict.items():
             preped_data, error_channeldata = None, None
+            is_skip_process = False
+            prod_errcode, prod_errinfo = None, None
+            log_id = logid_dict.get(data_id)
             try:
-                preped_data = self.preprocess(parsed_data)
+                preped_data, is_skip_process, prod_errcode, prod_errinfo = self.preprocess(
+                    parsed_data, data_id, logid_dict.get(data_id))
+                # Set skip_process_dict
+                if is_skip_process is True:
+                    skip_process_dict[data_id] = True
             except TypeError as e:
                 # Error type in channeldata.datatype
-                error_info = "(logid={}) {} Failed to preprocess: {}".format(
-                    data_id, op_info_prefix, e)
+                error_info = "(data_id={} log_id={}) {} Failed to preprocess: {}".format(
+                    data_id, log_id, op_info_prefix, e)
                 _LOGGER.error(error_info, exc_info=True)
                 error_channeldata = ChannelData(
-                    ecode=ChannelDataEcode.TYPE_ERROR.value,
+                    error_code=ChannelDataErrcode.TYPE_ERROR.value,
                     error_info=error_info,
-                    data_id=data_id)
+                    data_id=data_id,
+                    log_id=log_id)
             except Exception as e:
-                error_info = "(logid={}) {} Failed to preprocess: {}".format(
-                    data_id, op_info_prefix, e)
+                error_info = "(data_id={} log_id={}) {} Failed to preprocess: {}".format(
                    data_id, log_id, op_info_prefix, e)
                 _LOGGER.error(error_info, exc_info=True)
                 error_channeldata = ChannelData(
-                    ecode=ChannelDataEcode.UNKNOW.value,
+                    error_code=ChannelDataErrcode.UNKNOW.value,
                     error_info=error_info,
-                    data_id=data_id)
+                    data_id=data_id,
+                    log_id=log_id)
+
+            if prod_errcode is not None:
+                # product errors occurred
+                error_channeldata = ChannelData(
+                    error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
+                    error_info="",
+                    prod_error_code=prod_errcode,
+                    prod_error_info=prod_errinfo,
+                    data_id=data_id,
+                    log_id=log_id)

             if error_channeldata is not None:
                 err_channeldata_dict[data_id] = error_channeldata
             else:
                 preped_data_dict[data_id] = preped_data
         _LOGGER.debug("{} Succ preprocess".format(op_info_prefix))
-        return preped_data_dict, err_channeldata_dict
+        return preped_data_dict, err_channeldata_dict, skip_process_dict
-    def _run_process(self, preped_data_dict, op_info_prefix):
+    def _run_process(self, preped_data_dict, op_info_prefix, skip_process_dict,
+                     logid_dict):
+        """
+        Run the process stage.
+        Args:
+            preped_data_dict: the data to be predicted by the model
+            op_info_prefix: prefix op info
+            skip_process_dict: skip the process stage or not
+            logid_dict: logid dict
+        Returns:
+            midped_data_dict: processed data, to be postprocessed
+            err_channeldata_dict: when exceptions occur, errors are put in it
+        """
         _LOGGER.debug("{} Running process".format(op_info_prefix))
         midped_data_dict = collections.OrderedDict()
         err_channeldata_dict = collections.OrderedDict()
-        if self.with_serving:
+        ### if (batch_num == 1 && skip == True), then skip the process stage.
+        is_skip_process = False
         data_ids = preped_data_dict.keys()
+        if len(data_ids) == 1 and skip_process_dict.get(data_ids[0]) == True:
+            is_skip_process = True
+            _LOGGER.info("(data_id={} log_id={}) skip process stage".format(
+                data_ids[0], logid_dict.get(data_ids[0])))
+
+        if self.with_serving is True and is_skip_process is False:
+            # use typical_logid to mark batch data
             typical_logid = data_ids[0]
             if len(data_ids) != 1:
                 for data_id in data_ids:
                     _LOGGER.info(
-                        "(logid={}) {} During access to PaddleServingService,"
-                        " we selected logid={} (from batch: {}) as a "
-                        "representative for logging.".format(
-                            data_id, op_info_prefix, typical_logid, data_ids))
+                        "(data_id={} logid={}) {} During access to PaddleServingService,"
+                        " we selected logid={} (from batch: {}) as a "
+                        "representative for logging.".format(
+                            data_id,
+                            logid_dict.get(data_id), op_info_prefix,
+                            typical_logid, data_ids))

             # combine samples to batch
             one_input = preped_data_dict[data_ids[0]]
@@ -449,64 +574,70 @@ class Op(object):
                     input_offset.append(offset)
             else:
                 _LOGGER.critical(
-                    "{} Failed to process: expect input type is dict(sample"
-                    " input) or list(batch input), but get {}".format(
-                        op_info_prefix, type(one_input)))
+                    "(data_id={} log_id={}) {} Failed to process: expect input type is dict(sample"
+                    " input) or list(batch input), but get {}".format(data_ids[
+                        0], typical_logid, op_info_prefix, type(one_input)))
                 os._exit(-1)

             midped_batch = None
-            ecode = ChannelDataEcode.OK.value
+            error_code = ChannelDataErrcode.OK.value
             if self._timeout <= 0:
                 try:
                     midped_batch = self.process(feed_batch, typical_logid)
                 except Exception as e:
-                    ecode = ChannelDataEcode.UNKNOW.value
-                    error_info = "(logid={}) {} Failed to process(batch: {}): {}".format(
-                        typical_logid, op_info_prefix, data_ids, e)
+                    error_code = ChannelDataErrcode.UNKNOW.value
+                    error_info = "(data_id={} log_id={}) {} Failed to process(batch: {}): {}".format(
+                        data_ids[0], typical_logid, op_info_prefix, data_ids, e)
                     _LOGGER.error(error_info, exc_info=True)
             else:
+                # retry N times, configured in the yaml files.
                 for i in range(self._retry):
                     try:
+                        # time out for each process call
                         midped_batch = func_timeout.func_timeout(
                             self._timeout,
                             self.process,
                             args=(feed_batch, typical_logid))
                     except func_timeout.FunctionTimedOut as e:
                         if i + 1 >= self._retry:
-                            ecode = ChannelDataEcode.TIMEOUT.value
-                            error_info = "(logid={}) {} Failed to process(batch: {}): " \
+                            error_code = ChannelDataErrcode.TIMEOUT.value
+                            error_info = "(log_id={}) {} Failed to process(batch: {}): " \
                                     "exceeded retry count.".format(
                                         typical_logid, op_info_prefix, data_ids)
                             _LOGGER.error(error_info)
                         else:
                             _LOGGER.warning(
-                                "(logid={}) {} Failed to process(batch: {}): timeout,"
+                                "(log_id={}) {} Failed to process(batch: {}): timeout,"
                                 " and retrying({}/{})...".format(
                                     typical_logid, op_info_prefix, data_ids, i +
                                     1, self._retry))
                     except Exception as e:
-                        ecode = ChannelDataEcode.UNKNOW.value
-                        error_info = "(logid={}) {} Failed to process(batch: {}): {}".format(
-                            typical_logid, op_info_prefix, data_ids, e)
+                        error_code = ChannelDataErrcode.UNKNOW.value
+                        error_info = "(log_id={}) {} Failed to process(batch: {}): {}".format(
+                            typical_logid, op_info_prefix, data_ids, e)
                         _LOGGER.error(error_info, exc_info=True)
                         break
                     else:
                         break
-            if ecode != ChannelDataEcode.OK.value:
+            if error_code != ChannelDataErrcode.OK.value:
                 for data_id in data_ids:
                     err_channeldata_dict[data_id] = ChannelData(
-                        ecode=ecode, error_info=error_info, data_id=data_id)
+                        error_code=error_code,
+                        error_info=error_info,
+                        data_id=data_id,
+                        log_id=logid_dict.get(data_id))
             elif midped_batch is None:
                 # op client return None
-                error_info = "(logid={}) {} Failed to predict, please check if " \
+                error_info = "(log_id={}) {} Failed to predict, please check if " \
                         "PaddleServingService is working properly.".format(
                             typical_logid, op_info_prefix)
                 _LOGGER.error(error_info)
                 for data_id in data_ids:
                     err_channeldata_dict[data_id] = ChannelData(
-                        ecode=ChannelDataEcode.CLIENT_ERROR.value,
+                        error_code=ChannelDataErrcode.CLIENT_ERROR.value,
                         error_info=error_info,
-                        data_id=data_id)
+                        data_id=data_id,
+                        log_id=logid_dict.get(data_id))
             else:
                 # transform np format to dict format
                 var_names = midped_batch.keys()
@@ -515,7 +646,7 @@ class Op(object):
                 for name in var_names:
                     lod_offset_name = "{}.lod".format(name)
                     if lod_offset_name in var_names:
-                        _LOGGER.debug("(logid={}) {} {} is LodTensor".format(
+                        _LOGGER.debug("(log_id={}) {} {} is LodTensor".format(
                             typical_logid, op_info_prefix, name))
                         lod_var_names.add(name)
                         lod_offset_names.add(lod_offset_name)
@@ -551,38 +682,67 @@ class Op(object):
         return midped_data_dict, err_channeldata_dict
     def _run_postprocess(self, parsed_data_dict, midped_data_dict,
-                         op_info_prefix):
+                         op_info_prefix, logid_dict):
+        """
+        Run the postprocess stage.
+        Args:
+            parsed_data_dict: data returned by the preprocess stage
+            midped_data_dict: data returned by the process stage
+            op_info_prefix: prefix op info
+            logid_dict: logid dict
+        Returns:
+            postped_data_dict: postprocessed data
+            err_channeldata_dict: when exceptions occur, errors are put in it
+        """
         _LOGGER.debug("{} Running postprocess".format(op_info_prefix))
         postped_data_dict = collections.OrderedDict()
         err_channeldata_dict = collections.OrderedDict()
         for data_id, midped_data in midped_data_dict.items():
+            log_id = logid_dict.get(data_id)
             postped_data, err_channeldata = None, None
+            prod_errcode, prod_errinfo = None, None
             try:
-                postped_data = self.postprocess(parsed_data_dict[data_id],
-                                                midped_data)
+                postped_data, prod_errcode, prod_errinfo = self.postprocess(
+                    parsed_data_dict[data_id], midped_data,
+                    logid_dict.get(data_id))
             except Exception as e:
-                error_info = "(logid={}) {} Failed to postprocess: {}".format(
-                    data_id, op_info_prefix, e)
+                error_info = "(data_id={} log_id={}) {} Failed to postprocess: {}".format(
+                    data_id, log_id, op_info_prefix, e)
                 _LOGGER.error(error_info, exc_info=True)
                 err_channeldata = ChannelData(
-                    ecode=ChannelDataEcode.UNKNOW.value,
+                    error_code=ChannelDataErrcode.UNKNOW.value,
                     error_info=error_info,
-                    data_id=data_id)
+                    data_id=data_id,
+                    log_id=log_id)
+
+            if prod_errcode is not None:
+                # product errors occurred
+                err_channeldata = ChannelData(
+                    error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
+                    error_info="",
+                    prod_error_code=prod_errcode,
+                    prod_error_info=prod_errinfo,
+                    data_id=data_id,
+                    log_id=log_id)

             if err_channeldata is not None:
                 err_channeldata_dict[data_id] = err_channeldata
                 continue
             else:
                 if not isinstance(postped_data, dict):
-                    error_info = "(logid={}) {} Failed to postprocess: " \
+                    error_info = "(data_id={} log_id={}) {} Failed to postprocess: " \
                             "output of postprocess function must be " \
                             "dict type, but get {}".format(
-                                data_id, op_info_prefix,
+                                data_id, log_id, op_info_prefix,
                                 type(postped_data))
                     _LOGGER.error(error_info)
                     err_channeldata = ChannelData(
-                        ecode=ChannelDataEcode.UNKNOW.value,
+                        error_code=ChannelDataErrcode.UNKNOW.value,
                         error_info=error_info,
-                        data_id=data_id)
+                        data_id=data_id,
+                        log_id=log_id)
                     err_channeldata_dict[data_id] = err_channeldata
                     continue
@@ -592,12 +752,14 @@ class Op(object):
                 output_data = ChannelData(
                     ChannelDataType.CHANNEL_NPDATA.value,
                     npdata=postped_data,
-                    data_id=data_id)
+                    data_id=data_id,
+                    log_id=log_id)
             else:
                 output_data = ChannelData(
                     ChannelDataType.DICT.value,
                     dictdata=postped_data,
-                    data_id=data_id)
+                    data_id=data_id,
+                    log_id=log_id)
             postped_data_dict[data_id] = output_data
         _LOGGER.debug("{} Succ postprocess".format(op_info_prefix))
         return postped_data_dict, err_channeldata_dict
@@ -633,24 +795,38 @@ class Op(object):
                 yield batch

     def _parse_channeldata_batch(self, batch, output_channels):
+        """
+        Parse a batch of channeldata.
+        Args:
+            batch: auto-batching batch data
+            output_channels: output channels
+        Returns:
+            parsed_data_dict: parsed from the channeldata in the batch
+            need_profile_dict: need-profile dict in the batch
+            profile_dict: profile info dict in the batch
+            logid_dict: traces each request in the batch
+        """
         parsed_data_dict = collections.OrderedDict()
         need_profile_dict = {}
         profile_dict = {}
+        logid_dict = {}
         for channeldata_dict in batch:
             (data_id, error_channeldata, parsed_data,
-             client_need_profile, profile_set) = \
+             client_need_profile, profile_set, log_id) = \
                 self._parse_channeldata(channeldata_dict)
             if error_channeldata is None:
                 parsed_data_dict[data_id] = parsed_data
                 need_profile_dict[data_id] = client_need_profile
                 profile_dict[data_id] = profile_set
+                logid_dict[data_id] = log_id
             else:
                 # error data in predecessor Op
                 # (error_channeldata with profile info)
                 self._push_to_output_channels(error_channeldata,
                                               output_channels)
-        return parsed_data_dict, need_profile_dict, profile_dict
+        return parsed_data_dict, need_profile_dict, profile_dict, logid_dict
     def _run(self, concurrency_idx, input_channel, output_channels, client_type,
              is_thread_op, trace_buffer):
@@ -664,7 +840,7 @@ class Op(object):
                 concurrency_idx)
         except Exception as e:
             _LOGGER.critical(
-                "{} Failed to init op: {}".format(op_info_prefix, e),
+                "{} failed to init op: {}".format(op_info_prefix, e),
                 exc_info=True)
             os._exit(-1)
         _LOGGER.info("{} Succ init".format(op_info_prefix))
@@ -691,7 +867,7 @@ class Op(object):

             # parse channeldata batch
             try:
-                parsed_data_dict, need_profile_dict, profile_dict \
+                parsed_data_dict, need_profile_dict, profile_dict, logid_dict \
                     = self._parse_channeldata_batch(
                         channeldata_dict_batch, output_channels)
             except ChannelStopError:
@@ -704,11 +880,12 @@ class Op(object):

             # preprocess
             start = profiler.record("prep#{}_0".format(op_info_prefix))
-            preped_data_dict, err_channeldata_dict \
-                    = self._run_preprocess(parsed_data_dict, op_info_prefix)
+            preped_data_dict, err_channeldata_dict, skip_process_dict \
+                    = self._run_preprocess(parsed_data_dict, op_info_prefix,
+                                           logid_dict)
             end = profiler.record("prep#{}_1".format(op_info_prefix))
             prep_time = end - start
             try:
+                # put error requests into the output channel, skipping the
+                # process and postprocess stages
                 for data_id, err_channeldata in err_channeldata_dict.items():
                     self._push_to_output_channels(
                         data=err_channeldata,
@@ -725,7 +902,7 @@ class Op(object):
             # process
             start = profiler.record("midp#{}_0".format(op_info_prefix))
             midped_data_dict, err_channeldata_dict \
-                    = self._run_process(preped_data_dict, op_info_prefix)
+                    = self._run_process(preped_data_dict, op_info_prefix,
+                                        skip_process_dict, logid_dict)
             end = profiler.record("midp#{}_1".format(op_info_prefix))
             midp_time = end - start
             try:
@@ -745,8 +922,7 @@ class Op(object):
             # postprocess
             start = profiler.record("postp#{}_0".format(op_info_prefix))
             postped_data_dict, err_channeldata_dict \
-                    = self._run_postprocess(
-                        parsed_data_dict, midped_data_dict, op_info_prefix)
+                    = self._run_postprocess(parsed_data_dict, midped_data_dict,
+                                            op_info_prefix, logid_dict)
             end = profiler.record("postp#{}_1".format(op_info_prefix))
             postp_time = end - start
             try:
@@ -856,17 +1032,37 @@ class RequestOp(Op):
             os._exit(-1)

     def unpack_request_package(self, request):
-        dictdata = {}
-        for idx, key in enumerate(request.key):
-            data = request.value[idx]
-            try:
-                evaled_data = eval(data)
-                if isinstance(evaled_data, np.ndarray):
-                    data = evaled_data
-            except Exception as e:
-                pass
-            dictdata[key] = data
-        return dictdata
+        """
+        Unpack the request package defined by gateway.proto.
+        Args:
+            request: HTTP body, JSON format
+        Returns:
+            dict_data: JSON fields in the HTTP body
+            log_id: log_id
+            prod_errcode: None or ProductErrCode.SUCC.value by default;
+                otherwise, a product error occurred. It is handled in the
+                same way as an exception.
+            prod_errinfo: "" by default
+        """
+        dict_data = {}
+        log_id = None
+        if request is None:
+            _LOGGER.critical("request is None")
+            raise ValueError("request is None")
+        _LOGGER.info("unpack_request_package request:{}".format(request))
+
+        dict_data["name"] = request.name
+        dict_data["method"] = request.method
+        dict_data["appid"] = request.appid
+        dict_data["format"] = request.format
+        dict_data["from"] = getattr(request, "from")
+        dict_data["cmdid"] = request.cmdid
+        dict_data["clientip"] = request.clientip
+        dict_data["data"] = request.data
+        log_id = request.logid
+        return dict_data, log_id, None, ""
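One detail worth noting in the body above: `from` is a Python keyword, so the proto field cannot be accessed as an attribute literal. Both the server code above and any client code have to go through `getattr`/`setattr`:

```python
# reading and writing the "from" field of a Request (value is illustrative)
caller = getattr(request, "from")
setattr(request, "from", "test-caller")
```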
 class ResponseOp(Op):
@@ -884,40 +1080,62 @@ class ResponseOp(Op):
             os._exit(-1)

     def pack_response_package(self, channeldata):
+        """
+        Get channeldata from the last channel, pack the custom results in
+        JSON format, and serialize them with protobuf.
+        """
         resp = pipeline_service_pb2.Response()
-        resp.ecode = channeldata.ecode
-        if resp.ecode == ChannelDataEcode.OK.value:
+        keys = []
+        values = []
+        error_code = channeldata.error_code
+        error_info = ""
+        if error_code == ChannelDataErrcode.OK.value:
             if channeldata.datatype == ChannelDataType.CHANNEL_NPDATA.value:
                 feed = channeldata.parse()
                 # ndarray to string:
                 # https://stackoverflow.com/questions/30167538/convert-a-numpy-ndarray-to-stringor-bytes-and-convert-it-back-to-numpy-ndarray
                 np.set_printoptions(threshold=sys.maxsize)
                 for name, var in feed.items():
-                    resp.value.append(var.__repr__())
-                    resp.key.append(name)
+                    values.append(var.__repr__())
+                    keys.append(name)
             elif channeldata.datatype == ChannelDataType.DICT.value:
                 feed = channeldata.parse()
                 for name, var in feed.items():
                     if not isinstance(var, str):
-                        resp.ecode = ChannelDataEcode.TYPE_ERROR.value
-                        resp.error_info = self._log(
+                        error_code = ChannelDataErrcode.TYPE_ERROR.value
+                        error_info = self._log(
                             "fetch var type must be str({}).".format(
                                 type(var)))
                         _LOGGER.error("(logid={}) Failed to pack RPC "
                                       "response package: {}".format(
-                                          channeldata.id, resp.error_info))
+                                          channeldata.id, error_info))
                         break
-                    resp.value.append(var)
-                    resp.key.append(name)
+                    values.append(var)
+                    keys.append(name)
             else:
-                resp.ecode = ChannelDataEcode.TYPE_ERROR.value
-                resp.error_info = self._log(
-                    "error type({}) in datatype.".format(channeldata.datatype))
+                error_code = ChannelDataErrcode.TYPE_ERROR.value
+                error_info = self._log("error type({}) in datatype.".format(
+                    channeldata.datatype))
                 _LOGGER.error("(logid={}) Failed to pack RPC response"
-                              " package: {}".format(channeldata.id,
-                                                    resp.error_info))
+                              " package: {}".format(channeldata.id, error_info))
         else:
-            resp.error_info = channeldata.error_info
+            error_info = channeldata.error_info
+            if error_code == ChannelDataErrcode.PRODUCT_ERROR.value:
+                # rewrite error_code when product errors occurred
+                error_code = channeldata.prod_error_code
+                error_info = channeldata.prod_error_info
+
+        # pack results
+        result = {}
+        result["keys"] = keys
+        result["values"] = values
+
+        if error_code is None:
+            error_code = 0
+        # 1. JSON-encode the results, then base64-encode them into Response.result
+        resp.err_no = error_code
+        resp.err_msg = error_info
+        resp.result = base64.b64encode(
+            json.dumps(result).encode("utf-8")).decode("ascii")
+
         return resp
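Since `pack_response_package` base64-encodes a JSON document into `Response.result`, a client decodes it by reversing the two steps. A minimal sketch (assumes `resp` is the `Response` returned by a predict call):

```python
import base64
import json

if resp.err_no == 0:
    result = json.loads(base64.b64decode(resp.result))
    fetch_map = dict(zip(result["keys"], result["values"]))
else:
    print("error {}: {}".format(resp.err_no, resp.err_msg))
```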
......
@@ -18,7 +18,8 @@ import numpy as np
 from numpy import *
 import logging
 import functools
-from .channel import ChannelDataEcode
+import json
+from .channel import ChannelDataErrcode
 from .proto import pipeline_service_pb2
 from .proto import pipeline_service_pb2_grpc
@@ -42,47 +43,33 @@ class PipelineClient(object):

     def _pack_request_package(self, feed_dict, profile):
         req = pipeline_service_pb2.Request()
+        """
         np.set_printoptions(threshold=sys.maxsize)
+        new_dict = {}
         for key, value in feed_dict.items():
-            req.key.append(key)
             if isinstance(value, np.ndarray):
-                req.value.append(value.__repr__())
+                new_dict[key] = value.__repr__()
             elif isinstance(value, (str, unicode)):
-                req.value.append(value)
+                new_dict[key] = value
             elif isinstance(value, list):
-                req.value.append(np.array(value).__repr__())
+                new_dict[key] = np.array(value).__repr__()
             else:
                 raise TypeError("only str and np.ndarray type is supported: {}".
                                 format(type(value)))
         if profile:
-            req.key.append(self._profile_key)
-            req.value.append(self._profile_value)
+            new_dict[self._profile_key] = self._profile_value
+        """
+        req.appid = feed_dict.get("appid")
+        req.logid = feed_dict.get("logid")
+        req.format = feed_dict.get("format")
+        setattr(req, "from", feed_dict.get("from"))
+        req.cmdid = feed_dict.get("cmdid")
+        req.clientip = feed_dict.get("clientip")
+        req.data = feed_dict.get("data")
         return req

     def _unpack_response_package(self, resp, fetch):
-        if resp.ecode != 0:
-            return {
-                "ecode": resp.ecode,
-                "ecode_desc": ChannelDataEcode(resp.ecode),
-                "error_info": resp.error_info,
-            }
-        fetch_map = {"ecode": resp.ecode}
-        for idx, key in enumerate(resp.key):
-            if key == self._profile_key:
-                if resp.value[idx] != "":
-                    sys.stderr.write(resp.value[idx])
-                continue
-            if fetch is not None and key not in fetch:
-                continue
-            data = resp.value[idx]
-            try:
-                evaled_data = eval(data)
-                if isinstance(evaled_data, np.ndarray):
-                    data = evaled_data
-            except Exception as e:
-                pass
-            fetch_map[key] = data
-        return fetch_map
+        return resp

     def predict(self, feed_dict, fetch=None, asyn=False, profile=False):
         if not isinstance(feed_dict, dict):
......
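Because `_pack_request_package` now reads fixed fields from `feed_dict` with `feed_dict.get(...)`, callers should supply every field it assigns (a missing key yields `None`, which cannot be assigned to a proto string/int64 field). A sketch of a conforming call, with the import path, endpoint, and values as assumptions:

```python
import json
from paddle_serving_server.pipeline import PipelineClient  # import path is an assumption

client = PipelineClient()
client.connect(["127.0.0.1:9998"])  # endpoint is an assumption
resp = client.predict(feed_dict={
    "appid": "demo",
    "logid": 10000,
    "format": "json",
    "from": "test",
    "cmdid": "0",
    "clientip": "127.0.0.1",
    "data": json.dumps({"x": [0.0137, -0.1136]}),
})
```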
@@ -42,10 +42,13 @@ class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
         _LOGGER.info("[PipelineServicer] succ init")

     def inference(self, request, context):
+        _LOGGER.info("inference request name:{} self.name:{}".format(
+            request.name, self._name))
         if request.name != "" and request.name != self._name:
             resp = pipeline_service_pb2.Response()
-            resp.ecode = channel.ChannelDataEcode.NO_SERVICE.value
-            resp.error_info = "Failed to inference: Service name error."
+            resp.err_no = channel.ChannelDataErrcode.NO_SERVICE.value
+            resp.err_msg = "Failed to inference: Service name error."
+            resp.result = ""
             return resp
         resp = self._dag_executor.call(request)
         return resp
@@ -192,7 +195,6 @@ class PipelineServer(object):
         bind_address = 'localhost:{}'.format(port)
         workers = []
         for i in range(self._worker_num):
-            show_info = (i == 0)
             worker = multiprocessing.Process(
                 target=self._run_server_func,
                 args=(bind_address, self._response_op, self._conf, i))
......
@@ -16,16 +16,21 @@ syntax = "proto2";
 package baidu.paddle_serving.pipeline_serving;

 message Request {
-  repeated string key = 1;
-  repeated string value = 2;
-  optional string name = 3;
+  optional string name = 1;
+  optional string method = 2;
+  optional string appid = 3;
+  optional int64 logid = 4;
+  optional string format = 5;
+  optional string from = 6;
+  optional string cmdid = 7;
+  optional string clientip = 8;
+  optional string data = 9;
 };

 message Response {
-  repeated string key = 1;
-  repeated string value = 2;
-  required int32 ecode = 3;
-  optional string error_info = 4;
+  optional int32 err_no = 1;
+  optional string err_msg = 2;
+  optional string result = 3;
 };

 service PipelineService {
......