From a70aa65b86faf1e3ff30834efb97da5ce80232e5 Mon Sep 17 00:00:00 2001 From: barriery Date: Mon, 3 Aug 2020 10:52:05 +0000 Subject: [PATCH] update log --- python/pipeline/__init__.py | 6 +- python/pipeline/analyse.py | 2 +- python/pipeline/channel.py | 186 ++++++++++++++----------- python/pipeline/dag.py | 119 +++++++++------- python/pipeline/operator.py | 209 ++++++++++++++++------------- python/pipeline/pipeline_server.py | 9 +- python/pipeline/profiler.py | 6 +- 7 files changed, 307 insertions(+), 230 deletions(-) diff --git a/python/pipeline/__init__.py b/python/pipeline/__init__.py index f720e4d2..913ee39f 100644 --- a/python/pipeline/__init__.py +++ b/python/pipeline/__init__.py @@ -11,7 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import logger # this module must be the first to import +from operator import Op, RequestOp, ResponseOp +from pipeline_server import PipelineServer +from pipeline_client import PipelineClient +from analyse import Analyst from operator import Op, RequestOp, ResponseOp from pipeline_server import PipelineServer from pipeline_client import PipelineClient diff --git a/python/pipeline/analyse.py b/python/pipeline/analyse.py index 65194889..b37484a3 100644 --- a/python/pipeline/analyse.py +++ b/python/pipeline/analyse.py @@ -164,7 +164,7 @@ class OpAnalyst(object): def add(self, name_str, ts_list): if self._close: - _LOGGER.error("OpAnalyst is closed.") + _LOGGER.error("Failed to add item: OpAnalyst is closed.") return op_name, curr_idx, step = self._parse(name_str) if op_name not in self.op_time_list_dict: diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py index a85d28dd..67deb530 100644 --- a/python/pipeline/channel.py +++ b/python/pipeline/channel.py @@ -70,7 +70,8 @@ class ChannelData(object): ''' if ecode is not None: if data_id is None or error_info is None: - _LOGGER.critical("data_id and error_info cannot be None") + _LOGGER.critical("Failed to generate ChannelData: data_id" + " and error_info cannot be None") os._exit(-1) datatype = ChannelDataType.ERROR.value else: @@ -78,14 +79,15 @@ class ChannelData(object): ecode, error_info = ChannelData.check_npdata(npdata) if ecode != ChannelDataEcode.OK.value: datatype = ChannelDataType.ERROR.value - _LOGGER.error(error_info) + _LOGGER.error("(logid={}) {}".format(data_id, error_info)) elif datatype == ChannelDataType.DICT.value: ecode, error_info = ChannelData.check_dictdata(dictdata) if ecode != ChannelDataEcode.OK.value: datatype = ChannelDataType.ERROR.value - _LOGGER.error(error_info) + _LOGGER.error("(logid={}) {}".format(data_id, error_info)) else: - _LOGGER.critical("datatype not match") + _LOGGER.critical("(logid={}) datatype not match".format( + data_id)) os._exit(-1) self.datatype = datatype self.npdata = npdata @@ -110,14 +112,14 @@ class ChannelData(object): for sample in dictdata: if not isinstance(sample, dict): ecode = ChannelDataEcode.TYPE_ERROR.value - error_info = "the value of data must " \ - "be dict, but get {}.".format(type(sample)) + error_info = "Failed to check data: the type of " \ + "data must be dict, but get {}.".format(type(sample)) break elif not isinstance(dictdata, dict): # batch size = 1 ecode = ChannelDataEcode.TYPE_ERROR.value - error_info = "the value of data must " \ - "be dict, but get {}.".format(type(dictdata)) + error_info = "Failed to check data: the type of data must " \ + "be dict, but get {}.".format(type(dictdata)) return ecode, error_info @staticmethod @@ -139,27 +141,30 @@ class ChannelData(object): for sample in npdata: if not isinstance(sample, dict): ecode = ChannelDataEcode.TYPE_ERROR.value - error_info = "the value of data must " \ - "be dict, but get {}.".format(type(sample)) + error_info = "Failed to check data: the " \ + "value of data must be dict, but get {}.".format( + type(sample)) break for _, value in sample.items(): if not isinstance(value, np.ndarray): ecode = ChannelDataEcode.TYPE_ERROR.value - error_info = "the value of data must " \ - "be np.ndarray, but get {}.".format(type(value)) + error_info = "Failed to check data: the" \ + " value of data must be np.ndarray, but get {}.".format( + type(value)) return ecode, error_info elif isinstance(npdata, dict): # batch_size = 1 for _, value in npdata.items(): if not isinstance(value, np.ndarray): ecode = ChannelDataEcode.TYPE_ERROR.value - error_info = "the value of data must " \ - "be np.ndarray, but get {}.".format(type(value)) + error_info = "Failed to check data: the value " \ + "of data must be np.ndarray, but get {}.".format( + type(value)) break else: ecode = ChannelDataEcode.TYPE_ERROR.value - error_info = "the value of data must " \ - "be dict, but get {}.".format(type(npdata)) + error_info = "Failed to check data: the value of data " \ + "must be dict, but get {}.".format(type(npdata)) return ecode, error_info def parse(self): @@ -171,8 +176,8 @@ class ChannelData(object): # return dict feed = self.dictdata else: - _LOGGER.critical("Error type({}) in datatype.".format( - self.datatype)) + _LOGGER.critical("Failed to parse channeldata: error " \ + "type({}) in datatype.".format(self.datatype)) os._exit(-1) return feed @@ -247,33 +252,36 @@ class ProcessChannel(object): """ not thread safe, and can only be called during initialization. """ if op_name in self._producers: _LOGGER.critical( - self._log("producer({}) is already in channel".format(op_name))) + self._log("Failed to add producer: producer({})" \ + " is already in channel".format(op_name))) os._exit(-1) self._producers.append(op_name) - _LOGGER.debug(self._log("add a producer: {}".format(op_name))) + _LOGGER.debug(self._log("Succ add a producer: {}".format(op_name))) def add_consumer(self, op_name): """ not thread safe, and can only be called during initialization. """ if op_name in self._consumer_cursors: _LOGGER.critical( - self._log("consumer({}) is already in channel".format(op_name))) + self._log("Failed to add consumer: consumer({})" \ + " is already in channel".format(op_name))) os._exit(-1) self._consumer_cursors[op_name] = 0 if self._cursor_count.get(0) is None: self._cursor_count[0] = 0 self._cursor_count[0] += 1 - _LOGGER.debug(self._log("add a consumer: {}".format(op_name))) + _LOGGER.debug(self._log("Succ add a consumer: {}".format(op_name))) def push(self, channeldata, op_name=None): _LOGGER.debug( - self._log("{} try to push data[{}]".format(op_name, - channeldata.id))) + self._log("(logid={}) Op({}) Pushing data".format(channeldata.id, + op_name))) if len(self._producers) == 0: _LOGGER.critical( self._log( - "expected number of producers to be greater than 0, but the it is 0." - )) + "(logid={}) Op({}) Failed to push data: expected number" + " of producers to be greater than 0, but the it is 0.". + format(channeldata.id, op_name))) os._exit(-1) elif len(self._producers) == 1: with self._cv: @@ -287,13 +295,15 @@ class ProcessChannel(object): raise ChannelStopError() self._cv.notify_all() _LOGGER.debug( - self._log("{} succ push data[{}] into internal queue.".format( - op_name, channeldata.id))) + self._log("(logid={}) Op({}) Pushed data into internal queue.". + format(channeldata.id, op_name))) return True elif op_name is None: _LOGGER.critical( self._log( - "There are multiple producers, so op_name cannot be None.")) + "(logid={}) Op({}) Failed to push data: there are multiple " + "producers, so op_name cannot be None.".format( + channeldata.id, op_name))) os._exit(-1) producer_num = len(self._producers) @@ -321,8 +331,9 @@ class ProcessChannel(object): if put_data is None: _LOGGER.debug( - self._log("{} succ push data[{}] into input_buffer.".format( - op_name, data_id))) + self._log( + "(logid={}) Op({}) Pushed data into input_buffer.". + format(data_id, op_name))) else: while self._stop.value == 0: try: @@ -334,15 +345,16 @@ class ProcessChannel(object): raise ChannelStopError() _LOGGER.debug( - self._log("{} succ push data[{}] into internal queue.". - format(op_name, data_id))) + self._log( + "(logid={}) Op({}) Pushed data into internal_queue.". + format(data_id, op_name))) self._cv.notify_all() return True def front(self, op_name=None, timeout=None): _LOGGER.debug( - self._log("{} try to get data[?]; timeout(s)={}".format(op_name, - timeout))) + self._log("Op({}) Getting data[?]; timeout(s)={}".format(op_name, + timeout))) endtime = None if timeout is not None: if timeout <= 0: @@ -353,8 +365,8 @@ class ProcessChannel(object): if len(self._consumer_cursors) == 0: _LOGGER.critical( self._log( - "expected number of consumers to be greater than 0, but the it is 0." - )) + "Op({}) Failed to get data: expected number of consumers to be " \ + "greater than 0, but the it is 0.".format(op_name))) os._exit(-1) elif len(self._consumer_cursors) == 1: resp = None @@ -368,8 +380,8 @@ class ProcessChannel(object): remaining = endtime - _time() if remaining <= 0.0: _LOGGER.debug( - self._log("{} get data[?] timeout".format( - op_name))) + self._log("Op({}) Failed to get data: " + "timeout".format(op_name))) raise ChannelTimeoutError() self._cv.wait(remaining) else: @@ -377,13 +389,14 @@ class ProcessChannel(object): if self._stop.value == 1: raise ChannelStopError() _LOGGER.debug( - self._log("{} succ get data[{}]".format(op_name, - resp.values()[0].id))) + self._log("(logid={}) Op({}) Got data".format(resp.values()[0] + .id, op_name))) return resp elif op_name is None: _LOGGER.critical( self._log( - "There are multiple consumers, so op_name cannot be None.")) + "Op({}) Failed to get data: there are multiple consumers, " + "so op_name cannot be None.".format(op_name))) os._exit(-1) # In output_buf, different Ops (according to op_name) have different @@ -405,16 +418,17 @@ class ProcessChannel(object): channeldata = self._que.get(timeout=0) self._output_buf.append(channeldata) _LOGGER.debug( - self._log("pop ready item[{}] into output_buffer". - format(channeldata.values()[0].id))) + self._log( + "(logid={}) Op({}) Pop ready item into output_buffer". + format(channeldata.values()[0].id, op_name))) break except Queue.Empty: if timeout is not None: remaining = endtime - _time() if remaining <= 0.0: _LOGGER.debug( - self._log("{} get data[?] timeout".format( - op_name))) + self._log("Op({}) Failed to get data: timeout". + format(op_name))) raise ChannelTimeoutError() self._cv.wait(remaining) else: @@ -437,7 +451,7 @@ class ProcessChannel(object): self._base_cursor.value += 1 # to avoid cursor overflow if self._base_cursor.value >= self._reset_max_cursor: - _LOGGER.info(self._log("reset cursor in Channel")) + _LOGGER.info(self._log("Reset cursor in Channel")) self._base_cursor.value -= self._reset_max_cursor for name in self._consumer_cursors.keys(): self._consumer_cursors[name] -= self._reset_max_cursor @@ -458,8 +472,8 @@ class ProcessChannel(object): self._cv.notify_all() _LOGGER.debug( - self._log("{} succ get data[{}] from output_buffer".format( - op_name, resp.values()[0].id))) + self._log("(logid={}) Op({}) Got data from output_buffer".format( + resp.values()[0].id, op_name))) return resp def stop(self): @@ -529,33 +543,36 @@ class ThreadChannel(Queue.Queue): """ not thread safe, and can only be called during initialization. """ if op_name in self._producers: _LOGGER.critical( - self._log("producer({}) is already in channel".format(op_name))) + self._log("Failed to add producer: producer({}) is " + "already in channel".format(op_name))) os._exit(-1) self._producers.append(op_name) - _LOGGER.debug(self._log("add a producer: {}".format(op_name))) + _LOGGER.debug(self._log("Succ add a producer: {}".format(op_name))) def add_consumer(self, op_name): """ not thread safe, and can only be called during initialization. """ if op_name in self._consumer_cursors: _LOGGER.critical( - self._log("consumer({}) is already in channel".format(op_name))) + self._log("Failed to add consumer: consumer({}) is " + "already in channel".format(op_name))) os._exit(-1) self._consumer_cursors[op_name] = 0 if self._cursor_count.get(0) is None: self._cursor_count[0] = 0 self._cursor_count[0] += 1 - _LOGGER.debug(self._log("add a consumer: {}".format(op_name))) + _LOGGER.debug(self._log("Succ add a consumer: {}".format(op_name))) def push(self, channeldata, op_name=None): _LOGGER.debug( - self._log("{} try to push data[{}]".format(op_name, - channeldata.id))) + self._log("(logid={}) Op({}) Pushing data".format(channeldata.id, + op_name))) if len(self._producers) == 0: _LOGGER.critical( self._log( - "expected number of producers to be greater than 0, but the it is 0." - )) + "(logid={}) Op({}) Failed to push data: expected number of " + "producers to be greater than 0, but the it is 0.".format( + channeldata.id, op_name))) os._exit(-1) elif len(self._producers) == 1: with self._cv: @@ -569,13 +586,15 @@ class ThreadChannel(Queue.Queue): raise ChannelStopError() self._cv.notify_all() _LOGGER.debug( - self._log("{} succ push data[{}] into internal queue.".format( - op_name, channeldata.id))) + self._log("(logid={}) Op({}) Pushed data into internal_queue.". + format(channeldata.id, op_name))) return True elif op_name is None: _LOGGER.critical( self._log( - "There are multiple producers, so op_name cannot be None.")) + "(logid={}) Op({}) Failed to push data: there are multiple" + " producers, so op_name cannot be None.".format( + channeldata.id, op_name))) os._exit(-1) producer_num = len(self._producers) @@ -598,8 +617,9 @@ class ThreadChannel(Queue.Queue): if put_data is None: _LOGGER.debug( - self._log("{} succ push data[{}] into input_buffer.".format( - op_name, data_id))) + self._log( + "(logid={}) Op({}) Pushed data into input_buffer.". + format(data_id, op_name))) else: while self._stop is False: try: @@ -611,15 +631,16 @@ class ThreadChannel(Queue.Queue): raise ChannelStopError() _LOGGER.debug( - self._log("{} succ push data[{}] into internal queue.". - format(op_name, data_id))) + self._log( + "(logid={}) Op({}) Pushed data into internal_queue.". + format(data_id, op_name))) self._cv.notify_all() return True def front(self, op_name=None, timeout=None): _LOGGER.debug( - self._log("{} try to get data[?]; timeout(s)={}".format(op_name, - timeout))) + self._log("Op({}) Getting data[?]; timeout(s)={}".format(op_name, + timeout))) endtime = None if timeout is not None: if timeout <= 0: @@ -630,8 +651,8 @@ class ThreadChannel(Queue.Queue): if len(self._consumer_cursors) == 0: _LOGGER.critical( self._log( - "expected number of consumers to be greater than 0, but the it is 0." - )) + "Op({}) Failed to get data: expected number of consumers to be " + "greater than 0, but the it is 0.".format(op_name))) os._exit(-1) elif len(self._consumer_cursors) == 1: resp = None @@ -645,8 +666,9 @@ class ThreadChannel(Queue.Queue): remaining = endtime - _time() if remaining <= 0.0: _LOGGER.debug( - self._log("{} get data[?] timeout".format( - op_name))) + self._log( + "Op({}) Failed to get data: timeout". + format(op_name))) raise ChannelTimeoutError() self._cv.wait(remaining) else: @@ -654,13 +676,14 @@ class ThreadChannel(Queue.Queue): if self._stop: raise ChannelStopError() _LOGGER.debug( - self._log("{} succ get data[{}]".format(op_name, - resp.values()[0].id))) + self._log("(logid={}) Op({}) Got data".format(resp.values()[0] + .id, op_name))) return resp elif op_name is None: _LOGGER.critical( - self._log( - "There are multiple consumers, so op_name cannot be None.")) + self._log("Op({}) Failed to get data: there are multiple " + "consumers, so op_name cannot be None.".format( + op_name))) os._exit(-1) # In output_buf, different Ops (according to op_name) have different @@ -682,16 +705,17 @@ class ThreadChannel(Queue.Queue): channeldata = self.get(timeout=0) self._output_buf.append(channeldata) _LOGGER.debug( - self._log("pop ready item[{}] into output_buffer". - format(channeldata.values()[0].id))) + self._log( + "(logid={}) Op({}) Pop ready item into output_buffer". + format(channeldata.values()[0].id, op_name))) break except Queue.Empty: if timeout is not None: remaining = endtime - _time() if remaining <= 0.0: _LOGGER.debug( - self._log("{} get data[?] timeout".format( - op_name))) + self._log("Op({}) Failed to get data: timeout". + format(op_name))) raise ChannelTimeoutError() self._cv.wait(remaining) else: @@ -715,7 +739,7 @@ class ThreadChannel(Queue.Queue): self._base_cursor += 1 # to avoid cursor overflow if self._base_cursor >= self._reset_max_cursor: - _LOGGER.info(self._log("reset cursor in Channel")) + _LOGGER.info(self._log("Reset cursor in Channel")) self._base_cursor -= self._reset_max_cursor for name in self._consumer_cursors: self._consumer_cursors[name] -= self._reset_max_cursor @@ -735,8 +759,8 @@ class ThreadChannel(Queue.Queue): self._cv.notify_all() _LOGGER.debug( - self._log("{} succ get data[{}] from output_buffer".format( - op_name, resp.values()[0].id))) + self._log("(logid={}) Op({}) Got data from output_buffer".format( + resp.values()[0].id, op_name))) return resp def stop(self): diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py index 73b39a19..87aa7dfe 100644 --- a/python/pipeline/dag.py +++ b/python/pipeline/dag.py @@ -30,6 +30,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelData, ChannelDataEcode, ChannelDataType, ChannelStopError) from .profiler import TimeProfiler from .util import NameGenerator +from .proto import pipeline_service_pb2 _LOGGER = logging.getLogger() @@ -74,17 +75,18 @@ class DAGExecutor(object): self._recive_func = threading.Thread( target=DAGExecutor._recive_out_channel_func, args=(self, )) self._recive_func.start() - _LOGGER.debug("[DAG Executor] start recive thread") + _LOGGER.debug("[DAG Executor] Start recive thread") def stop(self): self._dag.stop() self._dag.join() - _LOGGER.info("[DAG Executor] succ stop") + _LOGGER.info("[DAG Executor] Stop") def _get_next_data_id(self): data_id = None with self._id_lock: if self._id_counter >= self._reset_max_id: + _LOGGER.info("[DAG Executor] Reset request id") self._id_counter -= self._reset_max_id data_id = self._id_counter self._id_counter += 1 @@ -96,16 +98,18 @@ class DAGExecutor(object): def _set_in_channel(self, in_channel): if not isinstance(in_channel, (ThreadChannel, ProcessChannel)): - _LOGGER.critical("[DAG Executor] in_channel must be Channel" - " type, but get {}".format(type(in_channel))) + _LOGGER.critical("[DAG Executor] Failed to set in_channel: " + "in_channel must be Channel type, but get {}". + format(type(in_channel))) os._exit(-1) in_channel.add_producer(self.name) self._in_channel = in_channel def _set_out_channel(self, out_channel): if not isinstance(out_channel, (ThreadChannel, ProcessChannel)): - _LOGGER.critical("[DAG Executor]iout_channel must be Channel" - " type, but get {}".format(type(out_channel))) + _LOGGER.critical("[DAG Executor] Failed to set out_channel: " + "must be Channel type, but get {}".format( + type(out_channel))) os._exit(-1) out_channel.add_consumer(self.name) self._out_channel = out_channel @@ -116,7 +120,7 @@ class DAGExecutor(object): try: channeldata_dict = self._out_channel.front(self.name) except ChannelStopError: - _LOGGER.info("[DAG Executor] channel stop.") + _LOGGER.info("[DAG Executor] Stop.") with self._cv_for_cv_pool: for data_id, cv in self._cv_pool.items(): closed_errror_data = ChannelData( @@ -130,17 +134,20 @@ class DAGExecutor(object): if len(channeldata_dict) != 1: _LOGGER.critical( - "[DAG Executor] out_channel cannot have multiple input ops") + "[DAG Executor] Failed to fetch result: out_channel " + "cannot have multiple input ops") os._exit(-1) (_, channeldata), = channeldata_dict.items() if not isinstance(channeldata, ChannelData): _LOGGER.critical( - '[DAG Executor] data must be ChannelData type, but get {}' + '[DAG Executor] Failed to fetch result: data in out_channel" \ + " must be ChannelData type, but get {}' .format(type(channeldata))) os._exit(-1) data_id = channeldata.id - _LOGGER.debug("recive thread fetch data[{}]".format(data_id)) + _LOGGER.debug("(logid={}) [recive thread] Fetched data".format( + data_id)) with self._cv_for_cv_pool: cond_v = self._cv_pool[data_id] with cond_v: @@ -164,7 +171,7 @@ class DAGExecutor(object): ready_data = self._fetch_buffer[data_id] self._cv_pool.pop(data_id) self._fetch_buffer.pop(data_id) - _LOGGER.debug("resp thread get resp data[{}]".format(data_id)) + _LOGGER.debug("(logid={}) [resp thread] Got data".format(data_id)) return ready_data def _pack_channeldata(self, rpc_request, data_id): @@ -172,8 +179,10 @@ class DAGExecutor(object): try: dictdata = self._unpack_rpc_func(rpc_request) except Exception as e: - _LOGGER.error("parse RPC package to data[{}] Error: {}" - .format(data_id, e)) + _LOGGER.error( + "(logid={}) Failed to parse RPC request package: {}" + .format(data_id, e), + exc_info=True) return ChannelData( ecode=ChannelDataEcode.RPC_PACKAGE_ERROR.value, error_info="rpc package error: {}".format(e), @@ -187,7 +196,7 @@ class DAGExecutor(object): profile_value = rpc_request.value[idx] break client_need_profile = (profile_value == self._client_profile_value) - _LOGGER.debug("request[{}] need profile: {}".format( + _LOGGER.debug("(logid={}) Need profile in client: {}".format( data_id, client_need_profile)) return ChannelData( datatype=ChannelDataType.DICT.value, @@ -197,26 +206,28 @@ class DAGExecutor(object): def call(self, rpc_request): data_id, cond_v = self._get_next_data_id() - _LOGGER.debug("generate Request id: {}".format(data_id)) + _LOGGER.info("(logid={}) Succ generate id".format(data_id)) + start_call, end_call = None, None if not self._is_thread_op: - self._profiler.record("call_{}#DAG-{}_0".format(data_id, data_id)) + start_call = self._profiler.record("call_{}#DAG-{}_0".format( + data_id, data_id)) else: - self._profiler.record("call_{}#DAG_0".format(data_id)) + start_call = self._profiler.record("call_{}#DAG_0".format(data_id)) - _LOGGER.debug("try parse RPC request to channeldata[{}]".format( - data_id)) + _LOGGER.debug("(logid={}) Parsing RPC request package".format(data_id)) self._profiler.record("prepack_{}#{}_0".format(data_id, self.name)) req_channeldata = self._pack_channeldata(rpc_request, data_id) self._profiler.record("prepack_{}#{}_1".format(data_id, self.name)) resp_channeldata = None for i in range(self._retry): - _LOGGER.debug("push data[{}] into Graph engine".format(data_id)) + _LOGGER.debug("(logid={}) Pushing data into Graph engine".format( + data_id)) try: self._in_channel.push(req_channeldata, self.name) except ChannelStopError: - _LOGGER.debug("[DAG Executor] channel stop.") + _LOGGER.debug("[DAG Executor] Stop") with self._cv_for_cv_pool: self._cv_pool.pop(data_id) return self._pack_for_rpc_resp( @@ -225,32 +236,35 @@ class DAGExecutor(object): error_info="dag closed.", data_id=data_id)) - _LOGGER.debug("wait Graph engine for data[{}]...".format(data_id)) + _LOGGER.debug("(logid={}) Wait for Graph engine...".format(data_id)) resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id, cond_v) if resp_channeldata.ecode == ChannelDataEcode.OK.value: - _LOGGER.debug("request[{}] succ predict".format(data_id)) + _LOGGER.debug("(logid={}) Succ predict".format(data_id)) break else: - _LOGGER.warning("request[{}] predict failed: {}" - .format(data_id, resp_channeldata.error_info)) + _LOGGER.error("(logid={}) Failed to predict: {}" + .format(data_id, resp_channeldata.error_info)) if resp_channeldata.ecode != ChannelDataEcode.TIMEOUT.value: break if i + 1 < self._retry: - _LOGGER.warning("retry({}/{}) data[{}]".format( - i + 1, self._retry, data_id)) + _LOGGER.warning("(logid={}) DAGExecutor retry({}/{})".format( + data_id, i + 1, self._retry)) - _LOGGER.debug("unpack channeldata[{}] into RPC response".format( - data_id)) + _LOGGER.debug("(logid={}) Packing RPC response package".format(data_id)) self._profiler.record("postpack_{}#{}_0".format(data_id, self.name)) rpc_resp = self._pack_for_rpc_resp(resp_channeldata) self._profiler.record("postpack_{}#{}_1".format(data_id, self.name)) if not self._is_thread_op: - self._profiler.record("call_{}#DAG-{}_1".format(data_id, data_id)) + end_call = self._profiler.record("call_{}#DAG-{}_1".format(data_id, + data_id)) else: - self._profiler.record("call_{}#DAG_1".format(data_id)) + end_call = self._profiler.record("call_{}#DAG_1".format(data_id)) + _LOGGER.log(level=1, + msg="(logid={}) call[{} ms]".format( + data_id, (end_call - start_call) / 1e3)) profile_str = self._profiler.gen_profile_str() if self._server_use_profile: @@ -268,7 +282,17 @@ class DAGExecutor(object): return rpc_resp def _pack_for_rpc_resp(self, channeldata): - return self._pack_rpc_func(channeldata) + try: + return self._pack_rpc_func(channeldata) + except Exception as e: + _LOGGER.error( + "(logid={}) Failed to pack RPC response package: {}" + .format(channeldata.id, e), + exc_info=True) + resp = pipeline_service_pb2.Response() + resp.ecode = ChannelDataEcode.RPC_PACKAGE_ERROR.value + resp.error_info = "rpc package error: {}".format(e) + return resp class DAG(object): @@ -283,7 +307,7 @@ class DAG(object): self._build_dag_each_worker = build_dag_each_worker if not self._is_thread_op: self._manager = multiprocessing.Manager() - _LOGGER.info("[DAG] succ init") + _LOGGER.info("[DAG] Succ init") def get_use_ops(self, response_op): unique_names = set() @@ -303,7 +327,8 @@ class DAG(object): used_ops.add(pred_op) # check the name of op is globally unique if pred_op.name in unique_names: - _LOGGER.critical("the name of Op must be unique: {}". + _LOGGER.critical("Failed to get used Ops: the" + " name of Op must be unique: {}". format(pred_op.name)) os._exit(-1) unique_names.add(pred_op.name) @@ -317,12 +342,12 @@ class DAG(object): else: channel = ProcessChannel( self._manager, name=name_gen.next(), maxsize=self._channel_size) - _LOGGER.debug("[DAG] gen Channel: {}".format(channel.name)) + _LOGGER.debug("[DAG] Generate channel: {}".format(channel.name)) return channel def _gen_virtual_op(self, name_gen): vir_op = VirtualOp(name=name_gen.next()) - _LOGGER.debug("[DAG] gen VirtualOp: {}".format(vir_op.name)) + _LOGGER.debug("[DAG] Generate virtual_op: {}".format(vir_op.name)) return vir_op def _topo_sort(self, used_ops, response_op, out_degree_ops): @@ -337,7 +362,8 @@ class DAG(object): if len(op.get_input_ops()) == 0: zero_indegree_num += 1 if zero_indegree_num != 1: - _LOGGER.critical("DAG contains multiple RequestOps") + _LOGGER.critical("Failed to topo sort: DAG contains " + "multiple RequestOps") os._exit(-1) last_op = response_op.get_input_ops()[0] ques[que_idx].put(last_op) @@ -362,14 +388,15 @@ class DAG(object): break que_idx = (que_idx + 1) % 2 if sorted_op_num < len(used_ops): - _LOGGER.critical("not legal DAG") + _LOGGER.critical("Failed to topo sort: not legal DAG") os._exit(-1) return dag_views, last_op def _build_dag(self, response_op): if response_op is None: - _LOGGER.critical("ResponseOp has not been set.") + _LOGGER.critical("Failed to build DAG: ResponseOp" + " has not been set.") os._exit(-1) used_ops, out_degree_ops = self.get_use_ops(response_op) if not self._build_dag_each_worker: @@ -380,8 +407,8 @@ class DAG(object): _LOGGER.info("-------------------------------------------") if len(used_ops) <= 1: _LOGGER.critical( - "Besides RequestOp and ResponseOp, there should be at least one Op in DAG." - ) + "Failed to build DAG: besides RequestOp and ResponseOp, " + "there should be at least one Op in DAG.") os._exit(-1) if self._build_dag_each_worker: _LOGGER.info("Because `build_dag_each_worker` mode is used, " @@ -443,8 +470,6 @@ class DAG(object): continue channel = self._gen_channel(channel_name_gen) channels.append(channel) - _LOGGER.debug("[DAG] Channel({}) => Op({})" - .format(channel.name, op.name)) op.add_input_channel(channel) pred_ops = pred_op_of_next_view_op[op.name] if v_idx == 0: @@ -452,8 +477,6 @@ class DAG(object): else: # if pred_op is virtual op, it will use ancestors as producers to channel for pred_op in pred_ops: - _LOGGER.debug("[DAG] Op({}) => Channel({})" - .format(pred_op.name, channel.name)) pred_op.add_output_channel(channel) processed_op.add(op.name) # find same input op to combine channel @@ -469,8 +492,6 @@ class DAG(object): same_flag = False break if same_flag: - _LOGGER.debug("[DAG] Channel({}) => Op({})" - .format(channel.name, other_op.name)) other_op.add_input_channel(channel) processed_op.add(other_op.name) output_channel = self._gen_channel(channel_name_gen) @@ -488,7 +509,7 @@ class DAG(object): actual_ops.append(op) for c in channels: - _LOGGER.debug("Channel({}):\n\t-producers: {}\n\t-consumers: {}" + _LOGGER.debug("Channel({}):\n\t- producers: {}\n\t- consumers: {}" .format(c.name, c.get_producers(), c.get_consumers())) return (actual_ops, channels, input_channel, output_channel, pack_func, @@ -497,7 +518,7 @@ class DAG(object): def build(self): (actual_ops, channels, input_channel, output_channel, pack_func, unpack_func) = self._build_dag(self._response_op) - _LOGGER.info("[DAG] succ build dag") + _LOGGER.info("[DAG] Succ build DAG") self._actual_ops = actual_ops self._channels = channels diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index 0e9c77e2..287893f9 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -73,8 +73,9 @@ class Op(object): if self._auto_batching_timeout is not None: if self._auto_batching_timeout <= 0 or self._batch_size == 1: _LOGGER.warning( - "Because auto_batching_timeout <= 0 or batch_size == 1," - " set auto_batching_timeout to None.") + self._log( + "Because auto_batching_timeout <= 0 or batch_size == 1," + " set auto_batching_timeout to None.")) self._auto_batching_timeout = None else: self._auto_batching_timeout = self._auto_batching_timeout / 1000.0 @@ -120,7 +121,8 @@ class Op(object): def init_client(self, client_type, client_config, server_endpoints, fetch_names): if self.with_serving == False: - _LOGGER.info("Op({}) no client".format(self.name)) + _LOGGER.info("Op({}) has no client (and it also do not " + "run the process function".format(self.name)) return None if client_type == 'brpc': client = Client() @@ -128,7 +130,8 @@ class Op(object): elif client_type == 'grpc': client = MultiLangClient() else: - raise ValueError("unknow client type: {}".format(client_type)) + raise ValueError("Failed to init client: unknow client " + "type {}".format(client_type)) client.connect(server_endpoints) self._fetch_names = fetch_names return client @@ -143,16 +146,17 @@ class Op(object): for op in ops: if not isinstance(op, Op): _LOGGER.critical( - self._log("input op must be Op type, not {}" - .format(type(op)))) + self._log("Failed to set input_ops: input op " + "must be Op type, not {}".format(type(op)))) os._exit(-1) self._input_ops.append(op) def add_input_channel(self, channel): if not isinstance(channel, (ThreadChannel, ProcessChannel)): _LOGGER.critical( - self._log("input channel must be Channel type, not {}" - .format(type(channel)))) + self._log("Failed to set input_channel: input " + "channel must be Channel type, not {}".format( + type(channel)))) os._exit(-1) channel.add_consumer(self.name) self._input = channel @@ -166,8 +170,8 @@ class Op(object): def add_output_channel(self, channel): if not isinstance(channel, (ThreadChannel, ProcessChannel)): _LOGGER.critical( - self._log("output channel must be Channel type, not {}" - .format(type(channel)))) + self._log("Failed to add output_channel: output channel " + "must be Channel type, not {}".format(type(channel)))) os._exit(-1) channel.add_producer(self.name) self._outputs.append(channel) @@ -183,8 +187,8 @@ class Op(object): if len(input_dicts) != 1: _LOGGER.critical( self._log( - "this Op has multiple previous inputs. Please override this func." - )) + "Failed to run preprocess: this Op has multiple previous " + "inputs. Please override this func.")) os._exit(-1) (_, input_dict), = input_dicts.items() @@ -194,8 +198,8 @@ class Op(object): err, err_info = ChannelData.check_batch_npdata(feed_batch) if err != 0: _LOGGER.critical( - self._log("{}, Please override preprocess func.".format( - err_info))) + self._log("Failed to run process: {}. Please override " + "preprocess func.".format(err_info))) os._exit(-1) call_result = self.client.predict( feed=feed_batch, fetch=self._fetch_names) @@ -274,8 +278,8 @@ class Op(object): def init_op(self): pass - def _run_preprocess(self, parsed_data_dict, log_func): - _LOGGER.debug(log_func("try to run preprocess")) + def _run_preprocess(self, parsed_data_dict, op_info_prefix): + _LOGGER.debug("{} Running preprocess".format(op_info_prefix)) preped_data_dict = {} err_channeldata_dict = {} for data_id, parsed_data in parsed_data_dict.items(): @@ -284,17 +288,17 @@ class Op(object): preped_data = self.preprocess(parsed_data) except TypeError as e: # Error type in channeldata.datatype - error_info = log_func("preprocess data[{}] failed: {}" - .format(data_id, e)) - _LOGGER.error(error_info) + error_info = "(logid={}) {} Failed to preprocess: {}".format( + data_id, op_info_prefix, e) + _LOGGER.error(error_info, exc_info=True) error_channeldata = ChannelData( ecode=ChannelDataEcode.TYPE_ERROR.value, error_info=error_info, data_id=data_id) except Exception as e: - error_info = log_func("preprocess data[{}] failed: {}" - .format(data_id, e)) - _LOGGER.error(error_info) + error_info = "(logid={}) {} Failed to preprocess: {}".format( + data_id, op_info_prefix, e) + _LOGGER.error(error_info, exc_info=True) error_channeldata = ChannelData( ecode=ChannelDataEcode.UNKNOW.value, error_info=error_info, @@ -303,11 +307,11 @@ class Op(object): err_channeldata_dict[data_id] = error_channeldata else: preped_data_dict[data_id] = preped_data - _LOGGER.debug(log_func("succ run preprocess")) + _LOGGER.debug("{} Succ preprocess".format(op_info_prefix)) return preped_data_dict, err_channeldata_dict - def _run_process(self, preped_data_dict, log_func): - _LOGGER.debug(log_func("try to run process")) + def _run_process(self, preped_data_dict, op_info_prefix): + _LOGGER.debug("{} Running process".format(op_info_prefix)) midped_data_dict = {} err_channeldata_dict = {} if self.with_serving: @@ -320,8 +324,9 @@ class Op(object): midped_batch = self.process(feed_batch) except Exception as e: ecode = ChannelDataEcode.UNKNOW.value - error_info = log_func("process batch failed: {}".format(e)) - _LOGGER.error(error_info) + error_info = "{} Failed to process(batch: {}): {}".format( + op_info_prefix, data_ids, e) + _LOGGER.error(error_info, exc_info=True) else: for i in range(self._retry): try: @@ -330,30 +335,34 @@ class Op(object): except func_timeout.FunctionTimedOut as e: if i + 1 >= self._retry: ecode = ChannelDataEcode.TIMEOUT.value - error_info = log_func(e) + error_info = "{} Failed to process(batch: {}): " \ + "exceeded retry count.".format( + op_info_prefix, data_ids) _LOGGER.error(error_info) else: _LOGGER.warning( - log_func("PaddleService timeout, retry({}/{})" - .format(i + 1, self._retry))) + "{} Failed to process(batch: {}): timeout, and retrying({}/{})" + .format(op_info_prefix, data_ids, i + 1, + self._retry)) except Exception as e: ecode = ChannelDataEcode.UNKNOW.value - error_info = log_func("process batch failed: {}".format( - e)) - _LOGGER.error(error_info) + error_info = "{} Failed to process(batch: {}): {}".format( + op_info_prefix, data_ids, e) + _LOGGER.error(error_info, exc_info=True) break else: break if ecode != ChannelDataEcode.OK.value: for data_id in data_ids: + _LOGGER.error("(logid={}) {}".format(data_id, error_info)) err_channeldata_dict[data_id] = ChannelData( ecode=ecode, error_info=error_info, data_id=data_id) elif midped_batch is None: # op client return None - error_info = log_func( - "predict failed. pls check the server side.") - _LOGGER.error(error_info) + error_info = "{} Failed to predict, please check if PaddleServingService" \ + " is working properly.".format(op_info_prefix) for data_id in data_ids: + _LOGGER.error("(logid={}) {}".format(data_id, error_info)) err_channeldata_dict[data_id] = ChannelData( ecode=ChannelDataEcode.CLIENT_ERROR.value, error_info=error_info, @@ -367,11 +376,12 @@ class Op(object): } else: midped_data_dict = preped_data_dict - _LOGGER.debug(log_func("succ run process")) + _LOGGER.debug("{} Succ process".format(op_info_prefix)) return midped_data_dict, err_channeldata_dict - def _run_postprocess(self, parsed_data_dict, midped_data_dict, log_func): - _LOGGER.debug(log_func("try to run postprocess")) + def _run_postprocess(self, parsed_data_dict, midped_data_dict, + op_info_prefix): + _LOGGER.debug("{} Running postprocess".format(op_info_prefix)) postped_data_dict = {} err_channeldata_dict = {} for data_id, midped_data in midped_data_dict.items(): @@ -380,9 +390,9 @@ class Op(object): postped_data = self.postprocess(parsed_data_dict[data_id], midped_data) except Exception as e: - error_info = log_func("postprocess data[{}] failed: {}" - .format(data_id, e)) - _LOGGER.error(error_info) + error_info = "(logid={}) {} Failed to postprocess: {}".format( + data_id, op_info_prefix, e) + _LOGGER.error(error_info, exc_info=True) err_channeldata = ChannelData( ecode=ChannelDataEcode.UNKNOW.value, error_info=error_info, @@ -392,9 +402,11 @@ class Op(object): continue else: if not isinstance(postped_data, dict): - error_info = log_func( - "output of postprocess funticon must be " - "dict type, but get {}".format(type(postped_data))) + error_info = "(logid={}) {} Failed to postprocess: " \ + "output of postprocess funticon must be " \ + "dict type, but get {}".format( + data_id, op_info_prefix, + type(postped_data)) _LOGGER.error(error_info) err_channeldata = ChannelData( ecode=ChannelDataEcode.UNKNOW.value, @@ -416,16 +428,13 @@ class Op(object): dictdata=postped_data, data_id=data_id) postped_data_dict[data_id] = output_data - _LOGGER.debug(log_func("succ run postprocess")) + _LOGGER.debug("{} Succ postprocess".format(op_info_prefix)) return postped_data_dict, err_channeldata_dict def _auto_batching_generator(self, input_channel, op_name, batch_size, - timeout, log_func): + timeout, op_info_prefix): while True: batch = [] - _LOGGER.debug( - log_func("Auto-batching expect size: {}; timeout(s): {}".format( - batch_size, timeout))) while len(batch) == 0: endtime = None if timeout is not None: @@ -436,7 +445,8 @@ class Op(object): if timeout is not None: remaining = endtime - _time() if remaining <= 0.0: - _LOGGER.debug(log_func("Auto-batching timeout")) + _LOGGER.debug("{} Failed to generate batch: " + "timeout".format(op_info_prefix)) break channeldata_dict = input_channel.front(op_name, timeout) @@ -444,10 +454,11 @@ class Op(object): channeldata_dict = input_channel.front(op_name) batch.append(channeldata_dict) except ChannelTimeoutError: - _LOGGER.debug(log_func("Auto-batching timeout")) + _LOGGER.debug("{} Failed to generate batch: " + "timeout".format(op_info_prefix)) break - _LOGGER.debug( - log_func("Auto-batching actual size: {}".format(len(batch)))) + _LOGGER.debug("{} Got actual batch_size: {}".format(op_info_prefix, + len(batch))) yield batch def _parse_channeldata_batch(self, batch, output_channels): @@ -472,14 +483,7 @@ class Op(object): def _run(self, concurrency_idx, input_channel, output_channels, client_type, is_thread_op): - def get_log_func(op_info_prefix): - def log_func(info_str): - return "{} {}".format(op_info_prefix, info_str) - - return log_func - op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx) - log = get_log_func(op_info_prefix) tid = threading.current_thread().ident # init op @@ -488,22 +492,27 @@ class Op(object): profiler = self._initialize(is_thread_op, client_type, concurrency_idx) except Exception as e: - _LOGGER.critical(log("init op failed: {}".format(e))) + _LOGGER.critical( + "{} Failed to init op: {}".format(op_info_prefix, e), + exc_info=True) os._exit(-1) - _LOGGER.info(log("succ init")) + _LOGGER.info("{} Succ init".format(op_info_prefix)) batch_generator = self._auto_batching_generator( input_channel=input_channel, op_name=self.name, batch_size=self._batch_size, timeout=self._auto_batching_timeout, - log_func=log) + op_info_prefix=op_info_prefix) + start_prep, end_prep = None, None + start_midp, end_midp = None, None + start_postp, end_postp = None, None while True: try: channeldata_dict_batch = next(batch_generator) except ChannelStopError: - _LOGGER.debug(log("channel stop.")) + _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) break @@ -513,7 +522,7 @@ class Op(object): = self._parse_channeldata_batch( channeldata_dict_batch, output_channels) except ChannelStopError: - _LOGGER.debug(log("channel stop.")) + _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) break if len(parsed_data_dict) == 0: @@ -521,10 +530,14 @@ class Op(object): continue # preprecess - profiler.record("prep#{}_0".format(op_info_prefix)) + start_prep = profiler.record("prep#{}_0".format(op_info_prefix)) preped_data_dict, err_channeldata_dict \ - = self._run_preprocess(parsed_data_dict, log) - profiler.record("prep#{}_1".format(op_info_prefix)) + = self._run_preprocess(parsed_data_dict, op_info_prefix) + end_prep = profiler.record("prep#{}_1".format(op_info_prefix)) + _LOGGER.log(level=1, + msg="(logid={}) {} prep[{} ms]".format( + parsed_data_dict.keys(), op_info_prefix, + (end_prep - start_prep) / 1e3)) try: for data_id, err_channeldata in err_channeldata_dict.items(): self._push_to_output_channels( @@ -533,17 +546,21 @@ class Op(object): client_need_profile=need_profile_dict[data_id], profile_set=profile_dict[data_id]) except ChannelStopError: - _LOGGER.debug(log("channel stop.")) + _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) break if len(parsed_data_dict) == 0: continue # process - profiler.record("midp#{}_0".format(op_info_prefix)) + start_midp = profiler.record("midp#{}_0".format(op_info_prefix)) midped_data_dict, err_channeldata_dict \ - = self._run_process(preped_data_dict, log) - profiler.record("midp#{}_1".format(op_info_prefix)) + = self._run_process(preped_data_dict, op_info_prefix) + end_midp = profiler.record("midp#{}_1".format(op_info_prefix)) + _LOGGER.log(level=1, + msg="(logid={}) {} midp[{} ms]".format( + preped_data_dict.keys(), op_info_prefix, + (end_midp - start_midp) / 1e3)) try: for data_id, err_channeldata in err_channeldata_dict.items(): self._push_to_output_channels( @@ -552,18 +569,22 @@ class Op(object): client_need_profile=need_profile_dict[data_id], profile_set=profile_dict[data_id]) except ChannelStopError: - _LOGGER.debug(log("channel stop.")) + _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) break if len(midped_data_dict) == 0: continue # postprocess - profiler.record("postp#{}_0".format(op_info_prefix)) + start_postp = profiler.record("postp#{}_0".format(op_info_prefix)) postped_data_dict, err_channeldata_dict \ = self._run_postprocess( - parsed_data_dict, midped_data_dict, log) - profiler.record("postp#{}_1".format(op_info_prefix)) + parsed_data_dict, midped_data_dict, op_info_prefix) + end_postp = profiler.record("postp#{}_1".format(op_info_prefix)) + _LOGGER.log(level=1, + msg="(logid={}) {} postp[{} ms]".format( + midped_data_dict.keys(), op_info_prefix, + (end_midp - start_midp) / 1e3)) try: for data_id, err_channeldata in err_channeldata_dict.items(): self._push_to_output_channels( @@ -572,7 +593,7 @@ class Op(object): client_need_profile=need_profile_dict[data_id], profile_set=profile_dict[data_id]) except ChannelStopError: - _LOGGER.debug(log("channel stop.")) + _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) break if len(postped_data_dict) == 0: @@ -591,7 +612,7 @@ class Op(object): client_need_profile=need_profile_dict[data_id], profile_set=profile_dict[data_id]) except ChannelStopError: - _LOGGER.debug(log("channel stop.")) + _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) break @@ -646,7 +667,7 @@ class RequestOp(Op): try: self.init_op() except Exception as e: - _LOGGER.critical("Op(Request) init op failed: {}".format(e)) + _LOGGER.critical("Op(Request) Failed to init: {}".format(e)) os._exit(-1) def unpack_request_package(self, request): @@ -670,7 +691,8 @@ class ResponseOp(Op): try: self.init_op() except Exception as e: - _LOGGER.critical("Op(ResponseOp) init op failed: {}".format(e)) + _LOGGER.critical("Op(ResponseOp) Failed to init: {}".format( + e, exc_info=True)) os._exit(-1) def pack_response_package(self, channeldata): @@ -693,14 +715,19 @@ class ResponseOp(Op): resp.error_info = self._log( "fetch var type must be str({}).".format( type(var))) + _LOGGER.error("(logid={}) Failed to pack RPC " + "response package: {}".format( + channeldata.id, resp.error_info)) break resp.value.append(var) resp.key.append(name) else: resp.ecode = ChannelDataEcode.TYPE_ERROR.value resp.error_info = self._log( - "Error type({}) in datatype.".format(channeldata.datatype)) - _LOGGER.error(resp.error_info) + "error type({}) in datatype.".format(channeldata.datatype)) + _LOGGER.error("(logid={}) Failed to pack RPC response" + " package: {}".format(channeldata.id, + resp.error_info)) else: resp.error_info = channeldata.error_info return resp @@ -718,6 +745,7 @@ class VirtualOp(Op): self._virtual_pred_ops.append(op) def _actual_pred_op_names(self, op): + # can use disjoint-set, but it's not necessary if not isinstance(op, VirtualOp): return [op.name] names = [] @@ -728,8 +756,9 @@ class VirtualOp(Op): def add_output_channel(self, channel): if not isinstance(channel, (ThreadChannel, ProcessChannel)): _LOGGER.critical( - self._log("output channel must be Channel type, not {}" - .format(type(channel)))) + self._log("Failed to add output_channel: output_channel" + " must be Channel type, not {}".format( + type(channel)))) os._exit(-1) for op in self._virtual_pred_ops: for op_name in self._actual_pred_op_names(op): @@ -738,12 +767,6 @@ class VirtualOp(Op): def _run(self, concurrency_idx, input_channel, output_channels, client_type, is_thread_op): - def get_log_func(op_info_prefix): - def log_func(info_str): - return "{} {}".format(op_info_prefix, info_str) - - return log_func - op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx) log = get_log_func(op_info_prefix) tid = threading.current_thread().ident @@ -759,7 +782,7 @@ class VirtualOp(Op): try: channeldata_dict_batch = next(batch_generator) except ChannelStopError: - _LOGGER.debug(log("channel stop.")) + _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) break @@ -769,6 +792,6 @@ class VirtualOp(Op): self._push_to_output_channels( data, channels=output_channels, name=name) except ChannelStopError: - _LOGGER.debug(log("Channel stop.")) + _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) break diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py index d296af85..0d64a558 100644 --- a/python/pipeline/pipeline_server.py +++ b/python/pipeline/pipeline_server.py @@ -67,9 +67,11 @@ class PipelineServer(object): def set_response_op(self, response_op): if not isinstance(response_op, ResponseOp): - raise Exception("response_op must be ResponseOp type.") + raise Exception("Failed to set response_op: response_op " + "must be ResponseOp type.") if len(response_op.get_input_ops()) != 1: - raise Exception("response_op can only have one previous op.") + raise Exception("Failed to set response_op: response_op " + "can only have one previous op.") self._response_op = response_op def _port_is_available(self, port): @@ -83,7 +85,8 @@ class PipelineServer(object): self._port = conf["port"] if not self._port_is_available(self._port): - raise SystemExit("Prot {} is already used".format(self._port)) + raise SystemExit("Failed to prepare_server: prot {} " + "is already used".format(self._port)) self._worker_num = conf["worker_num"] self._build_dag_each_worker = conf["build_dag_each_worker"] diff --git a/python/pipeline/profiler.py b/python/pipeline/profiler.py index 3adf3883..60e4883a 100644 --- a/python/pipeline/profiler.py +++ b/python/pipeline/profiler.py @@ -43,8 +43,9 @@ class UnsafeTimeProfiler(object): def record(self, name): if self._enable is False: return - self.time_record.append('{}:{} '.format(name, - int(round(_time() * 1000000)))) + timestamp = int(round(_time() * 1000000)) + self.time_record.append('{}:{} '.format(name, timestamp)) + return timestamp def print_profile(self): if self._enable is False: @@ -80,6 +81,7 @@ class TimeProfiler(object): name = '_'.join(name_with_tag[:-1]) with self._lock: self._time_record.put((name, tag, timestamp)) + return timestamp def print_profile(self): if self._enable is False: -- GitLab