diff --git a/doc/COMPILE.md b/doc/COMPILE.md index 46ebfb4f1a882b6645cb1e9bb6155743e520951d..84b1b65cbdbb0dcf6079d30bd7ebc9baf4a8c6e1 100644 --- a/doc/COMPILE.md +++ b/doc/COMPILE.md @@ -4,12 +4,26 @@ ## Compilation environment requirements -- OS: CentOS 7 -- GCC: 4.8.2 and later -- Golang: 1.9.2 and later -- Git:2.17.1 and later -- CMake:3.2.2 and later -- Python:2.7.2 and later / 3.6 and later +| module | version | | :--------------------------: | :----------------------------------------------------------: | | OS | CentOS 7 | | gcc | 4.8.5 and later | | gcc-c++ | 4.8.5 and later | | make | 3.82 and later | | cmake | 3.2.0 and later | | Python | 2.7.2 and later / 3.6 and later | | Go | 1.9.2 and later | | git | 2.17.1 and later | | glibc-static | 2.17 | | openssl-devel | 1.0.2k | | bzip2-devel | 1.0.6 and later | | python-devel / python3-devel | 2.7.5 and later / 3.6.8 and later | | sqlite-devel | 3.7.17 and later | | patchelf | 0.9 and later | | libXext | 1.3.3 | | libSM | 1.2.2 | | libXrender | 0.9.10 | | python-whl | numpy>=1.12, <=1.16.4
google>=2.0.3
protobuf>=3.12.2
grpcio-tools>=1.28.1
grpcio>=1.28.1
func-timeout>=4.3.5
pyyaml>=1.3.0
sentencepiece==0.1.92
flask>=1.1.2
ujson>=2.0.3 | It is recommended to use Docker for compilation. We have prepared the Paddle Serving compilation environment for you, see [this document](DOCKER_IMAGES.md). diff --git a/doc/COMPILE_CN.md b/doc/COMPILE_CN.md index 54f80d54d334835600d08846dc0fb42efe6558ee..a38faff4289a4946d82f8b4a71afd521c7cd48fd 100644 --- a/doc/COMPILE_CN.md +++ b/doc/COMPILE_CN.md @@ -4,12 +4,26 @@ ## 编译环境设置 -- OS: CentOS 7 -- GCC: 4.8.2及以上 -- Golang: 1.9.2及以上 -- Git:2.17.1及以上 -- CMake:3.2.2及以上 -- Python:2.7.2及以上 / 3.6及以上 +| 组件 | 版本要求 | | :--------------------------: | :----------------------------------------------------------: | | OS | CentOS 7 | | gcc | 4.8.5 and later | | gcc-c++ | 4.8.5 and later | | make | 3.82 and later | | cmake | 3.2.0 and later | | Python | 2.7.2 and later / 3.6 and later | | Go | 1.9.2 and later | | git | 2.17.1 and later | | glibc-static | 2.17 | | openssl-devel | 1.0.2k | | bzip2-devel | 1.0.6 and later | | python-devel / python3-devel | 2.7.5 and later / 3.6.8 and later | | sqlite-devel | 3.7.17 and later | | patchelf | 0.9 | | libXext | 1.3.3 | | libSM | 1.2.2 | | libXrender | 0.9.10 | | python-whl | numpy>=1.12, <=1.16.4
google>=2.0.3
protobuf>=3.12.2
grpcio-tools>=1.28.1
grpcio>=1.28.1
func-timeout>=4.3.5
pyyaml>=1.3.0
sentencepiece==0.1.92
flask>=1.1.2
ujson>=2.0.3 | 推荐使用Docker编译,我们已经为您准备好了Paddle Serving编译环境,详见[该文档](DOCKER_IMAGES_CN.md)。 diff --git a/doc/PIPELINE_SERVING.md b/doc/PIPELINE_SERVING.md index 8f3a30c6cee0f5d2d4d8d62c3b6dc3a3b3ba7212..9ecb32ada02fee0a0bef5bff831355099cbb6060 100644 --- a/doc/PIPELINE_SERVING.md +++ b/doc/PIPELINE_SERVING.md @@ -95,7 +95,7 @@ The meaning of each parameter is as follows: | fetch_list | (list) List of fetch variable names for remote Paddle Serving Service. | | client_config | (str) The path of the client configuration file corresponding to the Paddle Serving Service. | | concurrency | (int) The number of concurrent OPs. | -| timeout | (int) The timeout time of the process operation, in seconds. If the value is less than zero, no timeout is considered. | +| timeout | (int) The timeout time of the process operation, in ms. If the value is less than zero, no timeout is considered. | | retry | (int) Timeout number of retries. When the value is 1, no retries are made. | | batch_size | (int) The expected batch_size of Auto-Batching, since building batches may time out, the actual batch_size may be less than the set value. | | auto_batching_timeout | (float) Timeout for building batches of Auto-Batching (the unit is ms). | diff --git a/doc/PIPELINE_SERVING_CN.md b/doc/PIPELINE_SERVING_CN.md index 79c9cf9eb2fabef79ab0e07f85e275db96385a19..4632783367c95d0fbd7eb478eb137e21d482b900 100644 --- a/doc/PIPELINE_SERVING_CN.md +++ b/doc/PIPELINE_SERVING_CN.md @@ -95,7 +95,7 @@ def __init__(name=None, | fetch_list | (list)远程 Paddle Serving Service 的 fetch 列表。 | | client_config | (str)Paddle Serving Service 对应的 Client 端配置文件路径。 | | concurrency | (int)OP 的并发数。 | -| timeout | (int)process 操作的超时时间,单位为秒。若该值小于零,则视作不超时。 | +| timeout | (int)process 操作的超时时间,单位为毫秒。若该值小于零,则视作不超时。 | | retry | (int)超时重试次数。当该值为 1 时,不进行重试。 | | batch_size | (int)进行 Auto-Batching 的期望 batch_size 大小,由于构建 batch 可能超时,实际 batch_size 可能小于设定值。 | | auto_batching_timeout | (float)进行 Auto-Batching 构建 batch 的超时时间,单位为毫秒。 | diff --git a/python/examples/pipeline/imdb_model_ensemble/config.yml b/python/examples/pipeline/imdb_model_ensemble/config.yml index 13a85d068e2a1bbea5caa2a552ce4f26cb3b0316..3f0b1bb8d4eedb073fa5014eb20e1a170f0d811b 100644 --- a/python/examples/pipeline/imdb_model_ensemble/config.yml +++ b/python/examples/pipeline/imdb_model_ensemble/config.yml @@ -6,3 +6,5 @@ dag: client_type: brpc retry: 1 use_profile: false + tracer: + interval_s: 10 diff --git a/python/examples/pipeline/imdb_model_ensemble/test_pipeline_server.py b/python/examples/pipeline/imdb_model_ensemble/test_pipeline_server.py index a9e5b16ccd1d9861b41d60c1bf1b641c688f59d5..515fd352483e1a3ced57014e11d4faff2a6faddb 100644 --- a/python/examples/pipeline/imdb_model_ensemble/test_pipeline_server.py +++ b/python/examples/pipeline/imdb_model_ensemble/test_pipeline_server.py @@ -12,20 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. 
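For reference, the table change above redefines the Op `timeout` parameter as milliseconds (a value below zero still disables the timeout), which matches the `timeout / 1000.0` conversion added to `operator.py` later in this patch. A minimal sketch of an Op written against the new unit — the endpoint, fetch name, and client-config path are placeholders rather than values taken from this patch:

```python
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp

read_op = RequestOp()

# timeout is now interpreted in milliseconds; a negative value keeps the
# old "no timeout" behaviour.
bow_op = Op(name="bow",
            input_ops=[read_op],
            server_endpoints=["127.0.0.1:9393"],            # placeholder endpoint
            fetch_list=["prediction"],                      # placeholder fetch var
            client_config="serving_client_conf.prototxt",   # placeholder path
            concurrency=1,
            timeout=5000,   # 5000 ms == 5 s under the new unit
            retry=1)

response_op = ResponseOp(input_ops=[bow_op])
```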
# pylint: disable=doc-string-missing - -import logging -logging.basicConfig( - format="[%(process)d](%(threadName)s) %(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s", - level=logging.INFO) - +import paddle_serving_server.pipeline as pipeline from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp from paddle_serving_server.pipeline import PipelineServer from paddle_serving_server.pipeline.proto import pipeline_service_pb2 from paddle_serving_server.pipeline.channel import ChannelDataEcode import numpy as np from paddle_serving_app.reader import IMDBDataset +import logging _LOGGER = logging.getLogger() +console_handler = pipeline.logger.StreamHandler() +console_handler.setLevel(logging.INFO) +console_handler.setFormatter( + logging.Formatter( + "%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s")) +_LOGGER.addHandler(console_handler) class ImdbRequestOp(RequestOp): diff --git a/python/pipeline/__init__.py b/python/pipeline/__init__.py index f720e4d2c851cec6270d31d6d44a766acc246291..913ee39f03d480663a79b1b2b1503835100ee176 100644 --- a/python/pipeline/__init__.py +++ b/python/pipeline/__init__.py @@ -11,7 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import logger # this module must be the first to import +from operator import Op, RequestOp, ResponseOp +from pipeline_server import PipelineServer +from pipeline_client import PipelineClient +from analyse import Analyst from operator import Op, RequestOp, ResponseOp from pipeline_server import PipelineServer from pipeline_client import PipelineClient diff --git a/python/pipeline/analyse.py b/python/pipeline/analyse.py index 65194889ae65bcd7a32f934613fe9514f3a1f6c3..11a68b272e88fc6575d48b43ff0ea400702e45db 100644 --- a/python/pipeline/analyse.py +++ b/python/pipeline/analyse.py @@ -17,7 +17,7 @@ import copy import re import logging -_LOGGER = logging.getLogger() +_LOGGER = logging.getLogger("pipeline.analyse") class Analyst(object): @@ -164,7 +164,7 @@ class OpAnalyst(object): def add(self, name_str, ts_list): if self._close: - _LOGGER.error("OpAnalyst is closed.") + _LOGGER.error("Failed to add item: OpAnalyst is closed.") return op_name, curr_idx, step = self._parse(name_str) if op_name not in self.op_time_list_dict: diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py index 8a03c30b9075aa56f88d8f876932cf6a68835c62..08c8936e50465b2e2bb9ec5e025de117af71e061 100644 --- a/python/pipeline/channel.py +++ b/python/pipeline/channel.py @@ -26,9 +26,10 @@ else: import numpy as np import logging import enum +import os import copy -_LOGGER = logging.getLogger() +_LOGGER = logging.getLogger("pipeline.channel") class ChannelDataEcode(enum.Enum): @@ -69,21 +70,25 @@ class ChannelData(object): ''' if ecode is not None: if data_id is None or error_info is None: - raise ValueError("data_id and error_info cannot be None") + _LOGGER.critical("Failed to generate ChannelData: data_id" + " and error_info cannot be None") + os._exit(-1) datatype = ChannelDataType.ERROR.value else: if datatype == ChannelDataType.CHANNEL_NPDATA.value: ecode, error_info = ChannelData.check_npdata(npdata) if ecode != ChannelDataEcode.OK.value: datatype = ChannelDataType.ERROR.value - _LOGGER.error(error_info) + _LOGGER.error("(logid={}) {}".format(data_id, error_info)) elif datatype == ChannelDataType.DICT.value: ecode, error_info = ChannelData.check_dictdata(dictdata) if ecode != 
ChannelDataEcode.OK.value: datatype = ChannelDataType.ERROR.value - _LOGGER.error(error_info) + _LOGGER.error("(logid={}) {}".format(data_id, error_info)) else: - raise ValueError("datatype not match") + _LOGGER.critical("(logid={}) datatype not match".format( + data_id)) + os._exit(-1) self.datatype = datatype self.npdata = npdata self.dictdata = dictdata @@ -107,14 +112,14 @@ class ChannelData(object): for sample in dictdata: if not isinstance(sample, dict): ecode = ChannelDataEcode.TYPE_ERROR.value - error_info = "the value of data must " \ - "be dict, but get {}.".format(type(sample)) + error_info = "Failed to check data: the type of " \ + "data must be dict, but get {}.".format(type(sample)) break elif not isinstance(dictdata, dict): # batch size = 1 ecode = ChannelDataEcode.TYPE_ERROR.value - error_info = "the value of data must " \ - "be dict, but get {}.".format(type(dictdata)) + error_info = "Failed to check data: the type of data must " \ + "be dict, but get {}.".format(type(dictdata)) return ecode, error_info @staticmethod @@ -136,27 +141,30 @@ class ChannelData(object): for sample in npdata: if not isinstance(sample, dict): ecode = ChannelDataEcode.TYPE_ERROR.value - error_info = "the value of data must " \ - "be dict, but get {}.".format(type(sample)) + error_info = "Failed to check data: the " \ + "value of data must be dict, but get {}.".format( + type(sample)) break for _, value in sample.items(): if not isinstance(value, np.ndarray): ecode = ChannelDataEcode.TYPE_ERROR.value - error_info = "the value of data must " \ - "be np.ndarray, but get {}.".format(type(value)) + error_info = "Failed to check data: the" \ + " value of data must be np.ndarray, but get {}.".format( + type(value)) return ecode, error_info elif isinstance(npdata, dict): # batch_size = 1 for _, value in npdata.items(): if not isinstance(value, np.ndarray): ecode = ChannelDataEcode.TYPE_ERROR.value - error_info = "the value of data must " \ - "be np.ndarray, but get {}.".format(type(value)) + error_info = "Failed to check data: the value " \ + "of data must be np.ndarray, but get {}.".format( + type(value)) break else: ecode = ChannelDataEcode.TYPE_ERROR.value - error_info = "the value of data must " \ - "be dict, but get {}.".format(type(npdata)) + error_info = "Failed to check data: the value of data " \ + "must be dict, but get {}.".format(type(npdata)) return ecode, error_info def parse(self): @@ -168,7 +176,9 @@ class ChannelData(object): # return dict feed = self.dictdata else: - raise TypeError("Error type({}) in datatype.".format(self.datatype)) + _LOGGER.critical("Failed to parse channeldata: error " \ + "type({}) in datatype.".format(self.datatype)) + os._exit(-1) return feed def __str__(self): @@ -229,6 +239,12 @@ class ProcessChannel(object): self._base_cursor = manager.Value('i', 0) self._output_buf = manager.list() + def get_maxsize(self): + return self._maxsize + + def size(self): + return self._que.qsize() + def get_producers(self): return self._producers @@ -241,30 +257,38 @@ class ProcessChannel(object): def add_producer(self, op_name): """ not thread safe, and can only be called during initialization. 
""" if op_name in self._producers: - raise ValueError( - self._log("producer({}) is already in channel".format(op_name))) + _LOGGER.critical( + self._log("Failed to add producer: producer({})" \ + " is already in channel".format(op_name))) + os._exit(-1) self._producers.append(op_name) + _LOGGER.debug(self._log("Succ add a producer: {}".format(op_name))) def add_consumer(self, op_name): """ not thread safe, and can only be called during initialization. """ if op_name in self._consumer_cursors: - raise ValueError( - self._log("consumer({}) is already in channel".format(op_name))) + _LOGGER.critical( + self._log("Failed to add consumer: consumer({})" \ + " is already in channel".format(op_name))) + os._exit(-1) self._consumer_cursors[op_name] = 0 if self._cursor_count.get(0) is None: self._cursor_count[0] = 0 self._cursor_count[0] += 1 + _LOGGER.debug(self._log("Succ add a consumer: {}".format(op_name))) def push(self, channeldata, op_name=None): _LOGGER.debug( - self._log("{} try to push data[{}]".format(op_name, - channeldata.id))) + self._log("(logid={}) Op({}) Pushing data".format(channeldata.id, + op_name))) if len(self._producers) == 0: - raise Exception( + _LOGGER.critical( self._log( - "expected number of producers to be greater than 0, but the it is 0." - )) + "(logid={}) Op({}) Failed to push data: expected number" + " of producers to be greater than 0, but the it is 0.". + format(channeldata.id, op_name))) + os._exit(-1) elif len(self._producers) == 1: with self._cv: while self._stop.value == 0: @@ -277,13 +301,16 @@ class ProcessChannel(object): raise ChannelStopError() self._cv.notify_all() _LOGGER.debug( - self._log("{} succ push data[{}] into internal queue.".format( - op_name, channeldata.id))) + self._log("(logid={}) Op({}) Pushed data into internal queue.". + format(channeldata.id, op_name))) return True elif op_name is None: - raise Exception( + _LOGGER.critical( self._log( - "There are multiple producers, so op_name cannot be None.")) + "(logid={}) Op({}) Failed to push data: there are multiple " + "producers, so op_name cannot be None.".format( + channeldata.id, op_name))) + os._exit(-1) producer_num = len(self._producers) data_id = channeldata.id @@ -310,8 +337,9 @@ class ProcessChannel(object): if put_data is None: _LOGGER.debug( - self._log("{} succ push data[{}] into input_buffer.".format( - op_name, data_id))) + self._log( + "(logid={}) Op({}) Pushed data into input_buffer.". + format(data_id, op_name))) else: while self._stop.value == 0: try: @@ -323,15 +351,16 @@ class ProcessChannel(object): raise ChannelStopError() _LOGGER.debug( - self._log("{} succ push data[{}] into internal queue.". - format(op_name, data_id))) + self._log( + "(logid={}) Op({}) Pushed data into internal_queue.". + format(data_id, op_name))) self._cv.notify_all() return True def front(self, op_name=None, timeout=None): _LOGGER.debug( - self._log("{} try to get data[?]; timeout={}".format(op_name, - timeout))) + self._log("Op({}) Getting data[?]; timeout(s)={}".format(op_name, + timeout))) endtime = None if timeout is not None: if timeout <= 0: @@ -340,10 +369,11 @@ class ProcessChannel(object): endtime = _time() + timeout if len(self._consumer_cursors) == 0: - raise Exception( + _LOGGER.critical( self._log( - "expected number of consumers to be greater than 0, but the it is 0." 
- )) + "Op({}) Failed to get data: expected number of consumers to be " \ + "greater than 0, but the it is 0.".format(op_name))) + os._exit(-1) elif len(self._consumer_cursors) == 1: resp = None with self._cv: @@ -356,8 +386,8 @@ class ProcessChannel(object): remaining = endtime - _time() if remaining <= 0.0: _LOGGER.debug( - self._log("{} get data[?] timeout".format( - op_name))) + self._log("Op({}) Failed to get data: " + "timeout".format(op_name))) raise ChannelTimeoutError() self._cv.wait(remaining) else: @@ -365,13 +395,15 @@ class ProcessChannel(object): if self._stop.value == 1: raise ChannelStopError() _LOGGER.debug( - self._log("{} succ get data[{}]".format(op_name, - resp.values()[0].id))) + self._log("(logid={}) Op({}) Got data".format(resp.values()[0] + .id, op_name))) return resp elif op_name is None: - raise Exception( + _LOGGER.critical( self._log( - "There are multiple consumers, so op_name cannot be None.")) + "Op({}) Failed to get data: there are multiple consumers, " + "so op_name cannot be None.".format(op_name))) + os._exit(-1) # In output_buf, different Ops (according to op_name) have different # cursors. In addition, there is a base_cursor. Their difference is @@ -392,16 +424,17 @@ class ProcessChannel(object): channeldata = self._que.get(timeout=0) self._output_buf.append(channeldata) _LOGGER.debug( - self._log("pop ready item[{}] into output_buffer". - format(channeldata.values()[0].id))) + self._log( + "(logid={}) Op({}) Pop ready item into output_buffer". + format(channeldata.values()[0].id, op_name))) break except Queue.Empty: if timeout is not None: remaining = endtime - _time() if remaining <= 0.0: _LOGGER.debug( - self._log("{} get data[?] timeout".format( - op_name))) + self._log("Op({}) Failed to get data: timeout". + format(op_name))) raise ChannelTimeoutError() self._cv.wait(remaining) else: @@ -424,7 +457,7 @@ class ProcessChannel(object): self._base_cursor.value += 1 # to avoid cursor overflow if self._base_cursor.value >= self._reset_max_cursor: - _LOGGER.info(self._log("reset cursor in Channel")) + _LOGGER.info(self._log("Reset cursor in Channel")) self._base_cursor.value -= self._reset_max_cursor for name in self._consumer_cursors.keys(): self._consumer_cursors[name] -= self._reset_max_cursor @@ -445,12 +478,12 @@ class ProcessChannel(object): self._cv.notify_all() _LOGGER.debug( - self._log("{} succ get data[{}] from output_buffer".format( - op_name, resp.values()[0].id))) + self._log("(logid={}) Op({}) Got data from output_buffer".format( + resp.values()[0].id, op_name))) return resp def stop(self): - _LOGGER.debug(self._log("stop.")) + _LOGGER.info(self._log("stop.")) self._stop.value = 1 with self._cv: self._cv.notify_all() @@ -503,6 +536,12 @@ class ThreadChannel(Queue.Queue): self._base_cursor = 0 self._output_buf = [] + def get_maxsize(self): + return self._maxsize + + def size(self): + return self.qsize() + def get_producers(self): return self._producers @@ -512,37 +551,41 @@ class ThreadChannel(Queue.Queue): def _log(self, info_str): return "[{}] {}".format(self.name, info_str) - def debug(self): - return self._log("p: {}, c: {}".format(self.get_producers(), - self.get_consumers())) - def add_producer(self, op_name): """ not thread safe, and can only be called during initialization. 
""" if op_name in self._producers: - raise ValueError( - self._log("producer({}) is already in channel".format(op_name))) + _LOGGER.critical( + self._log("Failed to add producer: producer({}) is " + "already in channel".format(op_name))) + os._exit(-1) self._producers.append(op_name) + _LOGGER.debug(self._log("Succ add a producer: {}".format(op_name))) def add_consumer(self, op_name): """ not thread safe, and can only be called during initialization. """ if op_name in self._consumer_cursors: - raise ValueError( - self._log("consumer({}) is already in channel".format(op_name))) + _LOGGER.critical( + self._log("Failed to add consumer: consumer({}) is " + "already in channel".format(op_name))) + os._exit(-1) self._consumer_cursors[op_name] = 0 if self._cursor_count.get(0) is None: self._cursor_count[0] = 0 self._cursor_count[0] += 1 + _LOGGER.debug(self._log("Succ add a consumer: {}".format(op_name))) def push(self, channeldata, op_name=None): _LOGGER.debug( - self._log("{} try to push data[{}]".format(op_name, - channeldata.id))) + self._log("(logid={}) Op({}) Pushing data".format(channeldata.id, + op_name))) if len(self._producers) == 0: - raise Exception( + _LOGGER.critical( self._log( - "expected number of producers to be greater than 0, but the it is 0." - )) + "(logid={}) Op({}) Failed to push data: expected number of " + "producers to be greater than 0, but the it is 0.".format( + channeldata.id, op_name))) + os._exit(-1) elif len(self._producers) == 1: with self._cv: while self._stop is False: @@ -555,13 +598,16 @@ class ThreadChannel(Queue.Queue): raise ChannelStopError() self._cv.notify_all() _LOGGER.debug( - self._log("{} succ push data[{}] into internal queue.".format( - op_name, channeldata.id))) + self._log("(logid={}) Op({}) Pushed data into internal_queue.". + format(channeldata.id, op_name))) return True elif op_name is None: - raise Exception( + _LOGGER.critical( self._log( - "There are multiple producers, so op_name cannot be None.")) + "(logid={}) Op({}) Failed to push data: there are multiple" + " producers, so op_name cannot be None.".format( + channeldata.id, op_name))) + os._exit(-1) producer_num = len(self._producers) data_id = channeldata.id @@ -583,8 +629,9 @@ class ThreadChannel(Queue.Queue): if put_data is None: _LOGGER.debug( - self._log("{} succ push data[{}] into input_buffer.".format( - op_name, data_id))) + self._log( + "(logid={}) Op({}) Pushed data into input_buffer.". + format(data_id, op_name))) else: while self._stop is False: try: @@ -596,15 +643,16 @@ class ThreadChannel(Queue.Queue): raise ChannelStopError() _LOGGER.debug( - self._log("{} succ push data[{}] into internal queue.". - format(op_name, data_id))) + self._log( + "(logid={}) Op({}) Pushed data into internal_queue.". + format(data_id, op_name))) self._cv.notify_all() return True def front(self, op_name=None, timeout=None): _LOGGER.debug( - self._log("{} try to get data[?]; timeout={}".format(op_name, - timeout))) + self._log("Op({}) Getting data[?]; timeout(s)={}".format(op_name, + timeout))) endtime = None if timeout is not None: if timeout <= 0: @@ -613,10 +661,11 @@ class ThreadChannel(Queue.Queue): endtime = _time() + timeout if len(self._consumer_cursors) == 0: - raise Exception( + _LOGGER.critical( self._log( - "expected number of consumers to be greater than 0, but the it is 0." 
- )) + "Op({}) Failed to get data: expected number of consumers to be " + "greater than 0, but the it is 0.".format(op_name))) + os._exit(-1) elif len(self._consumer_cursors) == 1: resp = None with self._cv: @@ -629,8 +678,9 @@ class ThreadChannel(Queue.Queue): remaining = endtime - _time() if remaining <= 0.0: _LOGGER.debug( - self._log("{} get data[?] timeout".format( - op_name))) + self._log( + "Op({}) Failed to get data: timeout". + format(op_name))) raise ChannelTimeoutError() self._cv.wait(remaining) else: @@ -638,13 +688,15 @@ class ThreadChannel(Queue.Queue): if self._stop: raise ChannelStopError() _LOGGER.debug( - self._log("{} succ get data[{}]".format(op_name, - resp.values()[0].id))) + self._log("(logid={}) Op({}) Got data".format(resp.values()[0] + .id, op_name))) return resp elif op_name is None: - raise Exception( - self._log( - "There are multiple consumers, so op_name cannot be None.")) + _LOGGER.critical( + self._log("Op({}) Failed to get data: there are multiple " + "consumers, so op_name cannot be None.".format( + op_name))) + os._exit(-1) # In output_buf, different Ops (according to op_name) have different # cursors. In addition, there is a base_cursor. Their difference is @@ -665,16 +717,17 @@ class ThreadChannel(Queue.Queue): channeldata = self.get(timeout=0) self._output_buf.append(channeldata) _LOGGER.debug( - self._log("pop ready item[{}] into output_buffer". - format(channeldata.values()[0].id))) + self._log( + "(logid={}) Op({}) Pop ready item into output_buffer". + format(channeldata.values()[0].id, op_name))) break except Queue.Empty: if timeout is not None: remaining = endtime - _time() if remaining <= 0.0: _LOGGER.debug( - self._log("{} get data[?] timeout".format( - op_name))) + self._log("Op({}) Failed to get data: timeout". 
+ format(op_name))) raise ChannelTimeoutError() self._cv.wait(remaining) else: @@ -698,7 +751,7 @@ class ThreadChannel(Queue.Queue): self._base_cursor += 1 # to avoid cursor overflow if self._base_cursor >= self._reset_max_cursor: - _LOGGER.info(self._log("reset cursor in Channel")) + _LOGGER.info(self._log("Reset cursor in Channel")) self._base_cursor -= self._reset_max_cursor for name in self._consumer_cursors: self._consumer_cursors[name] -= self._reset_max_cursor @@ -718,12 +771,12 @@ class ThreadChannel(Queue.Queue): self._cv.notify_all() _LOGGER.debug( - self._log("{} succ get data[{}] from output_buffer".format( - op_name, resp.values()[0].id))) + self._log("(logid={}) Op({}) Got data from output_buffer".format( + resp.values()[0].id, op_name))) return resp def stop(self): - _LOGGER.debug(self._log("stop.")) + _LOGGER.info(self._log("stop.")) self._stop = True with self._cv: self._cv.notify_all() diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py index 8f36105d77680ec725232599a956d7a3b555ac41..3a9e8c6fc534098287589d30c3fcb9d19f697453 100644 --- a/python/pipeline/dag.py +++ b/python/pipeline/dag.py @@ -28,48 +28,40 @@ import logging from .operator import Op, RequestOp, ResponseOp, VirtualOp from .channel import (ThreadChannel, ProcessChannel, ChannelData, ChannelDataEcode, ChannelDataType, ChannelStopError) -from .profiler import TimeProfiler +from .profiler import TimeProfiler, PerformanceTracer from .util import NameGenerator +from .proto import pipeline_service_pb2 -_LOGGER = logging.getLogger() +_LOGGER = logging.getLogger("pipeline.dag") class DAGExecutor(object): - def __init__(self, response_op, dag_config, show_info): - default_conf = { - "retry": 1, - "client_type": "brpc", - "use_profile": False, - "channel_size": 0, - "is_thread_op": True - } + def __init__(self, response_op, server_conf): + build_dag_each_worker = server_conf["build_dag_each_worker"] + server_worker_num = server_conf["worker_num"] + dag_conf = server_conf["dag"] - for key, val in default_conf.items(): - if dag_config.get(key) is None: - _LOGGER.warning("[CONF] {} not set, use default: {}" - .format(key, val)) - dag_config[key] = val - - self._retry = dag_config["retry"] - client_type = dag_config["client_type"] - self._server_use_profile = dag_config["use_profile"] - channel_size = dag_config["channel_size"] - self._is_thread_op = dag_config["is_thread_op"] - build_dag_each_worker = dag_config["build_dag_each_worker"] - - if show_info: - _LOGGER.info("=============== DAGExecutor ===============") - for key in default_conf.keys(): - _LOGGER.info("{}: {}".format(key, dag_config[key])) - _LOGGER.info("-------------------------------------------") + self._retry = dag_conf["retry"] + client_type = dag_conf["client_type"] + self._server_use_profile = dag_conf["use_profile"] + channel_size = dag_conf["channel_size"] + self._is_thread_op = dag_conf["is_thread_op"] - self.name = "@G" + tracer_conf = dag_conf["tracer"] + tracer_interval_s = tracer_conf["interval_s"] + + self.name = "@DAGExecutor" self._profiler = TimeProfiler() self._profiler.enable(True) + self._tracer = None + if tracer_interval_s >= 1: + self._tracer = PerformanceTracer( + self._is_thread_op, tracer_interval_s, server_worker_num) + self._dag = DAG(self.name, response_op, self._server_use_profile, self._is_thread_op, client_type, channel_size, - show_info, build_dag_each_worker) + build_dag_each_worker, self._tracer) (in_channel, out_channel, pack_rpc_func, unpack_rpc_func) = self._dag.build() self._dag.start() @@ -79,12 +71,15 @@ class 
DAGExecutor(object): self._pack_rpc_func = pack_rpc_func self._unpack_rpc_func = unpack_rpc_func + if self._tracer is not None: + self._tracer.start() + self._id_lock = threading.Lock() self._id_counter = 0 self._reset_max_id = 1000000000000000000 self._cv_pool = {} self._cv_for_cv_pool = threading.Condition() - self._fetch_buffer = None + self._fetch_buffer = {} self._recive_func = None self._client_profile_key = "pipeline.profile" @@ -93,32 +88,44 @@ class DAGExecutor(object): def start(self): self._recive_func = threading.Thread( target=DAGExecutor._recive_out_channel_func, args=(self, )) + self._recive_func.daemon = True self._recive_func.start() - _LOGGER.debug("[DAG Executor] start recive thread") + _LOGGER.debug("[DAG Executor] Start recive thread") def stop(self): self._dag.stop() self._dag.join() - _LOGGER.info("[DAG Executor] succ stop") + _LOGGER.info("[DAG Executor] Stop") def _get_next_data_id(self): + data_id = None with self._id_lock: if self._id_counter >= self._reset_max_id: + _LOGGER.info("[DAG Executor] Reset request id") self._id_counter -= self._reset_max_id + data_id = self._id_counter self._id_counter += 1 - return self._id_counter - 1 + cond_v = threading.Condition() + with self._cv_for_cv_pool: + self._cv_pool[data_id] = cond_v + self._fetch_buffer[data_id] = None + return data_id, cond_v def _set_in_channel(self, in_channel): if not isinstance(in_channel, (ThreadChannel, ProcessChannel)): - raise TypeError("in_channel must be Channel type, but get {}". - format(type(in_channel))) + _LOGGER.critical("[DAG Executor] Failed to set in_channel: " + "in_channel must be Channel type, but get {}". + format(type(in_channel))) + os._exit(-1) in_channel.add_producer(self.name) self._in_channel = in_channel def _set_out_channel(self, out_channel): if not isinstance(out_channel, (ThreadChannel, ProcessChannel)): - raise TypeError("iout_channel must be Channel type, but get {}". 
- format(type(out_channel))) + _LOGGER.critical("[DAG Executor] Failed to set out_channel: " + "must be Channel type, but get {}".format( + type(out_channel))) + os._exit(-1) out_channel.add_consumer(self.name) self._out_channel = out_channel @@ -128,7 +135,7 @@ class DAGExecutor(object): try: channeldata_dict = self._out_channel.front(self.name) except ChannelStopError: - _LOGGER.debug("[DAG Executor] channel stop.") + _LOGGER.info("[DAG Executor] Stop.") with self._cv_for_cv_pool: for data_id, cv in self._cv_pool.items(): closed_errror_data = ChannelData( @@ -136,49 +143,61 @@ class DAGExecutor(object): error_info="dag closed.", data_id=data_id) with cv: - self._fetch_buffer = closed_errror_data + self._fetch_buffer[data_id] = closed_errror_data cv.notify_all() break if len(channeldata_dict) != 1: - _LOGGER.error( - "[DAG Executor] out_channel cannot have multiple input ops") + _LOGGER.critical( + "[DAG Executor] Failed to fetch result: out_channel " + "cannot have multiple input ops") os._exit(-1) (_, channeldata), = channeldata_dict.items() if not isinstance(channeldata, ChannelData): - _LOGGER.error( - '[DAG Executor] data must be ChannelData type, but get {}' + _LOGGER.critical( + '[DAG Executor] Failed to fetch result: data in out_channel" \ + " must be ChannelData type, but get {}' .format(type(channeldata))) os._exit(-1) data_id = channeldata.id - _LOGGER.debug("recive thread fetch data[{}]".format(data_id)) + _LOGGER.debug("(logid={}) [recive thread] Fetched data".format( + data_id)) with self._cv_for_cv_pool: - cv = self._cv_pool[data_id] - with cv: - self._fetch_buffer = channeldata - cv.notify_all() - - def _get_channeldata_from_fetch_buffer(self, data_id): - resp = None - cv = threading.Condition() - with self._cv_for_cv_pool: - self._cv_pool[data_id] = cv - with cv: - cv.wait() - with self._cv_for_cv_pool: - resp = copy.deepcopy(self._fetch_buffer) - _LOGGER.debug("resp thread get resp data[{}]".format(data_id)) - self._cv_pool.pop(data_id) - return resp + cond_v = self._cv_pool[data_id] + with cond_v: + self._fetch_buffer[data_id] = channeldata + cond_v.notify_all() + + def _get_channeldata_from_fetch_buffer(self, data_id, cond_v): + ready_data = None + + with cond_v: + with self._cv_for_cv_pool: + if self._fetch_buffer[data_id] is not None: + # The requested data is already ready + ready_data = self._fetch_buffer[data_id] + self._cv_pool.pop(data_id) + self._fetch_buffer.pop(data_id) + if ready_data is None: + # Wait for data ready + cond_v.wait() + with self._cv_for_cv_pool: + ready_data = self._fetch_buffer[data_id] + self._cv_pool.pop(data_id) + self._fetch_buffer.pop(data_id) + _LOGGER.debug("(logid={}) [resp thread] Got data".format(data_id)) + return ready_data def _pack_channeldata(self, rpc_request, data_id): dictdata = None try: dictdata = self._unpack_rpc_func(rpc_request) except Exception as e: - _LOGGER.error("parse RPC package to data[{}] Error: {}" - .format(data_id, e)) + _LOGGER.error( + "(logid={}) Failed to parse RPC request package: {}" + .format(data_id, e), + exc_info=True) return ChannelData( ecode=ChannelDataEcode.RPC_PACKAGE_ERROR.value, error_info="rpc package error: {}".format(e), @@ -192,7 +211,7 @@ class DAGExecutor(object): profile_value = rpc_request.value[idx] break client_need_profile = (profile_value == self._client_profile_value) - _LOGGER.debug("request[{}] need profile: {}".format( + _LOGGER.debug("(logid={}) Need profile in client: {}".format( data_id, client_need_profile)) return ChannelData( datatype=ChannelDataType.DICT.value, @@ 
-201,60 +220,74 @@ class DAGExecutor(object): client_need_profile=client_need_profile) def call(self, rpc_request): - data_id = self._get_next_data_id() - _LOGGER.debug("generate id: {}".format(data_id)) + if self._tracer is not None: + trace_buffer = self._tracer.data_buffer() + data_id, cond_v = self._get_next_data_id() + _LOGGER.info("(logid={}) Succ generate id".format(data_id)) + + start_call, end_call = None, None if not self._is_thread_op: - self._profiler.record("call_{}#DAG-{}_0".format(data_id, data_id)) + start_call = self._profiler.record("call_{}#DAG-{}_0".format( + data_id, data_id)) else: - self._profiler.record("call_{}#DAG_0".format(data_id)) + start_call = self._profiler.record("call_{}#DAG_0".format(data_id)) - _LOGGER.debug("try parse RPC package to channeldata[{}]".format( - data_id)) + _LOGGER.debug("(logid={}) Parsing RPC request package".format(data_id)) self._profiler.record("prepack_{}#{}_0".format(data_id, self.name)) req_channeldata = self._pack_channeldata(rpc_request, data_id) self._profiler.record("prepack_{}#{}_1".format(data_id, self.name)) resp_channeldata = None for i in range(self._retry): - _LOGGER.debug("push data[{}] into Graph engine".format(data_id)) + _LOGGER.debug("(logid={}) Pushing data into Graph engine".format( + data_id)) try: self._in_channel.push(req_channeldata, self.name) except ChannelStopError: - _LOGGER.debug("[DAG Executor] channel stop.") + _LOGGER.debug("[DAG Executor] Stop") + with self._cv_for_cv_pool: + self._cv_pool.pop(data_id) return self._pack_for_rpc_resp( ChannelData( ecode=ChannelDataEcode.CLOSED_ERROR.value, error_info="dag closed.", data_id=data_id)) - _LOGGER.debug("wait for Graph engine for data[{}]...".format( - data_id)) - resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id) + _LOGGER.debug("(logid={}) Wait for Graph engine...".format(data_id)) + resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id, + cond_v) if resp_channeldata.ecode == ChannelDataEcode.OK.value: - _LOGGER.debug("Graph engine predict data[{}] succ".format( - data_id)) + _LOGGER.info("(logid={}) Succ predict".format(data_id)) break else: - _LOGGER.warn("Graph engine predict data[{}] failed: {}" - .format(data_id, resp_channeldata.error_info)) + _LOGGER.error("(logid={}) Failed to predict: {}" + .format(data_id, resp_channeldata.error_info)) if resp_channeldata.ecode != ChannelDataEcode.TIMEOUT.value: break if i + 1 < self._retry: - _LOGGER.warn("retry({}/{}) data[{}]".format(i + 1, self._retry, - data_id)) + _LOGGER.warning("(logid={}) DAGExecutor retry({}/{})".format( + data_id, i + 1, self._retry)) - _LOGGER.debug("unpack channeldata[{}] into RPC resp package".format( - data_id)) + _LOGGER.debug("(logid={}) Packing RPC response package".format(data_id)) self._profiler.record("postpack_{}#{}_0".format(data_id, self.name)) rpc_resp = self._pack_for_rpc_resp(resp_channeldata) self._profiler.record("postpack_{}#{}_1".format(data_id, self.name)) if not self._is_thread_op: - self._profiler.record("call_{}#DAG-{}_1".format(data_id, data_id)) + end_call = self._profiler.record("call_{}#DAG-{}_1".format(data_id, + data_id)) else: - self._profiler.record("call_{}#DAG_1".format(data_id)) + end_call = self._profiler.record("call_{}#DAG_1".format(data_id)) + + if self._tracer is not None: + if resp_channeldata.ecode == ChannelDataEcode.OK.value: + trace_buffer.put(("DAG", "call_{}".format(data_id), True, + end_call - start_call)) + else: + trace_buffer.put(("DAG", "call_{}".format(data_id), False, + end_call - start_call)) 
profile_str = self._profiler.gen_profile_str() if self._server_use_profile: @@ -272,23 +305,33 @@ class DAGExecutor(object): return rpc_resp def _pack_for_rpc_resp(self, channeldata): - return self._pack_rpc_func(channeldata) + try: + return self._pack_rpc_func(channeldata) + except Exception as e: + _LOGGER.error( + "(logid={}) Failed to pack RPC response package: {}" + .format(channeldata.id, e), + exc_info=True) + resp = pipeline_service_pb2.Response() + resp.ecode = ChannelDataEcode.RPC_PACKAGE_ERROR.value + resp.error_info = "rpc package error: {}".format(e) + return resp class DAG(object): def __init__(self, request_name, response_op, use_profile, is_thread_op, - client_type, channel_size, show_info, build_dag_each_worker): + client_type, channel_size, build_dag_each_worker, tracer): self._request_name = request_name self._response_op = response_op self._use_profile = use_profile self._is_thread_op = is_thread_op self._channel_size = channel_size self._client_type = client_type - self._show_info = show_info self._build_dag_each_worker = build_dag_each_worker + self._tracer = tracer if not self._is_thread_op: self._manager = multiprocessing.Manager() - _LOGGER.info("[DAG] succ init") + _LOGGER.info("[DAG] Succ init") def get_use_ops(self, response_op): unique_names = set() @@ -308,8 +351,10 @@ class DAG(object): used_ops.add(pred_op) # check the name of op is globally unique if pred_op.name in unique_names: - raise Exception("the name of Op must be unique: {}". - format(pred_op.name)) + _LOGGER.critical("Failed to get used Ops: the" + " name of Op must be unique: {}". + format(pred_op.name)) + os._exit(-1) unique_names.add(pred_op.name) return used_ops, succ_ops_of_use_op @@ -321,12 +366,12 @@ class DAG(object): else: channel = ProcessChannel( self._manager, name=name_gen.next(), maxsize=self._channel_size) - _LOGGER.debug("[DAG] gen Channel: {}".format(channel.name)) + _LOGGER.debug("[DAG] Generate channel: {}".format(channel.name)) return channel def _gen_virtual_op(self, name_gen): vir_op = VirtualOp(name=name_gen.next()) - _LOGGER.debug("[DAG] gen VirtualOp: {}".format(vir_op.name)) + _LOGGER.debug("[DAG] Generate virtual_op: {}".format(vir_op.name)) return vir_op def _topo_sort(self, used_ops, response_op, out_degree_ops): @@ -341,7 +386,9 @@ class DAG(object): if len(op.get_input_ops()) == 0: zero_indegree_num += 1 if zero_indegree_num != 1: - raise Exception("DAG contains multiple input Ops") + _LOGGER.critical("Failed to topo sort: DAG contains " + "multiple RequestOps") + os._exit(-1) last_op = response_op.get_input_ops()[0] ques[que_idx].put(last_op) @@ -365,24 +412,28 @@ class DAG(object): break que_idx = (que_idx + 1) % 2 if sorted_op_num < len(used_ops): - raise Exception("not legal DAG") + _LOGGER.critical("Failed to topo sort: not legal DAG") + os._exit(-1) return dag_views, last_op def _build_dag(self, response_op): if response_op is None: - raise Exception("response_op has not been set.") + _LOGGER.critical("Failed to build DAG: ResponseOp" + " has not been set.") + os._exit(-1) used_ops, out_degree_ops = self.get_use_ops(response_op) - if self._show_info: + if not self._build_dag_each_worker: _LOGGER.info("================= USED OP =================") for op in used_ops: if op.name != self._request_name: _LOGGER.info(op.name) _LOGGER.info("-------------------------------------------") if len(used_ops) <= 1: - raise Exception( - "Besides RequestOp and ResponseOp, there should be at least one Op in DAG." 
- ) + _LOGGER.critical( + "Failed to build DAG: besides RequestOp and ResponseOp, " + "there should be at least one Op in DAG.") + os._exit(-1) if self._build_dag_each_worker: _LOGGER.info("Because `build_dag_each_worker` mode is used, " "Auto-batching is set to the default config: " @@ -393,15 +444,15 @@ class DAG(object): dag_views, last_op = self._topo_sort(used_ops, response_op, out_degree_ops) dag_views = list(reversed(dag_views)) - if self._show_info: - _LOGGER.info("================== DAG ====================") + if not self._build_dag_each_worker: + _LOGGER.debug("================== DAG ====================") for idx, view in enumerate(dag_views): - _LOGGER.info("(VIEW {})".format(idx)) + _LOGGER.debug("(VIEW {})".format(idx)) for op in view: - _LOGGER.info(" [{}]".format(op.name)) + _LOGGER.debug(" [{}]".format(op.name)) for out_op in out_degree_ops[op.name]: - _LOGGER.info(" - {}".format(out_op.name)) - _LOGGER.info("-------------------------------------------") + _LOGGER.debug(" - {}".format(out_op.name)) + _LOGGER.debug("-------------------------------------------") # create channels and virtual ops virtual_op_name_gen = NameGenerator("vir") @@ -443,8 +494,6 @@ class DAG(object): continue channel = self._gen_channel(channel_name_gen) channels.append(channel) - _LOGGER.debug("[DAG] Channel({}) => Op({})" - .format(channel.name, op.name)) op.add_input_channel(channel) pred_ops = pred_op_of_next_view_op[op.name] if v_idx == 0: @@ -452,8 +501,6 @@ class DAG(object): else: # if pred_op is virtual op, it will use ancestors as producers to channel for pred_op in pred_ops: - _LOGGER.debug("[DAG] Op({}) => Channel({})" - .format(pred_op.name, channel.name)) pred_op.add_output_channel(channel) processed_op.add(op.name) # find same input op to combine channel @@ -469,8 +516,6 @@ class DAG(object): same_flag = False break if same_flag: - _LOGGER.debug("[DAG] Channel({}) => Op({})" - .format(channel.name, other_op.name)) other_op.add_input_channel(channel) processed_op.add(other_op.name) output_channel = self._gen_channel(channel_name_gen) @@ -488,16 +533,19 @@ class DAG(object): actual_ops.append(op) for c in channels: - _LOGGER.debug("Channel({}):\n -producers: {}\n -consumers: {}" + _LOGGER.debug("Channel({}):\n\t- producers: {}\n\t- consumers: {}" .format(c.name, c.get_producers(), c.get_consumers())) return (actual_ops, channels, input_channel, output_channel, pack_func, unpack_func) + def get_channels(self): + return self._channels + def build(self): (actual_ops, channels, input_channel, output_channel, pack_func, unpack_func) = self._build_dag(self._response_op) - _LOGGER.info("[DAG] succ build dag") + _LOGGER.info("[DAG] Succ build DAG") self._actual_ops = actual_ops self._channels = channels @@ -506,12 +554,15 @@ class DAG(object): self._pack_func = pack_func self._unpack_func = unpack_func + self._tracer.set_channels(self._channels) + return self._input_channel, self._output_channel, self._pack_func, self._unpack_func def start(self): self._threads_or_proces = [] for op in self._actual_ops: op.use_profiler(self._use_profile) + op.set_tracer(self._tracer) if self._is_thread_op: self._threads_or_proces.extend( op.start_with_thread(self._client_type)) diff --git a/python/pipeline/logger.py b/python/pipeline/logger.py new file mode 100644 index 0000000000000000000000000000000000000000..fd86b58cc9042b88dbaaf71ca2e132de330be8f1 --- /dev/null +++ b/python/pipeline/logger.py @@ -0,0 +1,90 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import logging.handlers +import os + + +class SectionLevelFilter(object): + def __init__(self, levels): + self._levels = levels + + def filter(self, logRecord): + return logRecord.levelno in self._levels + + +class OutOfMouduleFilter(object): + def __init__(self, out_names): + self._out_names = out_names + + def filter(self, logRecord): + return logRecord.name not in self._out_names + + +class OutOfMouduleAndSectionLevelFilter(object): + def __init__(self, out_names, levels): + self._out_names = out_names + self._levels = levels + + def filter(self, logRecord): + if logRecord.name in self._out_names: + return False + return logRecord.levelno in self._levels + + +class StreamHandler(logging.StreamHandler): + def __init__(self, *args, **kwargs): + super(StreamHandler, self).__init__(*args, **kwargs) + self.addFilter(OutOfMouduleFilter(["pipeline.profiler"])) + + +log_dir = "PipelineServingLogs" +if not os.path.exists(log_dir): + os.makedirs(log_dir) + +# root logger +_LOGGER = logging.getLogger() +_LOGGER.setLevel(logging.DEBUG) + +formatter = logging.Formatter( + "%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s") +# info and warn +file_info = logging.handlers.RotatingFileHandler( + os.path.join(log_dir, "INFO.log")) +file_info.addFilter(OutOfMouduleFilter(["pipeline.profiler"])) +file_info.addFilter(SectionLevelFilter([logging.INFO, logging.WARNING])) +file_info.setFormatter(formatter) + +# err and critical +file_err = logging.handlers.RotatingFileHandler( + os.path.join(log_dir, "ERROR.log")) +file_err.addFilter(OutOfMouduleFilter(["pipeline.profiler"])) +file_err.setLevel(logging.ERROR) +file_err.setFormatter(formatter) + +_LOGGER.addHandler(file_info) +_LOGGER.addHandler(file_err) + +# tracer logger +_TRACER = logging.getLogger("pipeline.profiler") +_TRACER.setLevel(logging.INFO) +_TRACER.addFilter(logging.Filter("pipeline.profiler")) + +# tracer +tracer_formatter = logging.Formatter("%(asctime)s %(message)s") +file_trace = logging.handlers.RotatingFileHandler( + os.path.join(log_dir, "TRACE.log")) +file_trace.setFormatter(tracer_formatter) +_TRACER.addHandler(file_trace) diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index eea153d4beb019956e429af61af6e2e13476e0e0..323b4865cf1f4529a50e85b94723ecc09c0bd0b3 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -13,6 +13,7 @@ # limitations under the License. 
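The `logger.py` module added above is imported first by `__init__.py`, so the handlers it installs on the root logger are in place before any other pipeline module logs: INFO/WARNING records go to `PipelineServingLogs/INFO.log`, ERROR and above go to `ERROR.log`, and records from the `pipeline.profiler` logger are filtered out of both and land only in `TRACE.log`. A small illustrative sketch of that routing, assuming the package is importable as `paddle_serving_server.pipeline` (the log messages are made up):

```python
import logging

# Importing the pipeline package runs logger.py first, which creates the
# PipelineServingLogs/ directory and attaches the rotating file handlers
# to the root logger.
import paddle_serving_server.pipeline  # noqa: F401

op_logger = logging.getLogger("pipeline.operator")  # same name operator.py uses
op_logger.info("recorded in PipelineServingLogs/INFO.log")
op_logger.error("recorded in PipelineServingLogs/ERROR.log")

# The tracer logger is excluded from INFO.log/ERROR.log by OutOfMouduleFilter
# and is written only to PipelineServingLogs/TRACE.log.
logging.getLogger("pipeline.profiler").info("recorded in TRACE.log")
```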
# pylint: disable=doc-string-missing from time import time as _time +import time import threading import multiprocessing from paddle_serving_client import MultiLangClient, Client @@ -31,7 +32,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode, from .util import NameGenerator from .profiler import UnsafeTimeProfiler as TimeProfiler -_LOGGER = logging.getLogger() +_LOGGER = logging.getLogger("pipeline.operator") _op_name_gen = NameGenerator("Op") @@ -60,7 +61,10 @@ class Op(object): self._client_config = client_config self._fetch_names = fetch_list - self._timeout = timeout + if timeout > 0: + self._timeout = timeout / 1000.0 + else: + self._timeout = -1 self._retry = max(1, retry) self._input = None self._outputs = [] @@ -69,13 +73,34 @@ class Op(object): self._auto_batching_timeout = auto_batching_timeout if self._auto_batching_timeout is not None: if self._auto_batching_timeout <= 0 or self._batch_size == 1: + _LOGGER.warning( + self._log( + "Because auto_batching_timeout <= 0 or batch_size == 1," + " set auto_batching_timeout to None.")) self._auto_batching_timeout = None else: self._auto_batching_timeout = self._auto_batching_timeout / 1000.0 + if not isinstance(self, RequestOp) and not isinstance(self, ResponseOp): + _LOGGER.info( + self._log("\n\tinput_ops: {}," + "\n\tserver_endpoints: {}" + "\n\tfetch_list: {}" + "\n\tclient_config: {}" + "\n\tconcurrency: {}," + "\n\ttimeout(s): {}," + "\n\tretry: {}," + "\n\tbatch_size: {}," + "\n\tauto_batching_timeout(s): {}".format( + ", ".join([op.name for op in input_ops + ]), self._server_endpoints, + self._fetch_names, self._client_config, + self.concurrency, self._timeout, self._retry, + self._batch_size, self._auto_batching_timeout))) self._server_use_profile = False + self._tracer = None - # only for multithread + # only for thread op self._for_init_op_lock = threading.Lock() self._for_close_op_lock = threading.Lock() self._succ_init_op = False @@ -83,34 +108,35 @@ class Op(object): def use_default_auto_batching_config(self): if self._batch_size != 1: - _LOGGER.warn("Op({}) reset batch_size=1 (original: {})" - .format(self.name, self._batch_size)) + _LOGGER.warning("Op({}) reset batch_size=1 (original: {})" + .format(self.name, self._batch_size)) self._batch_size = 1 if self._auto_batching_timeout != None: - _LOGGER.warn("Op({}) reset auto_batching_timeout=1 (original: {})" - .format(self.name, self._auto_batching_timeout)) + _LOGGER.warning( + "Op({}) reset auto_batching_timeout=None (original: {})" + .format(self.name, self._auto_batching_timeout)) self._auto_batching_timeout = None def use_profiler(self, use_profile): self._server_use_profile = use_profile + def set_tracer(self, tracer): + self._tracer = tracer + def init_client(self, client_type, client_config, server_endpoints, fetch_names): if self.with_serving == False: - _LOGGER.info("Op({}) no client".format(self.name)) + _LOGGER.info("Op({}) has no client (and it also do not " + "run the process function".format(self.name)) return None - _LOGGER.info("Op({}) service endpoints: {}".format(self.name, - server_endpoints)) - _LOGGER.debug("Op({}) fetch_names: {}".format(self.name, fetch_names)) if client_type == 'brpc': - _LOGGER.debug("Op({}) client_config: {}".format(self.name, - client_config)) client = Client() client.load_client_config(client_config) elif client_type == 'grpc': client = MultiLangClient() else: - raise ValueError("unknow client type: {}".format(client_type)) + raise ValueError("Failed to init client: unknow client " + "type 
{}".format(client_type)) client.connect(server_endpoints) self._fetch_names = fetch_names return client @@ -124,16 +150,19 @@ class Op(object): self._input_ops = [] for op in ops: if not isinstance(op, Op): - raise TypeError( - self._log('input op must be Op type, not {}'.format( - type(op)))) + _LOGGER.critical( + self._log("Failed to set input_ops: input op " + "must be Op type, not {}".format(type(op)))) + os._exit(-1) self._input_ops.append(op) def add_input_channel(self, channel): if not isinstance(channel, (ThreadChannel, ProcessChannel)): - raise TypeError( - self._log('input channel must be Channel type, not {}'.format( - type(channel)))) + _LOGGER.critical( + self._log("Failed to set input_channel: input " + "channel must be Channel type, not {}".format( + type(channel)))) + os._exit(-1) channel.add_consumer(self.name) self._input = channel @@ -145,9 +174,10 @@ class Op(object): def add_output_channel(self, channel): if not isinstance(channel, (ThreadChannel, ProcessChannel)): - raise TypeError( - self._log('output channel must be Channel type, not {}'.format( - type(channel)))) + _LOGGER.critical( + self._log("Failed to add output_channel: output channel " + "must be Channel type, not {}".format(type(channel)))) + os._exit(-1) channel.add_producer(self.name) self._outputs.append(channel) @@ -160,9 +190,11 @@ class Op(object): def preprocess(self, input_dicts): # multiple previous Op if len(input_dicts) != 1: - raise NotImplementedError( - 'this Op has multiple previous inputs. Please override this func.' - ) + _LOGGER.critical( + self._log( + "Failed to run preprocess: this Op has multiple previous " + "inputs. Please override this func.")) + os._exit(-1) (_, input_dict), = input_dicts.items() return input_dict @@ -170,10 +202,16 @@ class Op(object): def process(self, feed_batch): err, err_info = ChannelData.check_batch_npdata(feed_batch) if err != 0: - raise NotImplementedError( - "{} Please override preprocess func.".format(err_info)) + _LOGGER.critical( + self._log("Failed to run process: {}. Please override " + "preprocess func.".format(err_info))) + os._exit(-1) call_result = self.client.predict( feed=feed_batch, fetch=self._fetch_names) + if isinstance(self.client, MultiLangClient): + if call_result is None or call_result["serving_status_code"] != 0: + return None + call_result.pop("serving_status_code") return call_result def postprocess(self, input_dict, fetch_dict): @@ -218,23 +256,35 @@ class Op(object): channel.push(data, name) def start_with_process(self, client_type): + trace_buffer = None + if self._tracer is not None: + trace_buffer = self._tracer.data_buffer() proces = [] for concurrency_idx in range(self.concurrency): p = multiprocessing.Process( target=self._run, args=(concurrency_idx, self._get_input_channel(), - self._get_output_channels(), client_type, False)) + self._get_output_channels(), client_type, False, + trace_buffer)) + p.daemon = True p.start() proces.append(p) return proces def start_with_thread(self, client_type): + trace_buffer = None + if self._tracer is not None: + trace_buffer = self._tracer.data_buffer() threads = [] for concurrency_idx in range(self.concurrency): t = threading.Thread( target=self._run, args=(concurrency_idx, self._get_input_channel(), - self._get_output_channels(), client_type, True)) + self._get_output_channels(), client_type, True, + trace_buffer)) + # When a process exits, it attempts to terminate + # all of its daemonic child processes. 
+ t.daemon = True t.start() threads.append(t) return threads @@ -242,35 +292,27 @@ class Op(object): def init_op(self): pass - def _run_preprocess(self, parsed_data_dict, log_func): - _LOGGER.debug(log_func("try to run preprocess")) + def _run_preprocess(self, parsed_data_dict, op_info_prefix): + _LOGGER.debug("{} Running preprocess".format(op_info_prefix)) preped_data_dict = {} err_channeldata_dict = {} for data_id, parsed_data in parsed_data_dict.items(): preped_data, error_channeldata = None, None try: preped_data = self.preprocess(parsed_data) - except NotImplementedError as e: - # preprocess function not implemented - error_info = log_func("preprocess data[{}] failed: {}".format( - data_id, e)) - error_channeldata = ChannelData( - ecode=ChannelDataEcode.NOT_IMPLEMENTED.value, - error_info=error_info, - data_id=data_id) except TypeError as e: # Error type in channeldata.datatype - error_info = log_func("preprocess data[{}] failed: {}".format( - data_id, e)) - _LOGGER.error(error_info) + error_info = "(logid={}) {} Failed to preprocess: {}".format( + data_id, op_info_prefix, e) + _LOGGER.error(error_info, exc_info=True) error_channeldata = ChannelData( ecode=ChannelDataEcode.TYPE_ERROR.value, error_info=error_info, data_id=data_id) except Exception as e: - error_info = log_func("preprocess data[{}] failed: {}".format( - data_id, e)) - _LOGGER.error(error_info) + error_info = "(logid={}) {} Failed to preprocess: {}".format( + data_id, op_info_prefix, e) + _LOGGER.error(error_info, exc_info=True) error_channeldata = ChannelData( ecode=ChannelDataEcode.UNKNOW.value, error_info=error_info, @@ -279,11 +321,11 @@ class Op(object): err_channeldata_dict[data_id] = error_channeldata else: preped_data_dict[data_id] = preped_data - _LOGGER.debug(log_func("succ run preprocess")) + _LOGGER.debug("{} Succ preprocess".format(op_info_prefix)) return preped_data_dict, err_channeldata_dict - def _run_process(self, preped_data_dict, log_func): - _LOGGER.debug(log_func("try to run process")) + def _run_process(self, preped_data_dict, op_info_prefix): + _LOGGER.debug("{} Running process".format(op_info_prefix)) midped_data_dict = {} err_channeldata_dict = {} if self.with_serving: @@ -296,8 +338,9 @@ class Op(object): midped_batch = self.process(feed_batch) except Exception as e: ecode = ChannelDataEcode.UNKNOW.value - error_info = log_func("process batch failed: {}".format(e)) - _LOGGER.error(error_info) + error_info = "{} Failed to process(batch: {}): {}".format( + op_info_prefix, data_ids, e) + _LOGGER.error(error_info, exc_info=True) else: for i in range(self._retry): try: @@ -306,30 +349,34 @@ class Op(object): except func_timeout.FunctionTimedOut as e: if i + 1 >= self._retry: ecode = ChannelDataEcode.TIMEOUT.value - error_info = log_func(e) + error_info = "{} Failed to process(batch: {}): " \ + "exceeded retry count.".format( + op_info_prefix, data_ids) _LOGGER.error(error_info) else: - _LOGGER.warn( - log_func("timeout, retry({}/{})" - .format(i + 1, self._retry))) + _LOGGER.warning( + "{} Failed to process(batch: {}): timeout, and retrying({}/{})" + .format(op_info_prefix, data_ids, i + 1, + self._retry)) except Exception as e: ecode = ChannelDataEcode.UNKNOW.value - error_info = log_func("process batch failed: {}".format( - e)) - _LOGGER.error(error_info) + error_info = "{} Failed to process(batch: {}): {}".format( + op_info_prefix, data_ids, e) + _LOGGER.error(error_info, exc_info=True) break else: break if ecode != ChannelDataEcode.OK.value: for data_id in data_ids: + _LOGGER.error("(logid={}) 
{}".format(data_id, error_info)) err_channeldata_dict[data_id] = ChannelData( ecode=ecode, error_info=error_info, data_id=data_id) elif midped_batch is None: # op client return None - error_info = log_func( - "predict failed. pls check the server side.") - _LOGGER.error(error_info) + error_info = "{} Failed to predict, please check if PaddleServingService" \ + " is working properly.".format(op_info_prefix) for data_id in data_ids: + _LOGGER.error("(logid={}) {}".format(data_id, error_info)) err_channeldata_dict[data_id] = ChannelData( ecode=ChannelDataEcode.CLIENT_ERROR.value, error_info=error_info, @@ -343,11 +390,12 @@ class Op(object): } else: midped_data_dict = preped_data_dict - _LOGGER.debug(log_func("succ run process")) + _LOGGER.debug("{} Succ process".format(op_info_prefix)) return midped_data_dict, err_channeldata_dict - def _run_postprocess(self, parsed_data_dict, midped_data_dict, log_func): - _LOGGER.debug(log_func("try to run postprocess")) + def _run_postprocess(self, parsed_data_dict, midped_data_dict, + op_info_prefix): + _LOGGER.debug("{} Running postprocess".format(op_info_prefix)) postped_data_dict = {} err_channeldata_dict = {} for data_id, midped_data in midped_data_dict.items(): @@ -356,9 +404,9 @@ class Op(object): postped_data = self.postprocess(parsed_data_dict[data_id], midped_data) except Exception as e: - error_info = log_func("postprocess data[{}] failed: {}" - .format(data_id, e)) - _LOGGER.error(error_info) + error_info = "(logid={}) {} Failed to postprocess: {}".format( + data_id, op_info_prefix, e) + _LOGGER.error(error_info, exc_info=True) err_channeldata = ChannelData( ecode=ChannelDataEcode.UNKNOW.value, error_info=error_info, @@ -368,8 +416,11 @@ class Op(object): continue else: if not isinstance(postped_data, dict): - error_info = log_func("output of postprocess funticon must be " \ - "dict type, but get {}".format(type(postped_data))) + error_info = "(logid={}) {} Failed to postprocess: " \ + "output of postprocess funticon must be " \ + "dict type, but get {}".format( + data_id, op_info_prefix, + type(postped_data)) _LOGGER.error(error_info) err_channeldata = ChannelData( ecode=ChannelDataEcode.UNKNOW.value, @@ -391,16 +442,13 @@ class Op(object): dictdata=postped_data, data_id=data_id) postped_data_dict[data_id] = output_data - _LOGGER.debug(log_func("succ run postprocess")) + _LOGGER.debug("{} Succ postprocess".format(op_info_prefix)) return postped_data_dict, err_channeldata_dict def _auto_batching_generator(self, input_channel, op_name, batch_size, - timeout, log_func): + timeout, op_info_prefix): while True: batch = [] - _LOGGER.debug( - log_func("Auto-batching expect size: {}; timeout: {}".format( - batch_size, timeout))) while len(batch) == 0: endtime = None if timeout is not None: @@ -411,7 +459,8 @@ class Op(object): if timeout is not None: remaining = endtime - _time() if remaining <= 0.0: - _LOGGER.debug(log_func("Auto-batching timeout")) + _LOGGER.debug("{} Failed to generate batch: " + "timeout".format(op_info_prefix)) break channeldata_dict = input_channel.front(op_name, timeout) @@ -419,10 +468,11 @@ class Op(object): channeldata_dict = input_channel.front(op_name) batch.append(channeldata_dict) except ChannelTimeoutError: - _LOGGER.debug(log_func("Auto-batching timeout")) + _LOGGER.debug("{} Failed to generate batch: " + "timeout".format(op_info_prefix)) break - _LOGGER.debug( - log_func("Auto-batching actual size: {}".format(len(batch)))) + _LOGGER.debug("{} Got actual batch_size: {}".format(op_info_prefix, + len(batch))) yield 
batch def _parse_channeldata_batch(self, batch, output_channels): @@ -446,15 +496,8 @@ class Op(object): return parsed_data_dict, need_profile_dict, profile_dict def _run(self, concurrency_idx, input_channel, output_channels, client_type, - is_thread_op): - def get_log_func(op_info_prefix): - def log_func(info_str): - return "{} {}".format(op_info_prefix, info_str) - - return log_func - + is_thread_op, trace_buffer): op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx) - log = get_log_func(op_info_prefix) tid = threading.current_thread().ident # init op @@ -463,24 +506,31 @@ class Op(object): profiler = self._initialize(is_thread_op, client_type, concurrency_idx) except Exception as e: - _LOGGER.error(log("init op failed: {}".format(e))) + _LOGGER.critical( + "{} Failed to init op: {}".format(op_info_prefix, e), + exc_info=True) os._exit(-1) - _LOGGER.info(log("succ init")) + _LOGGER.info("{} Succ init".format(op_info_prefix)) batch_generator = self._auto_batching_generator( input_channel=input_channel, op_name=self.name, batch_size=self._batch_size, timeout=self._auto_batching_timeout, - log_func=log) + op_info_prefix=op_info_prefix) + start, end = None, None while True: + start = int(round(_time() * 1000000)) try: channeldata_dict_batch = next(batch_generator) except ChannelStopError: - _LOGGER.debug(log("channel stop.")) + _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) break + end = int(round(_time() * 1000000)) + if trace_buffer is not None: + trace_buffer.put((self.name, "in", True, end - start)) # parse channeldata batch try: @@ -488,7 +538,7 @@ class Op(object): = self._parse_channeldata_batch( channeldata_dict_batch, output_channels) except ChannelStopError: - _LOGGER.debug(log("channel stop.")) + _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) break if len(parsed_data_dict) == 0: @@ -496,10 +546,12 @@ class Op(object): continue # preprecess - profiler.record("prep#{}_0".format(op_info_prefix)) + start = profiler.record("prep#{}_0".format(op_info_prefix)) preped_data_dict, err_channeldata_dict \ - = self._run_preprocess(parsed_data_dict, log) - profiler.record("prep#{}_1".format(op_info_prefix)) + = self._run_preprocess(parsed_data_dict, op_info_prefix) + end = profiler.record("prep#{}_1".format(op_info_prefix)) + if trace_buffer is not None: + trace_buffer.put((self.name, "prep", True, end - start)) try: for data_id, err_channeldata in err_channeldata_dict.items(): self._push_to_output_channels( @@ -508,17 +560,19 @@ class Op(object): client_need_profile=need_profile_dict[data_id], profile_set=profile_dict[data_id]) except ChannelStopError: - _LOGGER.debug(log("channel stop.")) + _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) break if len(parsed_data_dict) == 0: continue # process - profiler.record("midp#{}_0".format(op_info_prefix)) + start = profiler.record("midp#{}_0".format(op_info_prefix)) midped_data_dict, err_channeldata_dict \ - = self._run_process(preped_data_dict, log) - profiler.record("midp#{}_1".format(op_info_prefix)) + = self._run_process(preped_data_dict, op_info_prefix) + end = profiler.record("midp#{}_1".format(op_info_prefix)) + if trace_buffer is not None: + trace_buffer.put((self.name, "midp", True, end - start)) try: for data_id, err_channeldata in err_channeldata_dict.items(): self._push_to_output_channels( @@ -527,18 +581,20 @@ class Op(object): client_need_profile=need_profile_dict[data_id], profile_set=profile_dict[data_id]) except ChannelStopError: - 
_LOGGER.debug(log("channel stop.")) + _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) break if len(midped_data_dict) == 0: continue # postprocess - profiler.record("postp#{}_0".format(op_info_prefix)) + start = profiler.record("postp#{}_0".format(op_info_prefix)) postped_data_dict, err_channeldata_dict \ = self._run_postprocess( - parsed_data_dict, midped_data_dict, log) - profiler.record("postp#{}_1".format(op_info_prefix)) + parsed_data_dict, midped_data_dict, op_info_prefix) + end = profiler.record("postp#{}_1".format(op_info_prefix)) + if trace_buffer is not None: + trace_buffer.put((self.name, "postp", True, end - start)) try: for data_id, err_channeldata in err_channeldata_dict.items(): self._push_to_output_channels( @@ -547,13 +603,14 @@ class Op(object): client_need_profile=need_profile_dict[data_id], profile_set=profile_dict[data_id]) except ChannelStopError: - _LOGGER.debug(log("channel stop.")) + _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) break if len(postped_data_dict) == 0: continue # push data to channel (if run succ) + start = int(round(_time() * 1000000)) try: profile_str = profiler.gen_profile_str() for data_id, postped_data in postped_data_dict.items(): @@ -566,9 +623,12 @@ class Op(object): client_need_profile=need_profile_dict[data_id], profile_set=profile_dict[data_id]) except ChannelStopError: - _LOGGER.debug(log("channel stop.")) + _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) break + end = int(round(_time() * 1000000)) + if trace_buffer is not None: + trace_buffer.put((self.name, "out", True, end - start)) def _initialize(self, is_thread_op, client_type, concurrency_idx): if is_thread_op: @@ -615,13 +675,13 @@ class RequestOp(Op): """ RequestOp do not run preprocess, process, postprocess. """ def __init__(self): - # PipelineService.name = "@G" - super(RequestOp, self).__init__(name="@G", input_ops=[]) + # PipelineService.name = "@DAGExecutor" + super(RequestOp, self).__init__(name="@DAGExecutor", input_ops=[]) # init op try: self.init_op() except Exception as e: - _LOGGER.error("Op(Request) init op failed: {}".format(e)) + _LOGGER.critical("Op(Request) Failed to init: {}".format(e)) os._exit(-1) def unpack_request_package(self, request): @@ -640,12 +700,14 @@ class ResponseOp(Op): """ ResponseOp do not run preprocess, process, postprocess. 
""" def __init__(self, input_ops): - super(ResponseOp, self).__init__(name="@R", input_ops=input_ops) + super(ResponseOp, self).__init__( + name="@DAGExecutor", input_ops=input_ops) # init op try: self.init_op() except Exception as e: - _LOGGER.error("Op(ResponseOp) init op failed: {}".format(e)) + _LOGGER.critical("Op(ResponseOp) Failed to init: {}".format( + e, exc_info=True)) os._exit(-1) def pack_response_package(self, channeldata): @@ -668,14 +730,19 @@ class ResponseOp(Op): resp.error_info = self._log( "fetch var type must be str({}).".format( type(var))) + _LOGGER.error("(logid={}) Failed to pack RPC " + "response package: {}".format( + channeldata.id, resp.error_info)) break resp.value.append(var) resp.key.append(name) else: resp.ecode = ChannelDataEcode.TYPE_ERROR.value resp.error_info = self._log( - "Error type({}) in datatype.".format(channeldata.datatype)) - _LOGGER.error(resp.error_info) + "error type({}) in datatype.".format(channeldata.datatype)) + _LOGGER.error("(logid={}) Failed to pack RPC response" + " package: {}".format(channeldata.id, + resp.error_info)) else: resp.error_info = channeldata.error_info return resp @@ -693,6 +760,7 @@ class VirtualOp(Op): self._virtual_pred_ops.append(op) def _actual_pred_op_names(self, op): + # can use disjoint-set, but it's not necessary if not isinstance(op, VirtualOp): return [op.name] names = [] @@ -702,9 +770,11 @@ class VirtualOp(Op): def add_output_channel(self, channel): if not isinstance(channel, (ThreadChannel, ProcessChannel)): - raise TypeError( - self._log('output channel must be Channel type, not {}'.format( - type(channel)))) + _LOGGER.critical( + self._log("Failed to add output_channel: output_channel" + " must be Channel type, not {}".format( + type(channel)))) + os._exit(-1) for op in self._virtual_pred_ops: for op_name in self._actual_pred_op_names(op): channel.add_producer(op_name) @@ -712,27 +782,31 @@ class VirtualOp(Op): def _run(self, concurrency_idx, input_channel, output_channels, client_type, is_thread_op): - def get_log_func(op_info_prefix): - def log_func(info_str): - return "{} {}".format(op_info_prefix, info_str) - - return log_func - op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx) log = get_log_func(op_info_prefix) tid = threading.current_thread().ident + batch_generator = self._auto_batching_generator( + input_channel=input_channel, + op_name=self.name, + batch_size=1, + timeout=None, + log_func=log) + while True: try: - channeldata_dict = input_channel.front(self.name) + channeldata_dict_batch = next(batch_generator) except ChannelStopError: - _LOGGER.debug(log("Channel stop.")) + _LOGGER.debug("{} Stop.".format(op_info_prefix)) + self._finalize(is_thread_op) break try: - for name, data in channeldata_dict.items(): - self._push_to_output_channels( - data, channels=output_channels, name=name) + for channeldata_dict in channeldata_dict_batch: + for name, data in channeldata_dict.items(): + self._push_to_output_channels( + data, channels=output_channels, name=name) except ChannelStopError: - _LOGGER.debug(log("Channel stop.")) + _LOGGER.debug("{} Stop.".format(op_info_prefix)) + self._finalize(is_thread_op) break diff --git a/python/pipeline/pipeline_client.py b/python/pipeline/pipeline_client.py index 405c3625ddc55e2965235a26906f08ea7fbfcf37..0b4226dafbddc95bedf9d638c73e0600e78a4184 100644 --- a/python/pipeline/pipeline_client.py +++ b/python/pipeline/pipeline_client.py @@ -22,7 +22,7 @@ from .channel import ChannelDataEcode from .proto import pipeline_service_pb2 from .proto import 
pipeline_service_pb2_grpc -_LOGGER = logging.getLogger() +_LOGGER = logging.getLogger("pipeline.pipeline_client") class PipelineClient(object): diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py index 4a352ff87e497c795db8177f4cef8b33b03801f2..d17afde7e88c18a535762d8fbe299fdc4611676f 100644 --- a/python/pipeline/pipeline_server.py +++ b/python/pipeline/pipeline_server.py @@ -15,6 +15,7 @@ from concurrent import futures import grpc import logging +import json import socket import contextlib from contextlib import closing @@ -25,15 +26,14 @@ from .proto import pipeline_service_pb2_grpc from .operator import ResponseOp from .dag import DAGExecutor -_LOGGER = logging.getLogger() +_LOGGER = logging.getLogger("pipeline.pipeline_server") class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer): - def __init__(self, response_op, dag_config, show_info): + def __init__(self, response_op, dag_conf): super(PipelineServicer, self).__init__() # init dag executor - self._dag_executor = DAGExecutor( - response_op, dag_config, show_info=show_info) + self._dag_executor = DAGExecutor(response_op, dag_conf) self._dag_executor.start() _LOGGER.info("[PipelineServicer] succ init") @@ -41,9 +41,6 @@ class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer): resp = self._dag_executor.call(request) return resp - def __del__(self): - self._dag_executor.stop() - @contextlib.contextmanager def _reserve_port(port): @@ -67,9 +64,11 @@ class PipelineServer(object): def set_response_op(self, response_op): if not isinstance(response_op, ResponseOp): - raise Exception("response_op must be ResponseOp type.") + raise Exception("Failed to set response_op: response_op " + "must be ResponseOp type.") if len(response_op.get_input_ops()) != 1: - raise Exception("response_op can only have one previous op.") + raise Exception("Failed to set response_op: response_op " + "can only have one previous op.") self._response_op = response_op def _port_is_available(self, port): @@ -79,36 +78,25 @@ class PipelineServer(object): return result != 0 def prepare_server(self, yml_file): - with open(yml_file) as f: - yml_config = yaml.load(f.read()) - default_config = { - "port": 9292, - "worker_num": 1, - "build_dag_each_worker": False, - } + conf = ServerYamlConfChecker.load_server_yaml_conf(yml_file) - for key, val in default_config.items(): - if yml_config.get(key) is None: - _LOGGER.warning("[CONF] {} not set, use default: {}" - .format(key, val)) - yml_config[key] = val - - self._port = yml_config["port"] + self._port = conf["port"] if not self._port_is_available(self._port): - raise SystemExit("Prot {} is already used".format(self._port)) - self._worker_num = yml_config["worker_num"] - self._build_dag_each_worker = yml_config["build_dag_each_worker"] + raise SystemExit("Failed to prepare_server: prot {} " + "is already used".format(self._port)) + self._worker_num = conf["worker_num"] + self._build_dag_each_worker = conf["build_dag_each_worker"] _LOGGER.info("============= PIPELINE SERVER =============") - for key in default_config.keys(): - _LOGGER.info("{}: {}".format(key, yml_config[key])) + _LOGGER.info("\n{}".format( + json.dumps( + conf, indent=4, separators=(',', ':')))) if self._build_dag_each_worker is True: _LOGGER.info( "(Make sure that install grpcio whl with --no-binary flag)") _LOGGER.info("-------------------------------------------") - self._dag_config = yml_config.get("dag", {}) - self._dag_config["build_dag_each_worker"] = self._build_dag_each_worker + 
self._conf = conf def run_server(self): if self._build_dag_each_worker: @@ -119,8 +107,7 @@ class PipelineServer(object): show_info = (i == 0) worker = multiprocessing.Process( target=self._run_server_func, - args=(bind_address, self._response_op, - self._dag_config)) + args=(bind_address, self._response_op, self._conf)) worker.start() workers.append(worker) for worker in workers: @@ -129,19 +116,153 @@ class PipelineServer(object): server = grpc.server( futures.ThreadPoolExecutor(max_workers=self._worker_num)) pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server( - PipelineServicer(self._response_op, self._dag_config, True), - server) + PipelineServicer(self._response_op, self._conf), server) server.add_insecure_port('[::]:{}'.format(self._port)) server.start() server.wait_for_termination() - def _run_server_func(self, bind_address, response_op, dag_config): + def _run_server_func(self, bind_address, response_op, dag_conf): options = (('grpc.so_reuseport', 1), ) server = grpc.server( futures.ThreadPoolExecutor( max_workers=1, ), options=options) pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server( - PipelineServicer(response_op, dag_config, False), server) + PipelineServicer(response_op, dag_conf), server) server.add_insecure_port(bind_address) server.start() server.wait_for_termination() + + +class ServerYamlConfChecker(object): + def __init__(self): + pass + + @staticmethod + def load_server_yaml_conf(yml_file): + with open(yml_file) as f: + conf = yaml.load(f.read()) + ServerYamlConfChecker.check_server_conf(conf) + ServerYamlConfChecker.check_dag_conf(conf["dag"]) + ServerYamlConfChecker.check_tracer_conf(conf["dag"]["tracer"]) + return conf + + @staticmethod + def check_conf(conf, default_conf, conf_type, conf_qualification): + ServerYamlConfChecker.fill_with_default_conf(conf, default_conf) + ServerYamlConfChecker.check_conf_type(conf, conf_type) + ServerYamlConfChecker.check_conf_qualification(conf, conf_qualification) + + @staticmethod + def check_server_conf(conf): + default_conf = { + "port": 9292, + "worker_num": 1, + "build_dag_each_worker": False, + "dag": {}, + } + + conf_type = { + "port": int, + "worker_num": int, + "build_dag_each_worker": bool, + } + + conf_qualification = { + "port": [(">=", 1024), ("<=", 65535)], + "worker_num": (">=", 1), + } + + ServerYamlConfChecker.check_conf(conf, default_conf, conf_type, + conf_qualification) + + @staticmethod + def check_tracer_conf(conf): + default_conf = {"interval_s": 600, } + + conf_type = {"interval_s": int, } + + conf_qualification = {} + + ServerYamlConfChecker.check_conf(conf, default_conf, conf_type, + conf_qualification) + + @staticmethod + def check_dag_conf(conf): + default_conf = { + "retry": 1, + "client_type": "brpc", + "use_profile": False, + "channel_size": 0, + "is_thread_op": True, + "tracer": {}, + } + + conf_type = { + "retry": int, + "client_type": str, + "use_profile": bool, + "channel_size": int, + "is_thread_op": bool, + } + + conf_qualification = { + "retry": (">=", 1), + "client_type": ("in", ["brpc", "grpc"]), + "channel_size": (">=", 0), + } + + ServerYamlConfChecker.check_conf(conf, default_conf, conf_type, + conf_qualification) + + @staticmethod + def fill_with_default_conf(conf, default_conf): + for key, val in default_conf.items(): + if conf.get(key) is None: + _LOGGER.warning("[CONF] {} not set, use default: {}" + .format(key, val)) + conf[key] = val + + @staticmethod + def check_conf_type(conf, conf_type): + for key, val in conf_type.items(): + if not 
isinstance(conf[key], val): + raise SystemExit("[CONF] {} must be {} type, but get {}." + .format(key, val, type(conf[key]))) + + @staticmethod + def check_conf_qualification(conf, conf_qualification): + for key, qualification in conf_qualification.items(): + if not isinstance(qualification, list): + qualification = [qualification] + if not ServerYamlConfChecker.qualification_check(conf[key], + qualification): + raise SystemExit("[CONF] {} must be {}, but get {}." + .format(key, ", ".join([ + "{} {}" + .format(q[0], q[1]) for q in qualification + ]), conf[key])) + + @staticmethod + def qualification_check(value, qualifications): + if not isinstance(qualifications, list): + qualifications = [qualifications] + ok = True + for q in qualifications: + operator, limit = q + if operator == "<": + ok = value < limit + elif operator == "==": + ok = value == limit + elif operator == ">": + ok = value > limit + elif operator == "<=": + ok = value <= limit + elif operator == ">=": + ok = value >= limit + elif operator == "in": + ok = value in limit + else: + raise SystemExit("unknow operator: {}".format(operator)) + if ok == False: + break + return ok diff --git a/python/pipeline/profiler.py b/python/pipeline/profiler.py index 93d37e23dc37751e19f2f6011982b1e5de2c77e6..6d7f2b63f0a22b00af712c913922c01154c1b008 100644 --- a/python/pipeline/profiler.py +++ b/python/pipeline/profiler.py @@ -23,12 +23,124 @@ elif sys.version_info.major == 3: else: raise Exception("Error Python version") from time import time as _time +import time import threading +import multiprocessing -_LOGGER = logging.getLogger() +_TRACER = logging.getLogger("pipeline.profiler") + + +class PerformanceTracer(object): + def __init__(self, is_thread_mode, interval_s, server_worker_num): + self._is_thread_mode = is_thread_mode + if is_thread_mode: + # Because the Channel in the thread mode cannot be + # accessed across processes, when using thread mode, + # the PerformanceTracer is also the thread mode. + # However, performance may be affected by GIL. 
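As the comment above notes, the buffer type chosen on the next line depends on the execution mode: a plain in-process queue suffices when Ops run as threads, while process mode needs a queue that can be shared across processes. A hedged sketch of that choice, plus the non-blocking drain loop used later in `_trace_func` (the helper names here are illustrative, not part of Paddle Serving):

```python
# Minimal sketch of the mode-dependent buffer choice made just below; an
# illustration, not the Paddle Serving implementation. In thread mode a plain
# queue.Queue is enough; in process mode the buffer must be shareable across
# processes, hence a managed queue.
import queue
import multiprocessing

def make_trace_buffer(is_thread_mode):
    if is_thread_mode:
        return queue.Queue()
    return multiprocessing.Manager().Queue()

def drain(trace_buffer):
    # Same non-blocking drain pattern as _trace_func: read until the
    # buffer is empty, then stop.
    items = []
    while True:
        try:
            items.append(trace_buffer.get_nowait())
        except queue.Empty:
            break
    return items

if __name__ == "__main__":
    buf = make_trace_buffer(is_thread_mode=True)
    buf.put(("demo_op", "prep", True, 1500))   # cost in microseconds
    buf.put(("demo_op", "midp", True, 4200))
    print(drain(buf))
```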
+ self._data_buffer = Queue.Queue() + else: + self._data_buffer = multiprocessing.Manager().Queue() + self._interval_s = interval_s + self._thrd = None + self._proc = None + self._channels = [] + # The size of data in Channel will not exceed server_worker_num + self._server_worker_num = server_worker_num + + def data_buffer(self): + return self._data_buffer + + def start(self): + if self._is_thread_mode: + self._thrd = threading.Thread( + target=self._trace_func, args=(self._channels, )) + self._thrd.daemon = True + self._thrd.start() + else: + self._proc = multiprocessing.Process( + target=self._trace_func, args=(self._channels, )) + self._proc.daemon = True + self._proc.start() + + def set_channels(self, channels): + self._channels = channels + + def _trace_func(self, channels): + actions = ["in", "prep", "midp", "postp", "out"] + calcu_actions = ["prep", "midp", "postp"] + while True: + op_cost = {} + err_count = 0 + + _TRACER.info("==================== TRACER ======================") + # op + while True: + try: + name, action, stage, cost = self._data_buffer.get_nowait() + if stage == False: + # only for name == DAG + assert name == "DAG" + err_count += 1 + if name not in op_cost: + op_cost[name] = {} + if action not in op_cost[name]: + op_cost[name][action] = [] + op_cost[name][action].append(cost) + except Queue.Empty: + break + + if len(op_cost) != 0: + for name in op_cost: + tot_cost, calcu_cost = 0.0, 0.0 + for action, costs in op_cost[name].items(): + op_cost[name][action] = sum(costs) / (1e3 * len(costs)) + tot_cost += op_cost[name][action] + + if name != "DAG": + _TRACER.info("Op({}):".format(name)) + for action in actions: + if action in op_cost[name]: + _TRACER.info("\t{}[{} ms]".format( + action, op_cost[name][action])) + for action in calcu_actions: + if action in op_cost[name]: + calcu_cost += op_cost[name][action] + _TRACER.info("\tidle[{}]".format(1 - 1.0 * calcu_cost / + tot_cost)) + + if "DAG" in op_cost: + calls = op_cost["DAG"].values() + calls.sort() + tot = len(calls) + qps = 1.0 * tot / self._interval_s + ave_cost = sum(calls) / tot + latencys = [50, 60, 70, 80, 90, 95, 99] + _TRACER.info("DAGExecutor:") + _TRACER.info("\tquery count[{}]".format(tot)) + _TRACER.info("\tqps[{} q/s]".format(qps)) + _TRACER.info("\tsucc[{}]".format(1 - 1.0 * err_count / tot)) + _TRACER.info("\tlatency:") + _TRACER.info("\t\tave[{} ms]".format(ave_cost)) + for latency in latencys: + _TRACER.info("\t\t.{}[{} ms]".format(latency, calls[int( + tot * latency / 100.0)])) + + # channel + _TRACER.info("Channel (server worker num[{}]):".format( + self._server_worker_num)) + for channel in channels: + _TRACER.info("\t{}(In: {}, Out: {}) size[{}/{}]".format( + channel.name, + channel.get_producers(), + channel.get_consumers(), + channel.size(), channel.get_maxsize())) + time.sleep(self._interval_s) class UnsafeTimeProfiler(object): + """ thread unsafe profiler """ + def __init__(self): self.pid = os.getpid() self.print_head = 'PROFILE\tpid:{}\t'.format(self.pid) @@ -41,8 +153,9 @@ class UnsafeTimeProfiler(object): def record(self, name): if self._enable is False: return - self.time_record.append('{}:{} '.format(name, - int(round(_time() * 1000000)))) + timestamp = int(round(_time() * 1000000)) + self.time_record.append('{}:{} '.format(name, timestamp)) + return timestamp def print_profile(self): if self._enable is False: @@ -78,6 +191,7 @@ class TimeProfiler(object): name = '_'.join(name_with_tag[:-1]) with self._lock: self._time_record.put((name, tag, timestamp)) + return timestamp def 
print_profile(self): if self._enable is False: diff --git a/python/requirements.txt b/python/requirements.txt index 5f5cfdc52464d5c9dc9ad40ec11be72c86dc6b2c..697b24fd4db6aff6b30913d8a5d23416dc208c80 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -1,6 +1,10 @@ numpy>=1.12, <=1.16.4 ; python_version<"3.5" +google>=2.0.3 protobuf>=3.12.2 grpcio-tools>=1.28.1 grpcio>=1.28.1 func-timeout>=4.3.5 pyyaml>=1.3.0 +sentencepiece==0.1.92 +flask>=1.1.2 +ujson>=2.0.3 diff --git a/tools/serving_build.sh b/tools/serving_build.sh index 4bb68d938bafaa0a0ac8641284b66024e6b38d6a..c54631a733fecc532f22d3ce1793ff8554e21f7d 100644 --- a/tools/serving_build.sh +++ b/tools/serving_build.sh @@ -54,7 +54,6 @@ function build_app() { local DIRNAME=build-app-$TYPE mkdir $DIRNAME # pwd: /Serving cd $DIRNAME # pwd: /Serving/build-app-$TYPE - pip install numpy sentencepiece case $TYPE in CPU|GPU) cmake -DPYTHON_INCLUDE_DIR=$PYTHONROOT/include/python2.7/ \ @@ -295,8 +294,6 @@ function python_run_criteo_ctr_with_cube() { function python_test_bert() { # pwd: /Serving/python/examples local TYPE=$1 - yum install -y libXext libSM libXrender >/dev/null - pip install ujson export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving cd bert # pwd: /Serving/python/examples/bert case $TYPE in @@ -779,7 +776,7 @@ function python_test_pipeline(){ # test: thread servicer & thread op cat << EOF > config.yml port: 18080 -worker_num: 2 +worker_num: 4 build_dag_each_worker: false dag: is_thread_op: true @@ -796,7 +793,7 @@ EOF # test: thread servicer & process op cat << EOF > config.yml port: 18080 -worker_num: 2 +worker_num: 4 build_dag_each_worker: false dag: is_thread_op: false @@ -810,13 +807,13 @@ EOF ps -ef | grep "pipeline_server" | grep -v grep | awk '{print $2}' | xargs kill kill_process_by_port 18080 - # test: process servicer & thread op + # test: process servicer & process op cat << EOF > config.yml port: 18080 -worker_num: 2 -build_dag_each_worker: true +worker_num: 4 +build_dag_each_worker: false dag: - is_thread_op: flase + is_thread_op: false client_type: brpc retry: 1 use_profile: false @@ -826,12 +823,14 @@ EOF check_cmd "python test_pipeline_client.py" ps -ef | grep "pipeline_server" | grep -v grep | awk '{print $2}' | xargs kill kill_process_by_port 18080 - - # test: process servicer & process op + + # test: process servicer & thread op + pip uninstall grpcio -y + pip install grpcio --no-binary=grpcio cat << EOF > config.yml port: 18080 -worker_num: 2 -build_dag_each_worker: false +worker_num: 4 +build_dag_each_worker: true dag: is_thread_op: false client_type: brpc @@ -843,7 +842,7 @@ EOF check_cmd "python test_pipeline_client.py" ps -ef | grep "pipeline_server" | grep -v grep | awk '{print $2}' | xargs kill kill_process_by_port 18080 - + kill_server_process kill_process_by_port 9292 kill_process_by_port 9393 @@ -854,7 +853,7 @@ EOF sleep 5 cat << EOF > config.yml port: 18080 -worker_num: 2 +worker_num: 4 build_dag_each_worker: false dag: is_thread_op: false
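For reference, the `config.yml` files generated by the heredocs above can also be produced programmatically. The sketch below is illustrative only: the field names and defaults follow `ServerYamlConfChecker` earlier in this patch, and the commented-out server wiring assumes the `PipelineServer` API shown above (the import path is a guess and may differ):

```python
# Illustrative only: a config equivalent to the ones the test script writes
# with heredocs, expressed as a Python dict and dumped to config.yml. Field
# names and defaults follow ServerYamlConfChecker; the commented-out calls
# mirror set_response_op()/prepare_server()/run_server() shown earlier.
import yaml

conf = {
    "port": 18080,
    "worker_num": 4,
    "build_dag_each_worker": False,
    "dag": {
        "is_thread_op": False,
        "client_type": "brpc",
        "retry": 1,
        "use_profile": False,
        "tracer": {"interval_s": 10},
    },
}

with open("config.yml", "w") as f:
    yaml.dump(conf, f, default_flow_style=False)

# Hypothetical wiring, assuming a ResponseOp instance named response_op:
# from paddle_serving_server.pipeline import PipelineServer  # path may differ
# server = PipelineServer()
# server.set_response_op(response_op)
# server.prepare_server("config.yml")
# server.run_server()
```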