提交 89462e9c 编写于 作者: L Li Hongzhang

check disk space before writing and remove unused mode value

上级 c6847992
......@@ -15,6 +15,7 @@
"""Writes events to disk in a logdir."""
import os
import stat
from shutil import disk_usage
from ..._c_expression import EventWriter_
from ._summary_adapter import package_init_event
......@@ -42,9 +43,11 @@ class BaseWriter:
self.init_writer()
return self._writer
def write(self, plugin, mode, data):
def write(self, plugin, data):
"""Write data to file."""
raise NotImplementedError()
if self.writer and disk_usage(self._filepath).free < len(data) * 32:
raise RuntimeError('The disk space may be soon exhausted.')
self.writer.Write(data)
def flush(self):
"""Flush the writer."""
......@@ -64,16 +67,16 @@ class SummaryWriter(BaseWriter):
"""Write some metadata etc."""
self.writer.Write(package_init_event().SerializeToString())
def write(self, plugin, mode, data):
def write(self, plugin, data):
"""Write data to file."""
if plugin in ('summary', 'graph'):
self.writer.Write(data)
super().write(plugin, data)
class LineageWriter(BaseWriter):
"""LineageWriter for write lineage."""
def write(self, plugin, mode, data):
def write(self, plugin, data):
"""Write data to file."""
if plugin in ('dataset_graph', 'train_lineage', 'eval_lineage', 'custom_lineage_data'):
self.writer.Write(data)
super().write(plugin, data)
......@@ -18,6 +18,8 @@ import time
from collections import deque
from multiprocessing import Pool, Process, Queue, cpu_count
import mindspore.log as logger
from ._lineage_adapter import serialize_to_lineage_event
from ._summary_adapter import package_graph_event, package_summary_event
from ._summary_writer import LineageWriter, SummaryWriter
......@@ -25,20 +27,18 @@ from ._summary_writer import LineageWriter, SummaryWriter
def _pack_data(datadict, wall_time):
"""Pack data according to which plugin."""
result = []
summaries, step, mode = [], None, None
result, summaries, step = [], [], None
for plugin, datalist in datadict.items():
for data in datalist:
if plugin == 'graph':
result.append([plugin, data.get('mode'), package_graph_event(data.get('value')).SerializeToString()])
result.append([plugin, package_graph_event(data.get('value')).SerializeToString()])
elif plugin in ('train_lineage', 'eval_lineage', 'custom_lineage_data', 'dataset_graph'):
result.append([plugin, data.get('mode'), serialize_to_lineage_event(plugin, data.get('value'))])
result.append([plugin, serialize_to_lineage_event(plugin, data.get('value'))])
elif plugin in ('scalar', 'tensor', 'histogram', 'image'):
summaries.append({'_type': plugin.title(), 'name': data.get('tag'), 'data': data.get('value')})
step = data.get('step')
mode = data.get('mode')
if summaries:
result.append(['summary', mode, package_summary_event(summaries, step, wall_time).SerializeToString()])
result.append(['summary', package_summary_event(summaries, step, wall_time).SerializeToString()])
return result
......@@ -54,46 +54,65 @@ class WriterPool(Process):
def __init__(self, base_dir, **filedict) -> None:
super().__init__()
self._base_dir, self._filedict = base_dir, filedict
self._queue = Queue(cpu_count() * 2)
self._queue, self._writers_ = Queue(cpu_count() * 2), None
self.start()
def run(self):
writers = self._get_writers()
with Pool(min(cpu_count(), 32)) as pool:
deq = deque()
while True:
while deq and deq[0].ready():
for plugin, mode, data in deq.popleft().get():
for writer in writers:
writer.write(plugin, mode, data)
for plugin, data in deq.popleft().get():
self._write(plugin, data)
if not self._queue.empty():
if not self._queue.empty() and self._writers:
action, data = self._queue.get()
if action == 'WRITE':
deq.append(pool.apply_async(_pack_data, (data, time.time())))
elif action == 'FLUSH':
for writer in writers:
writer.flush()
self._flush()
elif action == 'END':
break
for result in deq:
for plugin, mode, data in result.get():
for writer in writers:
writer.write(plugin, mode, data)
for plugin, data in result.get():
self._write(plugin, data)
for writer in writers:
writer.close()
self._close()
def _get_writers(self):
writers = []
@property
def _writers(self):
"""Get the writers in the subprocess."""
if self._writers_ is not None:
return self._writers_
self._writers_ = []
for plugin, filename in self._filedict.items():
filepath = os.path.join(self._base_dir, filename)
if plugin == 'summary':
writers.append(SummaryWriter(filepath))
self._writers_.append(SummaryWriter(filepath))
elif plugin == 'lineage':
writers.append(LineageWriter(filepath))
return writers
self._writers_.append(LineageWriter(filepath))
return self._writers_
def _write(self, plugin, data):
"""Write the data in the subprocess."""
for writer in self._writers[:]:
try:
writer.write(plugin, data)
except RuntimeError:
logger.warning(f'The disk space may be soon exhausted by this {type(writer).__name__}, '
'so the writer will be closed and not for further writing.')
self._writers.remove(writer)
writer.close()
def _flush(self):
"""Flush the writers in the subprocess."""
for writer in self._writers:
writer.flush()
def _close(self):
"""Close the writers in the subprocess."""
for writer in self._writers:
writer.close()
def write(self, data) -> None:
"""
......
......@@ -218,14 +218,14 @@ class SummaryRecord:
if name in {item['tag'] for item in self._data_pool[plugin]}:
entry = repr(f'{name}/{plugin}')
logger.warning(f'{entry} has duplicate values. Only the newest one will be recorded.')
self._data_pool[plugin].append(dict(tag=name, mode=self._mode, value=np_value))
self._data_pool[plugin].append(dict(tag=name, value=np_value))
elif plugin in ('train_lineage', 'eval_lineage', 'dataset_graph', 'custom_lineage_data'):
_check_lineage_value(plugin, value)
self._data_pool[plugin].append(dict(mode=self._mode, value=value.SerializeToString()))
self._data_pool[plugin].append(dict(value=value.SerializeToString()))
elif plugin == 'graph':
package_graph_event(value)
self._data_pool[plugin].append(dict(mode=self._mode, value=value))
self._data_pool[plugin].append(dict(value=value))
else:
raise ValueError(f'No such plugin of {repr(plugin)}')
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册