From ed63951118f2800f4ce07b9c0f5ee2fe48ea4ace Mon Sep 17 00:00:00 2001 From: barriery Date: Thu, 6 Aug 2020 03:56:48 +0000 Subject: [PATCH] update code structure of logger module --- python/pipeline/channel.py | 2 +- python/pipeline/dag.py | 2 +- python/pipeline/logger.py | 113 ++++++++++++++--------------- python/pipeline/operator.py | 2 +- python/pipeline/pipeline_client.py | 2 +- python/pipeline/pipeline_server.py | 2 +- python/pipeline/profiler.py | 29 ++++---- 7 files changed, 73 insertions(+), 79 deletions(-) diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py index 08c8936e..cfbe9c22 100644 --- a/python/pipeline/channel.py +++ b/python/pipeline/channel.py @@ -29,7 +29,7 @@ import enum import os import copy -_LOGGER = logging.getLogger("pipeline.channel") +_LOGGER = logging.getLogger(__name__) class ChannelDataEcode(enum.Enum): diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py index 3a9e8c6f..9bc8a07f 100644 --- a/python/pipeline/dag.py +++ b/python/pipeline/dag.py @@ -32,7 +32,7 @@ from .profiler import TimeProfiler, PerformanceTracer from .util import NameGenerator from .proto import pipeline_service_pb2 -_LOGGER = logging.getLogger("pipeline.dag") +_LOGGER = logging.getLogger(__name__) class DAGExecutor(object): diff --git a/python/pipeline/logger.py b/python/pipeline/logger.py index fd86b58c..4d261e3f 100644 --- a/python/pipeline/logger.py +++ b/python/pipeline/logger.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -import logging.handlers +import logging.config import os @@ -25,66 +25,59 @@ class SectionLevelFilter(object): return logRecord.levelno in self._levels -class OutOfMouduleFilter(object): - def __init__(self, out_names): - self._out_names = out_names - - def filter(self, logRecord): - return logRecord.name not in self._out_names - - -class OutOfMouduleAndSectionLevelFilter(object): - def __init__(self, out_names, levels): - self._out_names = out_names - self._levels = levels - - def filter(self, logRecord): - if logRecord.name in self._out_names: - return False - return logRecord.levelno in self._levels - - -class StreamHandler(logging.StreamHandler): - def __init__(self, *args, **kwargs): - super(StreamHandler, self).__init__(*args, **kwargs) - self.addFilter(OutOfMouduleFilter(["pipeline.profiler"])) - - log_dir = "PipelineServingLogs" if not os.path.exists(log_dir): os.makedirs(log_dir) -# root logger -_LOGGER = logging.getLogger() -_LOGGER.setLevel(logging.DEBUG) - -formatter = logging.Formatter( - "%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s") -# info and warn -file_info = logging.handlers.RotatingFileHandler( - os.path.join(log_dir, "INFO.log")) -file_info.addFilter(OutOfMouduleFilter(["pipeline.profiler"])) -file_info.addFilter(SectionLevelFilter([logging.INFO, logging.WARNING])) -file_info.setFormatter(formatter) - -# err and critical -file_err = logging.handlers.RotatingFileHandler( - os.path.join(log_dir, "ERROR.log")) -file_err.addFilter(OutOfMouduleFilter(["pipeline.profiler"])) -file_err.setLevel(logging.ERROR) -file_err.setFormatter(formatter) - -_LOGGER.addHandler(file_info) -_LOGGER.addHandler(file_err) - -# tracer logger -_TRACER = logging.getLogger("pipeline.profiler") -_TRACER.setLevel(logging.INFO) -_TRACER.addFilter(logging.Filter("pipeline.profiler")) - -# tracer -tracer_formatter = logging.Formatter("%(asctime)s %(message)s") -file_trace = logging.handlers.RotatingFileHandler( - os.path.join(log_dir, "TRACE.log")) -file_trace.setFormatter(tracer_formatter) -_TRACER.addHandler(file_trace) +logger_config = { + "version": 1, + "formatters": { + "normal_fmt": { + "format": + "%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s", + }, + "tracer_fmt": { + "format": "%(asctime)s %(message)s", + }, + }, + "filters": { + "info_only_filter": { + "()": SectionLevelFilter, + "levels": [logging.INFO], + }, + }, + "handlers": { + "f_pipeline.log": { + "class": "logging.FileHandler", + "level": "INFO", + "formatter": "normal_fmt", + "filters": ["info_only_filter"], + "filename": os.path.join(log_dir, "pipeline.log"), + }, + "f_pipeline.log.wf": { + "class": "logging.FileHandler", + "level": "WARNING", + "formatter": "normal_fmt", + "filename": os.path.join(log_dir, "pipeline.log.wf"), + }, + "f_tracer.log": { + "class": "logging.FileHandler", + "level": "INFO", + "formatter": "tracer_fmt", + "filename": os.path.join(log_dir, "pipeline.tracer"), + }, + }, + "loggers": { + # propagate = True + ".".join(__name__.split(".")[:-1] + ["profiler"]): { + "level": "INFO", + "handlers": ["f_tracer.log"], + }, + }, + "root": { + "level": "DEBUG", + "handlers": ["f_pipeline.log", "f_pipeline.log.wf"], + }, +} + +logging.config.dictConfig(logger_config) diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index 323b4865..67dfc6a6 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -32,7 +32,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode, from .util import NameGenerator from .profiler import UnsafeTimeProfiler as TimeProfiler -_LOGGER = logging.getLogger("pipeline.operator") +_LOGGER = logging.getLogger(__name__) _op_name_gen = NameGenerator("Op") diff --git a/python/pipeline/pipeline_client.py b/python/pipeline/pipeline_client.py index 0b4226da..ad78c6d3 100644 --- a/python/pipeline/pipeline_client.py +++ b/python/pipeline/pipeline_client.py @@ -22,7 +22,7 @@ from .channel import ChannelDataEcode from .proto import pipeline_service_pb2 from .proto import pipeline_service_pb2_grpc -_LOGGER = logging.getLogger("pipeline.pipeline_client") +_LOGGER = logging.getLogger(__name__) class PipelineClient(object): diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py index d17afde7..70720e99 100644 --- a/python/pipeline/pipeline_server.py +++ b/python/pipeline/pipeline_server.py @@ -26,7 +26,7 @@ from .proto import pipeline_service_pb2_grpc from .operator import ResponseOp from .dag import DAGExecutor -_LOGGER = logging.getLogger("pipeline.pipeline_server") +_LOGGER = logging.getLogger(__name__) class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer): diff --git a/python/pipeline/profiler.py b/python/pipeline/profiler.py index 6d7f2b63..3cca6527 100644 --- a/python/pipeline/profiler.py +++ b/python/pipeline/profiler.py @@ -27,7 +27,8 @@ import time import threading import multiprocessing -_TRACER = logging.getLogger("pipeline.profiler") +_LOGGER = logging.getLogger(__name__) +_LOGGER.propagate = False class PerformanceTracer(object): @@ -73,7 +74,7 @@ class PerformanceTracer(object): op_cost = {} err_count = 0 - _TRACER.info("==================== TRACER ======================") + _LOGGER.info("==================== TRACER ======================") # op while True: try: @@ -98,15 +99,15 @@ class PerformanceTracer(object): tot_cost += op_cost[name][action] if name != "DAG": - _TRACER.info("Op({}):".format(name)) + _LOGGER.info("Op({}):".format(name)) for action in actions: if action in op_cost[name]: - _TRACER.info("\t{}[{} ms]".format( + _LOGGER.info("\t{}[{} ms]".format( action, op_cost[name][action])) for action in calcu_actions: if action in op_cost[name]: calcu_cost += op_cost[name][action] - _TRACER.info("\tidle[{}]".format(1 - 1.0 * calcu_cost / + _LOGGER.info("\tidle[{}]".format(1 - 1.0 * calcu_cost / tot_cost)) if "DAG" in op_cost: @@ -116,21 +117,21 @@ class PerformanceTracer(object): qps = 1.0 * tot / self._interval_s ave_cost = sum(calls) / tot latencys = [50, 60, 70, 80, 90, 95, 99] - _TRACER.info("DAGExecutor:") - _TRACER.info("\tquery count[{}]".format(tot)) - _TRACER.info("\tqps[{} q/s]".format(qps)) - _TRACER.info("\tsucc[{}]".format(1 - 1.0 * err_count / tot)) - _TRACER.info("\tlatency:") - _TRACER.info("\t\tave[{} ms]".format(ave_cost)) + _LOGGER.info("DAGExecutor:") + _LOGGER.info("\tquery count[{}]".format(tot)) + _LOGGER.info("\tqps[{} q/s]".format(qps)) + _LOGGER.info("\tsucc[{}]".format(1 - 1.0 * err_count / tot)) + _LOGGER.info("\tlatency:") + _LOGGER.info("\t\tave[{} ms]".format(ave_cost)) for latency in latencys: - _TRACER.info("\t\t.{}[{} ms]".format(latency, calls[int( + _LOGGER.info("\t\t.{}[{} ms]".format(latency, calls[int( tot * latency / 100.0)])) # channel - _TRACER.info("Channel (server worker num[{}]):".format( + _LOGGER.info("Channel (server worker num[{}]):".format( self._server_worker_num)) for channel in channels: - _TRACER.info("\t{}(In: {}, Out: {}) size[{}/{}]".format( + _LOGGER.info("\t{}(In: {}, Out: {}) size[{}/{}]".format( channel.name, channel.get_producers(), channel.get_consumers(), -- GitLab