diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py index 615967083f92ff766dabbe3f82b92d34138858e1..1800dc923ab45afb05f96b21cd61e5b83f0084ce 100644 --- a/python/pipeline/channel.py +++ b/python/pipeline/channel.py @@ -156,7 +156,7 @@ class ChannelData(object): ChannelDataType(self.datatype).name, self.ecode, self.id) -class ProcessChannel(multiprocessing.queues.Queue): +class ProcessChannel(object): """ (Process version) The channel used for communication between Ops. @@ -186,14 +186,13 @@ class ProcessChannel(multiprocessing.queues.Queue): """ def __init__(self, manager, name=None, maxsize=0, timeout=None): - # https://stackoverflow.com/questions/39496554/cannot-subclass-multiprocessing-queue-in-python-3-5/ - if sys.version_info.major == 2: - super(ProcessChannel, self).__init__(maxsize=maxsize) - elif sys.version_info.major == 3: - super(ProcessChannel, self).__init__( - maxsize=maxsize, ctx=multiprocessing.get_context()) - else: - raise Exception("Error Python version") + # For queue multiprocess: after putting an object on + # an empty queue there may be an infinitessimal delay + # before the queue's :meth:`~Queue.empty` + # see more: + # - https://bugs.python.org/issue18277 + # - https://hg.python.org/cpython/rev/860fc6a2bd21 + self._que = manager.Queue(maxsize=maxsize) self._maxsize = maxsize self._timeout = timeout self.name = name @@ -255,13 +254,13 @@ class ProcessChannel(multiprocessing.queues.Queue): with self._cv: while self._stop is False: try: - self.put({op_name: channeldata}, timeout=0) + self._que.put({op_name: channeldata}, timeout=0) break except Queue.Full: self._cv.wait() _LOGGER.debug( self._log("{} channel size: {}".format(op_name, - self.qsize()))) + self._que.qsize()))) self._cv.notify_all() _LOGGER.debug(self._log("{} notify all".format(op_name))) _LOGGER.debug(self._log("{} push data succ!".format(op_name))) @@ -305,7 +304,7 @@ class ProcessChannel(multiprocessing.queues.Queue): _LOGGER.debug( self._log("{} push data succ: {}".format( op_name, put_data.__str__()))) - self.put(put_data, timeout=0) + self._que.put(put_data, timeout=0) break except Queue.Empty: self._cv.wait() @@ -329,20 +328,20 @@ class ProcessChannel(multiprocessing.queues.Queue): try: _LOGGER.debug( self._log("{} try to get(with channel empty: {})". - format(op_name, self.empty()))) + format(op_name, self._que.empty()))) # For queue multiprocess: after putting an object on # an empty queue there may be an infinitessimal delay # before the queue's :meth:`~Queue.empty` # see more: # - https://bugs.python.org/issue18277 # - https://hg.python.org/cpython/rev/860fc6a2bd21 - resp = self.get(timeout=1e-3) + resp = self._que.get(timeout=1e-3) break except Queue.Empty: _LOGGER.debug( self._log( "{} wait for empty queue(with channel empty: {})". - format(op_name, self.empty()))) + format(op_name, self._que.empty()))) self._cv.wait() _LOGGER.debug( self._log("{} get data succ: {}".format(op_name, resp.__str__( @@ -376,21 +375,15 @@ class ProcessChannel(multiprocessing.queues.Queue): try: _LOGGER.debug( self._log("{} try to get(with channel size: {})".format( - op_name, self.qsize()))) - # For queue multiprocess: after putting an object on - # an empty queue there may be an infinitessimal delay - # before the queue's :meth:`~Queue.empty` - # see more: - # - https://bugs.python.org/issue18277 - # - https://hg.python.org/cpython/rev/860fc6a2bd21 - channeldata = self.get(timeout=1e-3) + op_name, self._que.qsize()))) + channeldata = self._que.get(timeout=1e-3) self._output_buf.append(channeldata) break except Queue.Empty: _LOGGER.debug( self._log( "{} wait for empty queue(with channel size: {})". - format(op_name, self.qsize()))) + format(op_name, self._que.qsize()))) self._cv.wait() consumer_cursor = self._consumer_cursors[op_name]