From 467d0eb59780e2576fbc9cbc0164ba0cb4eb7816 Mon Sep 17 00:00:00 2001 From: barrierye Date: Sun, 28 Jun 2020 21:02:22 +0800 Subject: [PATCH] reset id when id reaches the maximum value to avoid overflow --- python/pipeline/channel.py | 25 +++++++++++++++++++++++++ python/pipeline/pipeline_server.py | 3 +++ 2 files changed, 28 insertions(+) diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py index f74fce61..2763a814 100644 --- a/python/pipeline/channel.py +++ b/python/pipeline/channel.py @@ -172,10 +172,12 @@ class ProcessChannel(multiprocessing.queues.Queue): self._cv = multiprocessing.Condition() + self._manager = manager self._producers = [] self._pushed_producer_count = manager.dict() # {data_id: count} self._input_buf = manager.dict() # {data_id: {op_name: data}} + self._reset_max_cursor = 1000000000000000000 self._consumer_cursors = manager.dict() # {op_name: cursor} self._cursor_count = manager.dict() # {cursor: count} self._base_cursor = manager.Value('i', 0) @@ -377,12 +379,25 @@ class ProcessChannel(multiprocessing.queues.Queue): self._cursor_count.pop(consumer_cursor) self._output_buf.pop(0) self._base_cursor.value += 1 + # to avoid cursor overflow + if self._base_cursor.value >= self._reset_max_cursor: + self._base_cursor.value -= self._reset_max_cursor + for name in self._consumer_cursors.keys(): + self._consumer_cursors[name] -= self._reset_max_cursor + cursor_count_tmp = { + cursor - self._reset_max_cursor: count + for cursor, count in self._cursor_count.copy().items() + } + self._cursor_count.clear() + for cursor, count in cursor_count_tmp.items(): + self._cursor_count[cursor] = count self._consumer_cursors[op_name] += 1 new_consumer_cursor = self._consumer_cursors[op_name] if self._cursor_count.get(new_consumer_cursor) is None: self._cursor_count[new_consumer_cursor] = 0 self._cursor_count[new_consumer_cursor] += 1 + _LOGGER.debug( self._log( "({}) A self._consumer_cursors: {}, self._base_cursor: {}, len(self._output_buf): {}". @@ -443,6 +458,7 @@ class ThreadChannel(Queue.Queue): self._pushed_producer_count = {} # {data_id: count} self._input_buf = {} # {data_id: {op_name: data}} + self._reset_max_cursor = 1000000000000000000 self._consumer_cursors = {} # {op_name: idx} self._cursor_count = {} # {cursor: count} self._base_cursor = 0 @@ -601,6 +617,15 @@ class ThreadChannel(Queue.Queue): self._cursor_count.pop(consumer_cursor) self._output_buf.pop(0) self._base_cursor += 1 + # to avoid cursor overflow + if self._base_cursor >= self._reset_max_cursor: + self._base_cursor -= self._reset_max_cursor + for name in self._consumer_cursors: + self._consumer_cursors[name] -= self._reset_max_cursor + self._cursor_count = { + cursor - self._reset_max_cursor: count + for cursor, count in self._cursor_count.items() + } self._consumer_cursors[op_name] += 1 new_consumer_cursor = self._consumer_cursors[op_name] diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py index 92ad1312..55289eec 100644 --- a/python/pipeline/pipeline_server.py +++ b/python/pipeline/pipeline_server.py @@ -65,6 +65,7 @@ class PipelineService(pipeline_service_pb2_grpc.PipelineServiceServicer): self._cv = threading.Condition() self._globel_resp_dict = {} self._id_counter = 0 + self._reset_max_id = 1000000000000000000 self._retry = retry self._is_run = True self._pack_func = pack_func @@ -112,6 +113,8 @@ class PipelineService(pipeline_service_pb2_grpc.PipelineServiceServicer): def _get_next_id(self): with self._id_lock: + if self._id_counter >= self._reset_max_id: + self._id_counter -= self._reset_max_id self._id_counter += 1 return self._id_counter - 1 -- GitLab