From 89462e9c3b944f942b228867e373ae8188ad5851 Mon Sep 17 00:00:00 2001 From: Li Hongzhang Date: Tue, 14 Jul 2020 15:45:24 +0800 Subject: [PATCH] check disk space before writing and remove unused mode value --- mindspore/train/summary/_summary_writer.py | 15 +++-- mindspore/train/summary/_writer_pool.py | 69 ++++++++++++++-------- mindspore/train/summary/summary_record.py | 6 +- 3 files changed, 56 insertions(+), 34 deletions(-) diff --git a/mindspore/train/summary/_summary_writer.py b/mindspore/train/summary/_summary_writer.py index 36d020819..a5648fc94 100644 --- a/mindspore/train/summary/_summary_writer.py +++ b/mindspore/train/summary/_summary_writer.py @@ -15,6 +15,7 @@ """Writes events to disk in a logdir.""" import os import stat +from shutil import disk_usage from ..._c_expression import EventWriter_ from ._summary_adapter import package_init_event @@ -42,9 +43,11 @@ class BaseWriter: self.init_writer() return self._writer - def write(self, plugin, mode, data): + def write(self, plugin, data): """Write data to file.""" - raise NotImplementedError() + if self.writer and disk_usage(self._filepath).free < len(data) * 32: + raise RuntimeError('The disk space may be soon exhausted.') + self.writer.Write(data) def flush(self): """Flush the writer.""" @@ -64,16 +67,16 @@ class SummaryWriter(BaseWriter): """Write some metadata etc.""" self.writer.Write(package_init_event().SerializeToString()) - def write(self, plugin, mode, data): + def write(self, plugin, data): """Write data to file.""" if plugin in ('summary', 'graph'): - self.writer.Write(data) + super().write(plugin, data) class LineageWriter(BaseWriter): """LineageWriter for write lineage.""" - def write(self, plugin, mode, data): + def write(self, plugin, data): """Write data to file.""" if plugin in ('dataset_graph', 'train_lineage', 'eval_lineage', 'custom_lineage_data'): - self.writer.Write(data) + super().write(plugin, data) diff --git a/mindspore/train/summary/_writer_pool.py b/mindspore/train/summary/_writer_pool.py index d9cdfd3c8..d0cf998b3 100644 --- a/mindspore/train/summary/_writer_pool.py +++ b/mindspore/train/summary/_writer_pool.py @@ -18,6 +18,8 @@ import time from collections import deque from multiprocessing import Pool, Process, Queue, cpu_count +import mindspore.log as logger + from ._lineage_adapter import serialize_to_lineage_event from ._summary_adapter import package_graph_event, package_summary_event from ._summary_writer import LineageWriter, SummaryWriter @@ -25,20 +27,18 @@ from ._summary_writer import LineageWriter, SummaryWriter def _pack_data(datadict, wall_time): """Pack data according to which plugin.""" - result = [] - summaries, step, mode = [], None, None + result, summaries, step = [], [], None for plugin, datalist in datadict.items(): for data in datalist: if plugin == 'graph': - result.append([plugin, data.get('mode'), package_graph_event(data.get('value')).SerializeToString()]) + result.append([plugin, package_graph_event(data.get('value')).SerializeToString()]) elif plugin in ('train_lineage', 'eval_lineage', 'custom_lineage_data', 'dataset_graph'): - result.append([plugin, data.get('mode'), serialize_to_lineage_event(plugin, data.get('value'))]) + result.append([plugin, serialize_to_lineage_event(plugin, data.get('value'))]) elif plugin in ('scalar', 'tensor', 'histogram', 'image'): summaries.append({'_type': plugin.title(), 'name': data.get('tag'), 'data': data.get('value')}) step = data.get('step') - mode = data.get('mode') if summaries: - result.append(['summary', mode, package_summary_event(summaries, step, wall_time).SerializeToString()]) + result.append(['summary', package_summary_event(summaries, step, wall_time).SerializeToString()]) return result @@ -54,46 +54,65 @@ class WriterPool(Process): def __init__(self, base_dir, **filedict) -> None: super().__init__() self._base_dir, self._filedict = base_dir, filedict - self._queue = Queue(cpu_count() * 2) + self._queue, self._writers_ = Queue(cpu_count() * 2), None self.start() def run(self): - writers = self._get_writers() - with Pool(min(cpu_count(), 32)) as pool: deq = deque() while True: while deq and deq[0].ready(): - for plugin, mode, data in deq.popleft().get(): - for writer in writers: - writer.write(plugin, mode, data) + for plugin, data in deq.popleft().get(): + self._write(plugin, data) - if not self._queue.empty(): + if not self._queue.empty() and self._writers: action, data = self._queue.get() if action == 'WRITE': deq.append(pool.apply_async(_pack_data, (data, time.time()))) elif action == 'FLUSH': - for writer in writers: - writer.flush() + self._flush() elif action == 'END': break for result in deq: - for plugin, mode, data in result.get(): - for writer in writers: - writer.write(plugin, mode, data) + for plugin, data in result.get(): + self._write(plugin, data) - for writer in writers: - writer.close() + self._close() - def _get_writers(self): - writers = [] + @property + def _writers(self): + """Get the writers in the subprocess.""" + if self._writers_ is not None: + return self._writers_ + self._writers_ = [] for plugin, filename in self._filedict.items(): filepath = os.path.join(self._base_dir, filename) if plugin == 'summary': - writers.append(SummaryWriter(filepath)) + self._writers_.append(SummaryWriter(filepath)) elif plugin == 'lineage': - writers.append(LineageWriter(filepath)) - return writers + self._writers_.append(LineageWriter(filepath)) + return self._writers_ + + def _write(self, plugin, data): + """Write the data in the subprocess.""" + for writer in self._writers[:]: + try: + writer.write(plugin, data) + except RuntimeError: + logger.warning(f'The disk space may be soon exhausted by this {type(writer).__name__}, ' + 'so the writer will be closed and not for further writing.') + self._writers.remove(writer) + writer.close() + + def _flush(self): + """Flush the writers in the subprocess.""" + for writer in self._writers: + writer.flush() + + def _close(self): + """Close the writers in the subprocess.""" + for writer in self._writers: + writer.close() def write(self, data) -> None: """ diff --git a/mindspore/train/summary/summary_record.py b/mindspore/train/summary/summary_record.py index 21c8c58d3..2bc605797 100644 --- a/mindspore/train/summary/summary_record.py +++ b/mindspore/train/summary/summary_record.py @@ -218,14 +218,14 @@ class SummaryRecord: if name in {item['tag'] for item in self._data_pool[plugin]}: entry = repr(f'{name}/{plugin}') logger.warning(f'{entry} has duplicate values. Only the newest one will be recorded.') - self._data_pool[plugin].append(dict(tag=name, mode=self._mode, value=np_value)) + self._data_pool[plugin].append(dict(tag=name, value=np_value)) elif plugin in ('train_lineage', 'eval_lineage', 'dataset_graph', 'custom_lineage_data'): _check_lineage_value(plugin, value) - self._data_pool[plugin].append(dict(mode=self._mode, value=value.SerializeToString())) + self._data_pool[plugin].append(dict(value=value.SerializeToString())) elif plugin == 'graph': package_graph_event(value) - self._data_pool[plugin].append(dict(mode=self._mode, value=value)) + self._data_pool[plugin].append(dict(value=value)) else: raise ValueError(f'No such plugin of {repr(plugin)}') -- GitLab