提交 932679d5 作者: mindspore-ci-bot 提交者: Gitee

!3058 Check disk space before writing and eliminate the unneeded `mode` value

Merge pull request !3058 from LiHongzhang/check_disk_space
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
"""Writes events to disk in a logdir.""" """Writes events to disk in a logdir."""
import os import os
import stat import stat
from shutil import disk_usage
from ..._c_expression import EventWriter_ from ..._c_expression import EventWriter_
from ._summary_adapter import package_init_event from ._summary_adapter import package_init_event
...@@ -42,9 +43,11 @@ class BaseWriter: ...@@ -42,9 +43,11 @@ class BaseWriter:
self.init_writer() self.init_writer()
return self._writer return self._writer
def write(self, plugin, mode, data): def write(self, plugin, data):
"""Write data to file.""" """Write data to file."""
raise NotImplementedError() if self.writer and disk_usage(self._filepath).free < len(data) * 32:
raise RuntimeError('The disk space may be soon exhausted.')
self.writer.Write(data)
def flush(self): def flush(self):
"""Flush the writer.""" """Flush the writer."""
...@@ -64,16 +67,16 @@ class SummaryWriter(BaseWriter): ...@@ -64,16 +67,16 @@ class SummaryWriter(BaseWriter):
"""Write some metadata etc.""" """Write some metadata etc."""
self.writer.Write(package_init_event().SerializeToString()) self.writer.Write(package_init_event().SerializeToString())
def write(self, plugin, mode, data): def write(self, plugin, data):
"""Write data to file.""" """Write data to file."""
if plugin in ('summary', 'graph'): if plugin in ('summary', 'graph'):
self.writer.Write(data) super().write(plugin, data)
class LineageWriter(BaseWriter): class LineageWriter(BaseWriter):
"""LineageWriter for write lineage.""" """LineageWriter for write lineage."""
def write(self, plugin, mode, data): def write(self, plugin, data):
"""Write data to file.""" """Write data to file."""
if plugin in ('dataset_graph', 'train_lineage', 'eval_lineage', 'custom_lineage_data'): if plugin in ('dataset_graph', 'train_lineage', 'eval_lineage', 'custom_lineage_data'):
self.writer.Write(data) super().write(plugin, data)
...@@ -18,6 +18,8 @@ import time ...@@ -18,6 +18,8 @@ import time
from collections import deque from collections import deque
from multiprocessing import Pool, Process, Queue, cpu_count from multiprocessing import Pool, Process, Queue, cpu_count
import mindspore.log as logger
from ._lineage_adapter import serialize_to_lineage_event from ._lineage_adapter import serialize_to_lineage_event
from ._summary_adapter import package_graph_event, package_summary_event from ._summary_adapter import package_graph_event, package_summary_event
from ._summary_writer import LineageWriter, SummaryWriter from ._summary_writer import LineageWriter, SummaryWriter
...@@ -25,20 +27,18 @@ from ._summary_writer import LineageWriter, SummaryWriter ...@@ -25,20 +27,18 @@ from ._summary_writer import LineageWriter, SummaryWriter
def _pack_data(datadict, wall_time): def _pack_data(datadict, wall_time):
"""Pack data according to which plugin.""" """Pack data according to which plugin."""
result = [] result, summaries, step = [], [], None
summaries, step, mode = [], None, None
for plugin, datalist in datadict.items(): for plugin, datalist in datadict.items():
for data in datalist: for data in datalist:
if plugin == 'graph': if plugin == 'graph':
result.append([plugin, data.get('mode'), package_graph_event(data.get('value')).SerializeToString()]) result.append([plugin, package_graph_event(data.get('value')).SerializeToString()])
elif plugin in ('train_lineage', 'eval_lineage', 'custom_lineage_data', 'dataset_graph'): elif plugin in ('train_lineage', 'eval_lineage', 'custom_lineage_data', 'dataset_graph'):
result.append([plugin, data.get('mode'), serialize_to_lineage_event(plugin, data.get('value'))]) result.append([plugin, serialize_to_lineage_event(plugin, data.get('value'))])
elif plugin in ('scalar', 'tensor', 'histogram', 'image'): elif plugin in ('scalar', 'tensor', 'histogram', 'image'):
summaries.append({'_type': plugin.title(), 'name': data.get('tag'), 'data': data.get('value')}) summaries.append({'_type': plugin.title(), 'name': data.get('tag'), 'data': data.get('value')})
step = data.get('step') step = data.get('step')
mode = data.get('mode')
if summaries: if summaries:
result.append(['summary', mode, package_summary_event(summaries, step, wall_time).SerializeToString()]) result.append(['summary', package_summary_event(summaries, step, wall_time).SerializeToString()])
return result return result
...@@ -54,46 +54,65 @@ class WriterPool(Process): ...@@ -54,46 +54,65 @@ class WriterPool(Process):
def __init__(self, base_dir, **filedict) -> None: def __init__(self, base_dir, **filedict) -> None:
super().__init__() super().__init__()
self._base_dir, self._filedict = base_dir, filedict self._base_dir, self._filedict = base_dir, filedict
self._queue = Queue(cpu_count() * 2) self._queue, self._writers_ = Queue(cpu_count() * 2), None
self.start() self.start()
def run(self): def run(self):
writers = self._get_writers()
with Pool(min(cpu_count(), 32)) as pool: with Pool(min(cpu_count(), 32)) as pool:
deq = deque() deq = deque()
while True: while True:
while deq and deq[0].ready(): while deq and deq[0].ready():
for plugin, mode, data in deq.popleft().get(): for plugin, data in deq.popleft().get():
for writer in writers: self._write(plugin, data)
writer.write(plugin, mode, data)
if not self._queue.empty(): if not self._queue.empty() and self._writers:
action, data = self._queue.get() action, data = self._queue.get()
if action == 'WRITE': if action == 'WRITE':
deq.append(pool.apply_async(_pack_data, (data, time.time()))) deq.append(pool.apply_async(_pack_data, (data, time.time())))
elif action == 'FLUSH': elif action == 'FLUSH':
for writer in writers: self._flush()
writer.flush()
elif action == 'END': elif action == 'END':
break break
for result in deq: for result in deq:
for plugin, mode, data in result.get(): for plugin, data in result.get():
for writer in writers: self._write(plugin, data)
writer.write(plugin, mode, data)
for writer in writers: self._close()
writer.close()
def _get_writers(self): @property
writers = [] def _writers(self):
"""Get the writers in the subprocess."""
if self._writers_ is not None:
return self._writers_
self._writers_ = []
for plugin, filename in self._filedict.items(): for plugin, filename in self._filedict.items():
filepath = os.path.join(self._base_dir, filename) filepath = os.path.join(self._base_dir, filename)
if plugin == 'summary': if plugin == 'summary':
writers.append(SummaryWriter(filepath)) self._writers_.append(SummaryWriter(filepath))
elif plugin == 'lineage': elif plugin == 'lineage':
writers.append(LineageWriter(filepath)) self._writers_.append(LineageWriter(filepath))
return writers return self._writers_
def _write(self, plugin, data):
"""Write the data in the subprocess."""
for writer in self._writers[:]:
try:
writer.write(plugin, data)
except RuntimeError:
logger.warning(f'The disk space may be soon exhausted by this {type(writer).__name__}, '
'so the writer will be closed and not for further writing.')
self._writers.remove(writer)
writer.close()
def _flush(self):
"""Flush the writers in the subprocess."""
for writer in self._writers:
writer.flush()
def _close(self):
"""Close the writers in the subprocess."""
for writer in self._writers:
writer.close()
def write(self, data) -> None: def write(self, data) -> None:
""" """
......
...@@ -218,14 +218,14 @@ class SummaryRecord: ...@@ -218,14 +218,14 @@ class SummaryRecord:
if name in {item['tag'] for item in self._data_pool[plugin]}: if name in {item['tag'] for item in self._data_pool[plugin]}:
entry = repr(f'{name}/{plugin}') entry = repr(f'{name}/{plugin}')
logger.warning(f'{entry} has duplicate values. Only the newest one will be recorded.') logger.warning(f'{entry} has duplicate values. Only the newest one will be recorded.')
self._data_pool[plugin].append(dict(tag=name, mode=self._mode, value=np_value)) self._data_pool[plugin].append(dict(tag=name, value=np_value))
elif plugin in ('train_lineage', 'eval_lineage', 'dataset_graph', 'custom_lineage_data'): elif plugin in ('train_lineage', 'eval_lineage', 'dataset_graph', 'custom_lineage_data'):
_check_lineage_value(plugin, value) _check_lineage_value(plugin, value)
self._data_pool[plugin].append(dict(mode=self._mode, value=value.SerializeToString())) self._data_pool[plugin].append(dict(value=value.SerializeToString()))
elif plugin == 'graph': elif plugin == 'graph':
package_graph_event(value) package_graph_event(value)
self._data_pool[plugin].append(dict(mode=self._mode, value=value)) self._data_pool[plugin].append(dict(value=value))
else: else:
raise ValueError(f'No such plugin of {repr(plugin)}') raise ValueError(f'No such plugin of {repr(plugin)}')
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册