提交 1470beb1 编写于 作者: B barrierye

fix queue in multi-process

上级 451f403f
......@@ -156,7 +156,7 @@ class ChannelData(object):
ChannelDataType(self.datatype).name, self.ecode, self.id)
class ProcessChannel(multiprocessing.queues.Queue):
class ProcessChannel(object):
"""
(Process version) The channel used for communication between Ops.
......@@ -186,14 +186,13 @@ class ProcessChannel(multiprocessing.queues.Queue):
"""
def __init__(self, manager, name=None, maxsize=0, timeout=None):
# https://stackoverflow.com/questions/39496554/cannot-subclass-multiprocessing-queue-in-python-3-5/
if sys.version_info.major == 2:
super(ProcessChannel, self).__init__(maxsize=maxsize)
elif sys.version_info.major == 3:
super(ProcessChannel, self).__init__(
maxsize=maxsize, ctx=multiprocessing.get_context())
else:
raise Exception("Error Python version")
# For queue multiprocess: after putting an object on
# an empty queue there may be an infinitessimal delay
# before the queue's :meth:`~Queue.empty`
# see more:
# - https://bugs.python.org/issue18277
# - https://hg.python.org/cpython/rev/860fc6a2bd21
self._que = manager.Queue(maxsize=maxsize)
self._maxsize = maxsize
self._timeout = timeout
self.name = name
......@@ -255,13 +254,13 @@ class ProcessChannel(multiprocessing.queues.Queue):
with self._cv:
while self._stop is False:
try:
self.put({op_name: channeldata}, timeout=0)
self._que.put({op_name: channeldata}, timeout=0)
break
except Queue.Full:
self._cv.wait()
_LOGGER.debug(
self._log("{} channel size: {}".format(op_name,
self.qsize())))
self._que.qsize())))
self._cv.notify_all()
_LOGGER.debug(self._log("{} notify all".format(op_name)))
_LOGGER.debug(self._log("{} push data succ!".format(op_name)))
......@@ -305,7 +304,7 @@ class ProcessChannel(multiprocessing.queues.Queue):
_LOGGER.debug(
self._log("{} push data succ: {}".format(
op_name, put_data.__str__())))
self.put(put_data, timeout=0)
self._que.put(put_data, timeout=0)
break
except Queue.Empty:
self._cv.wait()
......@@ -329,20 +328,20 @@ class ProcessChannel(multiprocessing.queues.Queue):
try:
_LOGGER.debug(
self._log("{} try to get(with channel empty: {})".
format(op_name, self.empty())))
format(op_name, self._que.empty())))
# For queue multiprocess: after putting an object on
# an empty queue there may be an infinitessimal delay
# before the queue's :meth:`~Queue.empty`
# see more:
# - https://bugs.python.org/issue18277
# - https://hg.python.org/cpython/rev/860fc6a2bd21
resp = self.get(timeout=1e-3)
resp = self._que.get(timeout=1e-3)
break
except Queue.Empty:
_LOGGER.debug(
self._log(
"{} wait for empty queue(with channel empty: {})".
format(op_name, self.empty())))
format(op_name, self._que.empty())))
self._cv.wait()
_LOGGER.debug(
self._log("{} get data succ: {}".format(op_name, resp.__str__(
......@@ -376,21 +375,15 @@ class ProcessChannel(multiprocessing.queues.Queue):
try:
_LOGGER.debug(
self._log("{} try to get(with channel size: {})".format(
op_name, self.qsize())))
# For queue multiprocess: after putting an object on
# an empty queue there may be an infinitessimal delay
# before the queue's :meth:`~Queue.empty`
# see more:
# - https://bugs.python.org/issue18277
# - https://hg.python.org/cpython/rev/860fc6a2bd21
channeldata = self.get(timeout=1e-3)
op_name, self._que.qsize())))
channeldata = self._que.get(timeout=1e-3)
self._output_buf.append(channeldata)
break
except Queue.Empty:
_LOGGER.debug(
self._log(
"{} wait for empty queue(with channel size: {})".
format(op_name, self.qsize())))
format(op_name, self._que.qsize())))
self._cv.wait()
consumer_cursor = self._consumer_cursors[op_name]
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册