提交 1e548efa 编写于 作者: B barrierye

fix queue in multi-process

上级 63cf35d3
...@@ -156,7 +156,7 @@ class ChannelData(object): ...@@ -156,7 +156,7 @@ class ChannelData(object):
ChannelDataType(self.datatype).name, self.ecode, self.id) ChannelDataType(self.datatype).name, self.ecode, self.id)
class ProcessChannel(multiprocessing.queues.Queue): class ProcessChannel(object):
""" """
(Process version) The channel used for communication between Ops. (Process version) The channel used for communication between Ops.
...@@ -186,14 +186,13 @@ class ProcessChannel(multiprocessing.queues.Queue): ...@@ -186,14 +186,13 @@ class ProcessChannel(multiprocessing.queues.Queue):
""" """
def __init__(self, manager, name=None, maxsize=0, timeout=None): def __init__(self, manager, name=None, maxsize=0, timeout=None):
# https://stackoverflow.com/questions/39496554/cannot-subclass-multiprocessing-queue-in-python-3-5/ # For queue multiprocess: after putting an object on
if sys.version_info.major == 2: # an empty queue there may be an infinitessimal delay
super(ProcessChannel, self).__init__(maxsize=maxsize) # before the queue's :meth:`~Queue.empty`
elif sys.version_info.major == 3: # see more:
super(ProcessChannel, self).__init__( # - https://bugs.python.org/issue18277
maxsize=maxsize, ctx=multiprocessing.get_context()) # - https://hg.python.org/cpython/rev/860fc6a2bd21
else: self._que = manager.Queue(maxsize=maxsize)
raise Exception("Error Python version")
self._maxsize = maxsize self._maxsize = maxsize
self._timeout = timeout self._timeout = timeout
self.name = name self.name = name
...@@ -255,13 +254,13 @@ class ProcessChannel(multiprocessing.queues.Queue): ...@@ -255,13 +254,13 @@ class ProcessChannel(multiprocessing.queues.Queue):
with self._cv: with self._cv:
while self._stop is False: while self._stop is False:
try: try:
self.put({op_name: channeldata}, timeout=0) self._que.put({op_name: channeldata}, timeout=0)
break break
except Queue.Full: except Queue.Full:
self._cv.wait() self._cv.wait()
_LOGGER.debug( _LOGGER.debug(
self._log("{} channel size: {}".format(op_name, self._log("{} channel size: {}".format(op_name,
self.qsize()))) self._que.qsize())))
self._cv.notify_all() self._cv.notify_all()
_LOGGER.debug(self._log("{} notify all".format(op_name))) _LOGGER.debug(self._log("{} notify all".format(op_name)))
_LOGGER.debug(self._log("{} push data succ!".format(op_name))) _LOGGER.debug(self._log("{} push data succ!".format(op_name)))
...@@ -305,7 +304,7 @@ class ProcessChannel(multiprocessing.queues.Queue): ...@@ -305,7 +304,7 @@ class ProcessChannel(multiprocessing.queues.Queue):
_LOGGER.debug( _LOGGER.debug(
self._log("{} push data succ: {}".format( self._log("{} push data succ: {}".format(
op_name, put_data.__str__()))) op_name, put_data.__str__())))
self.put(put_data, timeout=0) self._que.put(put_data, timeout=0)
break break
except Queue.Empty: except Queue.Empty:
self._cv.wait() self._cv.wait()
...@@ -329,20 +328,20 @@ class ProcessChannel(multiprocessing.queues.Queue): ...@@ -329,20 +328,20 @@ class ProcessChannel(multiprocessing.queues.Queue):
try: try:
_LOGGER.debug( _LOGGER.debug(
self._log("{} try to get(with channel empty: {})". self._log("{} try to get(with channel empty: {})".
format(op_name, self.empty()))) format(op_name, self._que.empty())))
# For queue multiprocess: after putting an object on # For queue multiprocess: after putting an object on
# an empty queue there may be an infinitessimal delay # an empty queue there may be an infinitessimal delay
# before the queue's :meth:`~Queue.empty` # before the queue's :meth:`~Queue.empty`
# see more: # see more:
# - https://bugs.python.org/issue18277 # - https://bugs.python.org/issue18277
# - https://hg.python.org/cpython/rev/860fc6a2bd21 # - https://hg.python.org/cpython/rev/860fc6a2bd21
resp = self.get(timeout=1e-3) resp = self._que.get(timeout=1e-3)
break break
except Queue.Empty: except Queue.Empty:
_LOGGER.debug( _LOGGER.debug(
self._log( self._log(
"{} wait for empty queue(with channel empty: {})". "{} wait for empty queue(with channel empty: {})".
format(op_name, self.empty()))) format(op_name, self._que.empty())))
self._cv.wait() self._cv.wait()
_LOGGER.debug( _LOGGER.debug(
self._log("{} get data succ: {}".format(op_name, resp.__str__( self._log("{} get data succ: {}".format(op_name, resp.__str__(
...@@ -376,21 +375,15 @@ class ProcessChannel(multiprocessing.queues.Queue): ...@@ -376,21 +375,15 @@ class ProcessChannel(multiprocessing.queues.Queue):
try: try:
_LOGGER.debug( _LOGGER.debug(
self._log("{} try to get(with channel size: {})".format( self._log("{} try to get(with channel size: {})".format(
op_name, self.qsize()))) op_name, self._que.qsize())))
# For queue multiprocess: after putting an object on channeldata = self._que.get(timeout=1e-3)
# an empty queue there may be an infinitessimal delay
# before the queue's :meth:`~Queue.empty`
# see more:
# - https://bugs.python.org/issue18277
# - https://hg.python.org/cpython/rev/860fc6a2bd21
channeldata = self.get(timeout=1e-3)
self._output_buf.append(channeldata) self._output_buf.append(channeldata)
break break
except Queue.Empty: except Queue.Empty:
_LOGGER.debug( _LOGGER.debug(
self._log( self._log(
"{} wait for empty queue(with channel size: {})". "{} wait for empty queue(with channel size: {})".
format(op_name, self.qsize()))) format(op_name, self._que.qsize())))
self._cv.wait() self._cv.wait()
consumer_cursor = self._consumer_cursors[op_name] consumer_cursor = self._consumer_cursors[op_name]
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册