提交 467d0eb5 编写于 作者: B barrierye

reset id when id reaches the maximum value to avoid overflow

上级 31dfca34
...@@ -172,10 +172,12 @@ class ProcessChannel(multiprocessing.queues.Queue): ...@@ -172,10 +172,12 @@ class ProcessChannel(multiprocessing.queues.Queue):
self._cv = multiprocessing.Condition() self._cv = multiprocessing.Condition()
self._manager = manager
self._producers = [] self._producers = []
self._pushed_producer_count = manager.dict() # {data_id: count} self._pushed_producer_count = manager.dict() # {data_id: count}
self._input_buf = manager.dict() # {data_id: {op_name: data}} self._input_buf = manager.dict() # {data_id: {op_name: data}}
self._reset_max_cursor = 1000000000000000000
self._consumer_cursors = manager.dict() # {op_name: cursor} self._consumer_cursors = manager.dict() # {op_name: cursor}
self._cursor_count = manager.dict() # {cursor: count} self._cursor_count = manager.dict() # {cursor: count}
self._base_cursor = manager.Value('i', 0) self._base_cursor = manager.Value('i', 0)
...@@ -377,12 +379,25 @@ class ProcessChannel(multiprocessing.queues.Queue): ...@@ -377,12 +379,25 @@ class ProcessChannel(multiprocessing.queues.Queue):
self._cursor_count.pop(consumer_cursor) self._cursor_count.pop(consumer_cursor)
self._output_buf.pop(0) self._output_buf.pop(0)
self._base_cursor.value += 1 self._base_cursor.value += 1
# to avoid cursor overflow
if self._base_cursor.value >= self._reset_max_cursor:
self._base_cursor.value -= self._reset_max_cursor
for name in self._consumer_cursors.keys():
self._consumer_cursors[name] -= self._reset_max_cursor
cursor_count_tmp = {
cursor - self._reset_max_cursor: count
for cursor, count in self._cursor_count.copy().items()
}
self._cursor_count.clear()
for cursor, count in cursor_count_tmp.items():
self._cursor_count[cursor] = count
self._consumer_cursors[op_name] += 1 self._consumer_cursors[op_name] += 1
new_consumer_cursor = self._consumer_cursors[op_name] new_consumer_cursor = self._consumer_cursors[op_name]
if self._cursor_count.get(new_consumer_cursor) is None: if self._cursor_count.get(new_consumer_cursor) is None:
self._cursor_count[new_consumer_cursor] = 0 self._cursor_count[new_consumer_cursor] = 0
self._cursor_count[new_consumer_cursor] += 1 self._cursor_count[new_consumer_cursor] += 1
_LOGGER.debug( _LOGGER.debug(
self._log( self._log(
"({}) A self._consumer_cursors: {}, self._base_cursor: {}, len(self._output_buf): {}". "({}) A self._consumer_cursors: {}, self._base_cursor: {}, len(self._output_buf): {}".
...@@ -443,6 +458,7 @@ class ThreadChannel(Queue.Queue): ...@@ -443,6 +458,7 @@ class ThreadChannel(Queue.Queue):
self._pushed_producer_count = {} # {data_id: count} self._pushed_producer_count = {} # {data_id: count}
self._input_buf = {} # {data_id: {op_name: data}} self._input_buf = {} # {data_id: {op_name: data}}
self._reset_max_cursor = 1000000000000000000
self._consumer_cursors = {} # {op_name: idx} self._consumer_cursors = {} # {op_name: idx}
self._cursor_count = {} # {cursor: count} self._cursor_count = {} # {cursor: count}
self._base_cursor = 0 self._base_cursor = 0
...@@ -601,6 +617,15 @@ class ThreadChannel(Queue.Queue): ...@@ -601,6 +617,15 @@ class ThreadChannel(Queue.Queue):
self._cursor_count.pop(consumer_cursor) self._cursor_count.pop(consumer_cursor)
self._output_buf.pop(0) self._output_buf.pop(0)
self._base_cursor += 1 self._base_cursor += 1
# to avoid cursor overflow
if self._base_cursor >= self._reset_max_cursor:
self._base_cursor -= self._reset_max_cursor
for name in self._consumer_cursors:
self._consumer_cursors[name] -= self._reset_max_cursor
self._cursor_count = {
cursor - self._reset_max_cursor: count
for cursor, count in self._cursor_count.items()
}
self._consumer_cursors[op_name] += 1 self._consumer_cursors[op_name] += 1
new_consumer_cursor = self._consumer_cursors[op_name] new_consumer_cursor = self._consumer_cursors[op_name]
......
...@@ -65,6 +65,7 @@ class PipelineService(pipeline_service_pb2_grpc.PipelineServiceServicer): ...@@ -65,6 +65,7 @@ class PipelineService(pipeline_service_pb2_grpc.PipelineServiceServicer):
self._cv = threading.Condition() self._cv = threading.Condition()
self._globel_resp_dict = {} self._globel_resp_dict = {}
self._id_counter = 0 self._id_counter = 0
self._reset_max_id = 1000000000000000000
self._retry = retry self._retry = retry
self._is_run = True self._is_run = True
self._pack_func = pack_func self._pack_func = pack_func
...@@ -112,6 +113,8 @@ class PipelineService(pipeline_service_pb2_grpc.PipelineServiceServicer): ...@@ -112,6 +113,8 @@ class PipelineService(pipeline_service_pb2_grpc.PipelineServiceServicer):
def _get_next_id(self): def _get_next_id(self):
with self._id_lock: with self._id_lock:
if self._id_counter >= self._reset_max_id:
self._id_counter -= self._reset_max_id
self._id_counter += 1 self._id_counter += 1
return self._id_counter - 1 return self._id_counter - 1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册