Commit ed639511 authored by barriery

update code structure of logger module

Parent c5776b78
@@ -29,7 +29,7 @@ import enum
import os
import copy
-_LOGGER = logging.getLogger("pipeline.channel")
+_LOGGER = logging.getLogger(__name__)
class ChannelDataEcode(enum.Enum):
......
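Every hard-coded `_LOGGER = logging.getLogger("pipeline.xxx")` in this commit becomes `logging.getLogger(__name__)`, which ties each logger to its import path: the logger name then tracks the package layout automatically and still propagates to the handlers installed on the root logger in logger.py. A minimal sketch of the effect (the module path in the comments is illustrative):

```python
import logging

logging.basicConfig(level=logging.DEBUG)

# Inside a module imported as "pipeline.channel", __name__ holds that dotted
# path, so getLogger(__name__) and getLogger("pipeline.channel") would return
# the same logger object there.
_LOGGER = logging.getLogger(__name__)

# Loggers propagate records up the dotted hierarchy by default:
# "pipeline.channel" -> "pipeline" -> root, so handlers attached to the root
# logger see every module's records without extra wiring.
_LOGGER.info("routed through the root handlers")
```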
@@ -32,7 +32,7 @@ from .profiler import TimeProfiler, PerformanceTracer
from .util import NameGenerator
from .proto import pipeline_service_pb2
-_LOGGER = logging.getLogger("pipeline.dag")
+_LOGGER = logging.getLogger(__name__)
class DAGExecutor(object):
......
@@ -13,7 +13,7 @@
# limitations under the License.
import logging
-import logging.handlers
+import logging.config
import os
@@ -25,66 +25,59 @@ class SectionLevelFilter(object):
        return logRecord.levelno in self._levels
-class OutOfMouduleFilter(object):
-    def __init__(self, out_names):
-        self._out_names = out_names
-    def filter(self, logRecord):
-        return logRecord.name not in self._out_names
-class OutOfMouduleAndSectionLevelFilter(object):
-    def __init__(self, out_names, levels):
-        self._out_names = out_names
-        self._levels = levels
-    def filter(self, logRecord):
-        if logRecord.name in self._out_names:
-            return False
-        return logRecord.levelno in self._levels
-class StreamHandler(logging.StreamHandler):
-    def __init__(self, *args, **kwargs):
-        super(StreamHandler, self).__init__(*args, **kwargs)
-        self.addFilter(OutOfMouduleFilter(["pipeline.profiler"]))
log_dir = "PipelineServingLogs"
if not os.path.exists(log_dir):
    os.makedirs(log_dir)
-# root logger
-_LOGGER = logging.getLogger()
-_LOGGER.setLevel(logging.DEBUG)
-formatter = logging.Formatter(
-    "%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s")
-# info and warn
-file_info = logging.handlers.RotatingFileHandler(
-    os.path.join(log_dir, "INFO.log"))
-file_info.addFilter(OutOfMouduleFilter(["pipeline.profiler"]))
-file_info.addFilter(SectionLevelFilter([logging.INFO, logging.WARNING]))
-file_info.setFormatter(formatter)
-# err and critical
-file_err = logging.handlers.RotatingFileHandler(
-    os.path.join(log_dir, "ERROR.log"))
-file_err.addFilter(OutOfMouduleFilter(["pipeline.profiler"]))
-file_err.setLevel(logging.ERROR)
-file_err.setFormatter(formatter)
-_LOGGER.addHandler(file_info)
-_LOGGER.addHandler(file_err)
-# tracer logger
-_TRACER = logging.getLogger("pipeline.profiler")
-_TRACER.setLevel(logging.INFO)
-_TRACER.addFilter(logging.Filter("pipeline.profiler"))
-# tracer
-tracer_formatter = logging.Formatter("%(asctime)s %(message)s")
-file_trace = logging.handlers.RotatingFileHandler(
-    os.path.join(log_dir, "TRACE.log"))
-file_trace.setFormatter(tracer_formatter)
-_TRACER.addHandler(file_trace)
+logger_config = {
+    "version": 1,
+    "formatters": {
+        "normal_fmt": {
+            "format":
+            "%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s",
+        },
+        "tracer_fmt": {
+            "format": "%(asctime)s %(message)s",
+        },
+    },
+    "filters": {
+        "info_only_filter": {
+            "()": SectionLevelFilter,
+            "levels": [logging.INFO],
+        },
+    },
+    "handlers": {
+        "f_pipeline.log": {
+            "class": "logging.FileHandler",
+            "level": "INFO",
+            "formatter": "normal_fmt",
+            "filters": ["info_only_filter"],
+            "filename": os.path.join(log_dir, "pipeline.log"),
+        },
+        "f_pipeline.log.wf": {
+            "class": "logging.FileHandler",
+            "level": "WARNING",
+            "formatter": "normal_fmt",
+            "filename": os.path.join(log_dir, "pipeline.log.wf"),
+        },
+        "f_tracer.log": {
+            "class": "logging.FileHandler",
+            "level": "INFO",
+            "formatter": "tracer_fmt",
+            "filename": os.path.join(log_dir, "pipeline.tracer"),
+        },
+    },
+    "loggers": {
+        # propagate = True
+        ".".join(__name__.split(".")[:-1] + ["profiler"]): {
+            "level": "INFO",
+            "handlers": ["f_tracer.log"],
+        },
+    },
+    "root": {
+        "level": "DEBUG",
+        "handlers": ["f_pipeline.log", "f_pipeline.log.wf"],
+    },
+}
+logging.config.dictConfig(logger_config)
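The `"()"` key is how dictConfig instantiates a custom filter: the remaining keys become keyword arguments, so `"info_only_filter"` is built as `SectionLevelFilter(levels=[logging.INFO])` and keeps pipeline.log to INFO records, while the unfiltered `f_pipeline.log.wf` handler (level WARNING) collects warnings and errors; the `loggers` entry computes the sibling profiler module's dotted name from `__name__` so the tracer gets its own sink. A self-contained sketch of the same filter-factory mechanism, with names shortened for illustration:

```python
import logging
import logging.config

class SectionLevelFilter(object):
    def __init__(self, levels):
        self._levels = levels

    def filter(self, logRecord):
        return logRecord.levelno in self._levels

logging.config.dictConfig({
    "version": 1,
    "filters": {
        # "()" names the callable dictConfig invokes; the remaining keys
        # become keyword arguments, i.e. SectionLevelFilter(levels=[...]).
        "info_only": {
            "()": SectionLevelFilter,
            "levels": [logging.INFO],
        },
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "filters": ["info_only"],
        },
    },
    "root": {"level": "DEBUG", "handlers": ["console"]},
})

logging.getLogger(__name__).info("passes the filter")
logging.getLogger(__name__).warning("dropped: WARNING is not in levels")
```

One behavioral change worth noting: the old setup used logging.handlers.RotatingFileHandler, while the new handlers are plain logging.FileHandler, so the log files are no longer rotated.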
@@ -32,7 +32,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode,
from .util import NameGenerator
from .profiler import UnsafeTimeProfiler as TimeProfiler
-_LOGGER = logging.getLogger("pipeline.operator")
+_LOGGER = logging.getLogger(__name__)
_op_name_gen = NameGenerator("Op")
......
@@ -22,7 +22,7 @@ from .channel import ChannelDataEcode
from .proto import pipeline_service_pb2
from .proto import pipeline_service_pb2_grpc
-_LOGGER = logging.getLogger("pipeline.pipeline_client")
+_LOGGER = logging.getLogger(__name__)
class PipelineClient(object):
......
@@ -26,7 +26,7 @@ from .proto import pipeline_service_pb2_grpc
from .operator import ResponseOp
from .dag import DAGExecutor
-_LOGGER = logging.getLogger("pipeline.pipeline_server")
+_LOGGER = logging.getLogger(__name__)
class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
......
@@ -27,7 +27,8 @@ import time
import threading
import multiprocessing
-_TRACER = logging.getLogger("pipeline.profiler")
+_LOGGER = logging.getLogger(__name__)
+_LOGGER.propagate = False
class PerformanceTracer(object):
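`_LOGGER.propagate = False` detaches the profiler logger from the root handlers, so trace records land only in the `f_tracer.log` handler that logger.py attaches to this logger, instead of also being duplicated into pipeline.log. A small sketch of the mechanism (handler targets here are stand-ins):

```python
import logging

# Root handler stands in for the pipeline.log sink configured in logger.py.
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(logging.StreamHandler())

tracer = logging.getLogger("pipeline.profiler")
tracer.setLevel(logging.INFO)
tracer.addHandler(logging.FileHandler("trace.out"))  # tracer's own sink

tracer.propagate = False  # records no longer climb to the root handlers
tracer.info("written to trace.out only, not echoed by the root handler")
```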
@@ -73,7 +74,7 @@ class PerformanceTracer(object):
        op_cost = {}
        err_count = 0
-        _TRACER.info("==================== TRACER ======================")
+        _LOGGER.info("==================== TRACER ======================")
        # op
        while True:
            try:
@@ -98,15 +99,15 @@ class PerformanceTracer(object):
                    tot_cost += op_cost[name][action]
                if name != "DAG":
-                    _TRACER.info("Op({}):".format(name))
+                    _LOGGER.info("Op({}):".format(name))
                    for action in actions:
                        if action in op_cost[name]:
-                            _TRACER.info("\t{}[{} ms]".format(
+                            _LOGGER.info("\t{}[{} ms]".format(
                                action, op_cost[name][action]))
                    for action in calcu_actions:
                        if action in op_cost[name]:
                            calcu_cost += op_cost[name][action]
-                    _TRACER.info("\tidle[{}]".format(1 - 1.0 * calcu_cost /
+                    _LOGGER.info("\tidle[{}]".format(1 - 1.0 * calcu_cost /
                                                     tot_cost))
        if "DAG" in op_cost:
@@ -116,21 +117,21 @@ class PerformanceTracer(object):
                qps = 1.0 * tot / self._interval_s
                ave_cost = sum(calls) / tot
                latencys = [50, 60, 70, 80, 90, 95, 99]
-                _TRACER.info("DAGExecutor:")
-                _TRACER.info("\tquery count[{}]".format(tot))
-                _TRACER.info("\tqps[{} q/s]".format(qps))
-                _TRACER.info("\tsucc[{}]".format(1 - 1.0 * err_count / tot))
-                _TRACER.info("\tlatency:")
-                _TRACER.info("\t\tave[{} ms]".format(ave_cost))
+                _LOGGER.info("DAGExecutor:")
+                _LOGGER.info("\tquery count[{}]".format(tot))
+                _LOGGER.info("\tqps[{} q/s]".format(qps))
+                _LOGGER.info("\tsucc[{}]".format(1 - 1.0 * err_count / tot))
+                _LOGGER.info("\tlatency:")
+                _LOGGER.info("\t\tave[{} ms]".format(ave_cost))
                for latency in latencys:
-                    _TRACER.info("\t\t.{}[{} ms]".format(latency, calls[int(
+                    _LOGGER.info("\t\t.{}[{} ms]".format(latency, calls[int(
                        tot * latency / 100.0)]))
        # channel
-        _TRACER.info("Channel (server worker num[{}]):".format(
+        _LOGGER.info("Channel (server worker num[{}]):".format(
            self._server_worker_num))
        for channel in channels:
-            _TRACER.info("\t{}(In: {}, Out: {}) size[{}/{}]".format(
+            _LOGGER.info("\t{}(In: {}, Out: {}) size[{}/{}]".format(
                channel.name,
                channel.get_producers(),
                channel.get_consumers(),
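The percentile lines index into the per-query latency list: assuming `calls` is sorted ascending upstream of this hunk, `calls[int(tot * p / 100.0)]` picks the p-th percentile. A tiny illustration with made-up latencies:

```python
# Hypothetical per-query latencies in ms; the real list comes from the DAG profiler.
calls = sorted([3.2, 4.1, 4.8, 5.0, 5.6, 6.3, 7.9, 9.4, 12.0, 30.5])
tot = len(calls)
for latency in [50, 60, 70, 80, 90, 95, 99]:
    # prints e.g. ".80[12.0 ms]": roughly 80% of queries finished faster
    print("\t\t.{}[{} ms]".format(latency, calls[int(tot * latency / 100.0)]))
```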