提交 31dfca34 编写于 作者: B barrierye

add comment for channel

上级 ac66194e
...@@ -173,7 +173,7 @@ class ProcessChannel(multiprocessing.queues.Queue): ...@@ -173,7 +173,7 @@ class ProcessChannel(multiprocessing.queues.Queue):
self._cv = multiprocessing.Condition() self._cv = multiprocessing.Condition()
self._producers = [] self._producers = []
self.pushed_producer_count = manager.dict() # {data_id: count} self._pushed_producer_count = manager.dict() # {data_id: count}
self._input_buf = manager.dict() # {data_id: {op_name: data}} self._input_buf = manager.dict() # {data_id: {op_name: data}}
self._consumer_cursors = manager.dict() # {op_name: cursor} self._consumer_cursors = manager.dict() # {op_name: cursor}
...@@ -251,19 +251,19 @@ class ProcessChannel(multiprocessing.queues.Queue): ...@@ -251,19 +251,19 @@ class ProcessChannel(multiprocessing.queues.Queue):
name: None name: None
for name in self._producers for name in self._producers
} }
self.pushed_producer_count[data_id] = 0 self._pushed_producer_count[data_id] = 0
# see: https://docs.python.org/3.6/library/multiprocessing.html?highlight=multiprocess#proxy-objects # see: https://docs.python.org/3.6/library/multiprocessing.html?highlight=multiprocess#proxy-objects
# self._input_buf[data_id][op_name] = channeldata # self._input_buf[data_id][op_name] = channeldata
tmp_input_buf = self._input_buf[data_id] tmp_input_buf = self._input_buf[data_id]
tmp_input_buf[op_name] = channeldata tmp_input_buf[op_name] = channeldata
self._input_buf[data_id] = tmp_input_buf self._input_buf[data_id] = tmp_input_buf
if self.pushed_producer_count[data_id] + 1 == producer_num: if self._pushed_producer_count[data_id] + 1 == producer_num:
put_data = self._input_buf[data_id] put_data = self._input_buf[data_id]
self._input_buf.pop(data_id) self._input_buf.pop(data_id)
self.pushed_producer_count.pop(data_id) self._pushed_producer_count.pop(data_id)
else: else:
self.pushed_producer_count[data_id] += 1 self._pushed_producer_count[data_id] += 1
if put_data is None: if put_data is None:
_LOGGER.debug( _LOGGER.debug(
...@@ -440,7 +440,7 @@ class ThreadChannel(Queue.Queue): ...@@ -440,7 +440,7 @@ class ThreadChannel(Queue.Queue):
self._cv = threading.Condition() self._cv = threading.Condition()
self._producers = [] self._producers = []
self.pushed_producer_count = {} # {data_id: count} self._pushed_producer_count = {} # {data_id: count}
self._input_buf = {} # {data_id: {op_name: data}} self._input_buf = {} # {data_id: {op_name: data}}
self._consumer_cursors = {} # {op_name: idx} self._consumer_cursors = {} # {op_name: idx}
...@@ -514,14 +514,14 @@ class ThreadChannel(Queue.Queue): ...@@ -514,14 +514,14 @@ class ThreadChannel(Queue.Queue):
name: None name: None
for name in self._producers for name in self._producers
} }
self.pushed_producer_count[data_id] = 0 self._pushed_producer_count[data_id] = 0
self._input_buf[data_id][op_name] = channeldata self._input_buf[data_id][op_name] = channeldata
if self.pushed_producer_count[data_id] + 1 == producer_num: if self._pushed_producer_count[data_id] + 1 == producer_num:
put_data = self._input_buf[data_id] put_data = self._input_buf[data_id]
self._input_buf.pop(data_id) self._input_buf.pop(data_id)
self.pushed_producer_count.pop(data_id) self._pushed_producer_count.pop(data_id)
else: else:
self.pushed_producer_count[data_id] += 1 self._pushed_producer_count[data_id] += 1
if put_data is None: if put_data is None:
_LOGGER.debug( _LOGGER.debug(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册