diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py index 953f378ae306bc8174c30224b85763b44ee2c811..615967083f92ff766dabbe3f82b92d34138858e1 100644 --- a/python/pipeline/channel.py +++ b/python/pipeline/channel.py @@ -634,8 +634,8 @@ class ThreadChannel(Queue.Queue): consumer_cursor = self._consumer_cursors[op_name] base_cursor = self._base_cursor data_idx = consumer_cursor - base_cursor - resp = self._output_buf[data_idx] - _LOGGER.debug(self._log("{} get data: {}".format(op_name, resp))) + + resp = None self._cursor_count[consumer_cursor] -= 1 if consumer_cursor == base_cursor and self._cursor_count[ @@ -643,7 +643,7 @@ class ThreadChannel(Queue.Queue): # When all the different Ops get the data that data_idx points # to, pop the data from output_buf. self._cursor_count.pop(consumer_cursor) - self._output_buf.pop(0) + resp = self._output_buf.pop(0) self._base_cursor += 1 # to avoid cursor overflow if self._base_cursor >= self._reset_max_cursor: @@ -654,6 +654,9 @@ class ThreadChannel(Queue.Queue): cursor - self._reset_max_cursor: count for cursor, count in self._cursor_count.items() } + else: + resp = copy.deepcopy(self._output_buf[data_idx]) + _LOGGER.debug(self._log("{} get data: {}".format(op_name, resp))) self._consumer_cursors[op_name] += 1 new_consumer_cursor = self._consumer_cursors[op_name] @@ -664,8 +667,7 @@ class ThreadChannel(Queue.Queue): self._cv.notify_all() _LOGGER.debug(self._log("multi | {} get data succ!".format(op_name))) - # return resp # reference, read only - return copy.deepcopy(resp) + return resp def stop(self): #TODO