diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py
index f74fce61346d3ef38701c5145f860d74fca8cd95..2763a814c6ad104a3d8dd371079424ffa158a538 100644
--- a/python/pipeline/channel.py
+++ b/python/pipeline/channel.py
@@ -172,10 +172,12 @@ class ProcessChannel(multiprocessing.queues.Queue):
 
         self._cv = multiprocessing.Condition()
 
+        self._manager = manager
         self._producers = []
         self._pushed_producer_count = manager.dict()  # {data_id: count}
         self._input_buf = manager.dict()  # {data_id: {op_name: data}}
 
+        self._reset_max_cursor = 1000000000000000000
         self._consumer_cursors = manager.dict()  # {op_name: cursor}
         self._cursor_count = manager.dict()  # {cursor: count}
         self._base_cursor = manager.Value('i', 0)
@@ -377,12 +379,25 @@ class ProcessChannel(multiprocessing.queues.Queue):
                     self._cursor_count.pop(consumer_cursor)
                     self._output_buf.pop(0)
                     self._base_cursor.value += 1
+                    # to avoid cursor overflow
+                    if self._base_cursor.value >= self._reset_max_cursor:
+                        self._base_cursor.value -= self._reset_max_cursor
+                        for name in self._consumer_cursors.keys():
+                            self._consumer_cursors[name] -= self._reset_max_cursor
+                        cursor_count_tmp = {
+                            cursor - self._reset_max_cursor: count
+                            for cursor, count in self._cursor_count.copy().items()
+                        }
+                        self._cursor_count.clear()
+                        for cursor, count in cursor_count_tmp.items():
+                            self._cursor_count[cursor] = count
 
                 self._consumer_cursors[op_name] += 1
                 new_consumer_cursor = self._consumer_cursors[op_name]
                 if self._cursor_count.get(new_consumer_cursor) is None:
                     self._cursor_count[new_consumer_cursor] = 0
                 self._cursor_count[new_consumer_cursor] += 1
+
             _LOGGER.debug(
                 self._log(
                     "({}) A self._consumer_cursors: {}, self._base_cursor: {}, len(self._output_buf): {}".
@@ -443,6 +458,7 @@ class ThreadChannel(Queue.Queue):
         self._pushed_producer_count = {}  # {data_id: count}
         self._input_buf = {}  # {data_id: {op_name: data}}
 
+        self._reset_max_cursor = 1000000000000000000
         self._consumer_cursors = {}  # {op_name: idx}
         self._cursor_count = {}  # {cursor: count}
         self._base_cursor = 0
@@ -601,6 +617,15 @@ class ThreadChannel(Queue.Queue):
                     self._cursor_count.pop(consumer_cursor)
                     self._output_buf.pop(0)
                     self._base_cursor += 1
+                    # to avoid cursor overflow
+                    if self._base_cursor >= self._reset_max_cursor:
+                        self._base_cursor -= self._reset_max_cursor
+                        for name in self._consumer_cursors:
+                            self._consumer_cursors[name] -= self._reset_max_cursor
+                        self._cursor_count = {
+                            cursor - self._reset_max_cursor: count
+                            for cursor, count in self._cursor_count.items()
+                        }
 
                 self._consumer_cursors[op_name] += 1
                 new_consumer_cursor = self._consumer_cursors[op_name]
diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py
index 92ad131277a73f8dc7e99d0d2e7e98a14186cbf8..55289eeca42e02bb979d4a21791fdde44e0aff02 100644
--- a/python/pipeline/pipeline_server.py
+++ b/python/pipeline/pipeline_server.py
@@ -65,6 +65,7 @@ class PipelineService(pipeline_service_pb2_grpc.PipelineServiceServicer):
         self._cv = threading.Condition()
         self._globel_resp_dict = {}
         self._id_counter = 0
+        self._reset_max_id = 1000000000000000000
         self._retry = retry
         self._is_run = True
         self._pack_func = pack_func
@@ -112,6 +113,8 @@
 
     def _get_next_id(self):
         with self._id_lock:
+            if self._id_counter >= self._reset_max_id:
+                self._id_counter -= self._reset_max_id
             self._id_counter += 1
             return self._id_counter - 1
 