提交 975e5d52 编写于 作者: B barriery

use PriorityQueue instead of Queue in Channel; add logid in ServingClient

上级 a3f9ab42
...@@ -181,6 +181,14 @@ class ChannelData(object): ...@@ -181,6 +181,14 @@ class ChannelData(object):
os._exit(-1) os._exit(-1)
return feed return feed
def __cmp__(self, other):
    """Order ChannelData instances by request id (Python 2 only).

    PriorityQueue pops the smallest item first, so comparing on ``id``
    makes the channel deliver requests in arrival order.
    NOTE: ``__cmp__`` is silently ignored by Python 3; ``__lt__`` below
    provides the same ordering there.
    """
    if self.id < other.id:
        return -1
    elif self.id == other.id:
        return 0
    else:
        return 1

def __lt__(self, other):
    # Python 3: heapq (used by PriorityQueue) only requires __lt__.
    # Keep the ordering identical to __cmp__ above (by request id).
    return self.id < other.id
def __str__(self): def __str__(self):
return "type[{}], ecode[{}], id[{}]".format( return "type[{}], ecode[{}], id[{}]".format(
ChannelDataType(self.datatype).name, self.ecode, self.id) ChannelDataType(self.datatype).name, self.ecode, self.id)
...@@ -222,7 +230,7 @@ class ProcessChannel(object): ...@@ -222,7 +230,7 @@ class ProcessChannel(object):
# see more: # see more:
# - https://bugs.python.org/issue18277 # - https://bugs.python.org/issue18277
# - https://hg.python.org/cpython/rev/860fc6a2bd21 # - https://hg.python.org/cpython/rev/860fc6a2bd21
self._que = manager.Queue(maxsize=maxsize) self._que = manager.PriorityQueue(maxsize=maxsize)
self._maxsize = maxsize self._maxsize = maxsize
self.name = name self.name = name
self._stop = manager.Value('i', 0) self._stop = manager.Value('i', 0)
...@@ -489,7 +497,7 @@ class ProcessChannel(object): ...@@ -489,7 +497,7 @@ class ProcessChannel(object):
self._cv.notify_all() self._cv.notify_all()
class ThreadChannel(Queue.Queue): class ThreadChannel(Queue.PriorityQueue):
""" """
(Thread version)The channel used for communication between Ops. (Thread version)The channel used for communication between Ops.
......
...@@ -29,7 +29,7 @@ from .operator import Op, RequestOp, ResponseOp, VirtualOp ...@@ -29,7 +29,7 @@ from .operator import Op, RequestOp, ResponseOp, VirtualOp
from .channel import (ThreadChannel, ProcessChannel, ChannelData, from .channel import (ThreadChannel, ProcessChannel, ChannelData,
ChannelDataEcode, ChannelDataType, ChannelStopError) ChannelDataEcode, ChannelDataType, ChannelStopError)
from .profiler import TimeProfiler, PerformanceTracer from .profiler import TimeProfiler, PerformanceTracer
from .util import NameGenerator, ThreadIdGenerator from .util import NameGenerator, ThreadIdGenerator, PipelineProcSyncManager
from .proto import pipeline_service_pb2 from .proto import pipeline_service_pb2
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
...@@ -324,7 +324,7 @@ class DAG(object): ...@@ -324,7 +324,7 @@ class DAG(object):
self._build_dag_each_worker = build_dag_each_worker self._build_dag_each_worker = build_dag_each_worker
self._tracer = tracer self._tracer = tracer
if not self._is_thread_op: if not self._is_thread_op:
self._manager = multiprocessing.Manager() self._manager = PipelineProcSyncManager()
_LOGGER.info("[DAG] Succ init") _LOGGER.info("[DAG] Succ init")
def get_use_ops(self, response_op): def get_use_ops(self, response_op):
......
...@@ -40,18 +40,11 @@ logger_config = { ...@@ -40,18 +40,11 @@ logger_config = {
"format": "%(asctime)s %(message)s", "format": "%(asctime)s %(message)s",
}, },
}, },
"filters": {
"info_only_filter": {
"()": SectionLevelFilter,
"levels": [logging.INFO],
},
},
"handlers": { "handlers": {
"f_pipeline.log": { "f_pipeline.log": {
"class": "logging.FileHandler", "class": "logging.FileHandler",
"level": "INFO", "level": "INFO",
"formatter": "normal_fmt", "formatter": "normal_fmt",
"filters": ["info_only_filter"],
"filename": os.path.join(log_dir, "pipeline.log"), "filename": os.path.join(log_dir, "pipeline.log"),
}, },
"f_pipeline.log.wf": { "f_pipeline.log.wf": {
......
...@@ -22,6 +22,7 @@ import logging ...@@ -22,6 +22,7 @@ import logging
import func_timeout import func_timeout
import os import os
import sys import sys
import collections
import numpy as np import numpy as np
from numpy import * from numpy import *
...@@ -127,7 +128,7 @@ class Op(object): ...@@ -127,7 +128,7 @@ class Op(object):
fetch_names): fetch_names):
if self.with_serving == False: if self.with_serving == False:
_LOGGER.info("Op({}) has no client (and it also do not " _LOGGER.info("Op({}) has no client (and it also do not "
"run the process function".format(self.name)) "run the process function)".format(self.name))
return None return None
if client_type == 'brpc': if client_type == 'brpc':
client = Client() client = Client()
...@@ -294,8 +295,8 @@ class Op(object): ...@@ -294,8 +295,8 @@ class Op(object):
def _run_preprocess(self, parsed_data_dict, op_info_prefix): def _run_preprocess(self, parsed_data_dict, op_info_prefix):
_LOGGER.debug("{} Running preprocess".format(op_info_prefix)) _LOGGER.debug("{} Running preprocess".format(op_info_prefix))
preped_data_dict = {} preped_data_dict = collections.OrderedDict()
err_channeldata_dict = {} err_channeldata_dict = collections.OrderedDict()
for data_id, parsed_data in parsed_data_dict.items(): for data_id, parsed_data in parsed_data_dict.items():
preped_data, error_channeldata = None, None preped_data, error_channeldata = None, None
try: try:
...@@ -326,18 +327,18 @@ class Op(object): ...@@ -326,18 +327,18 @@ class Op(object):
def _run_process(self, preped_data_dict, op_info_prefix): def _run_process(self, preped_data_dict, op_info_prefix):
_LOGGER.debug("{} Running process".format(op_info_prefix)) _LOGGER.debug("{} Running process".format(op_info_prefix))
midped_data_dict = {} midped_data_dict = collections.OrderedDict()
err_channeldata_dict = {} err_channeldata_dict = collections.OrderedDict()
if self.with_serving: if self.with_serving:
data_ids = preped_data_dict.keys() data_ids = preped_data_dict.keys()
typical_logid = data_ids[0] typical_logid = data_ids[0]
if len(data_ids) != 1: if len(data_ids) != 1:
for data_id in data_ids: for data_id in data_ids:
_LOGGER.info( _LOGGER.info(
"(logid={}) During access to PaddleServingService," "(logid={}) {} During access to PaddleServingService,"
" we selected logid={} (batch: {}) as a representative" " we selected logid={} (from batch: {}) as a "
" for logging.".format(data_id, typical_logid, "representative for logging.".format(
data_ids)) data_id, op_info_prefix, typical_logid, data_ids))
feed_batch = [preped_data_dict[data_id] for data_id in data_ids] feed_batch = [preped_data_dict[data_id] for data_id in data_ids]
midped_batch = None midped_batch = None
ecode = ChannelDataEcode.OK.value ecode = ChannelDataEcode.OK.value
...@@ -407,8 +408,8 @@ class Op(object): ...@@ -407,8 +408,8 @@ class Op(object):
def _run_postprocess(self, parsed_data_dict, midped_data_dict, def _run_postprocess(self, parsed_data_dict, midped_data_dict,
op_info_prefix): op_info_prefix):
_LOGGER.debug("{} Running postprocess".format(op_info_prefix)) _LOGGER.debug("{} Running postprocess".format(op_info_prefix))
postped_data_dict = {} postped_data_dict = collections.OrderedDict()
err_channeldata_dict = {} err_channeldata_dict = collections.OrderedDict()
for data_id, midped_data in midped_data_dict.items(): for data_id, midped_data in midped_data_dict.items():
postped_data, err_channeldata = None, None postped_data, err_channeldata = None, None
try: try:
...@@ -487,7 +488,7 @@ class Op(object): ...@@ -487,7 +488,7 @@ class Op(object):
yield batch yield batch
def _parse_channeldata_batch(self, batch, output_channels): def _parse_channeldata_batch(self, batch, output_channels):
parsed_data_dict = {} parsed_data_dict = collections.OrderedDict()
need_profile_dict = {} need_profile_dict = {}
profile_dict = {} profile_dict = {}
for channeldata_dict in batch: for channeldata_dict in batch:
......
...@@ -16,6 +16,14 @@ import sys ...@@ -16,6 +16,14 @@ import sys
import logging import logging
import threading import threading
import multiprocessing import multiprocessing
# Resolve the standard queue module portably: Python 2 names it "Queue",
# Python 3 names it "queue". Either way, expose the module under the
# name `Queue` and its PriorityQueue class at module level.
if sys.version_info.major == 3:
    import queue as Queue
    from queue import PriorityQueue
elif sys.version_info.major == 2:
    import Queue
    from Queue import PriorityQueue
else:
    raise Exception("Error Python version")
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
...@@ -87,3 +95,18 @@ class ProcessIdGenerator(UnsafeIdGenerator): ...@@ -87,3 +95,18 @@ class ProcessIdGenerator(UnsafeIdGenerator):
next_id = self._counter.value next_id = self._counter.value
self._counter.value += self._step self._counter.value += self._step
return next_id return next_id
def PipelineProcSyncManager():
    """Build and start a SyncManager that additionally serves PriorityQueue.

    The stock multiprocessing SyncManager only exposes a FIFO Queue; the
    pipeline channels need a shared priority queue across processes. See:
    https://stackoverflow.com/questions/25324560/strange-queue-priorityqueue-behaviour-with-multiprocessing-in-python-2-7-6?answertab=active#tab-top

    Returns:
        A started manager instance exposing ``PriorityQueue`` alongside the
        usual SyncManager types. The caller owns its lifetime and should
        call ``shutdown()`` when done.
    """
    # `import multiprocessing` alone does not guarantee the `managers`
    # submodule attribute is populated; import it explicitly so the
    # base class lookup cannot fail with AttributeError.
    from multiprocessing.managers import SyncManager

    class PipelineManager(SyncManager):
        pass

    PipelineManager.register("PriorityQueue", PriorityQueue)
    m = PipelineManager()
    m.start()
    return m
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册