提交 9ae239b5 编写于 作者: L luopengting

optimize parsing for out-of-order events

上级 09fb9f16
...@@ -23,9 +23,9 @@ from mindinsight.conf import settings ...@@ -23,9 +23,9 @@ from mindinsight.conf import settings
# Type of the tensor event from external component # Type of the tensor event from external component
_Tensor = collections.namedtuple('_Tensor', ['wall_time', 'step', 'value']) _Tensor = collections.namedtuple('_Tensor', ['wall_time', 'step', 'value', 'filename'])
TensorEvent = collections.namedtuple( TensorEvent = collections.namedtuple(
'TensorEvent', ['wall_time', 'step', 'tag', 'plugin_name', 'value']) 'TensorEvent', ['wall_time', 'step', 'tag', 'plugin_name', 'value', 'filename'])
# config for `EventsData` # config for `EventsData`
_DEFAULT_STEP_SIZES_PER_TAG = settings.DEFAULT_STEP_SIZES_PER_TAG _DEFAULT_STEP_SIZES_PER_TAG = settings.DEFAULT_STEP_SIZES_PER_TAG
...@@ -99,10 +99,11 @@ class EventsData: ...@@ -99,10 +99,11 @@ class EventsData:
tensor = _Tensor(wall_time=tensor_event.wall_time, tensor = _Tensor(wall_time=tensor_event.wall_time,
step=tensor_event.step, step=tensor_event.step,
value=tensor_event.value) value=tensor_event.value,
filename=tensor_event.filename)
if self._is_out_of_order_step(tensor_event.step, tensor_event.tag): if self._is_out_of_order_step(tensor_event.step, tensor_event.tag):
self.purge_reservoir_data(tensor_event.step, self._reservoir_by_tag[tag]) self.purge_reservoir_data(tensor_event.filename, tensor_event.step, self._reservoir_by_tag[tag])
self._reservoir_by_tag[tag].add_sample(tensor) self._reservoir_by_tag[tag].add_sample(tensor)
...@@ -176,7 +177,7 @@ class EventsData: ...@@ -176,7 +177,7 @@ class EventsData:
return False return False
@staticmethod @staticmethod
def purge_reservoir_data(start_step, tensor_reservoir): def purge_reservoir_data(filename, start_step, tensor_reservoir):
""" """
Purge all tensor event that are out-of-order step after the given start step. Purge all tensor event that are out-of-order step after the given start step.
...@@ -188,7 +189,8 @@ class EventsData: ...@@ -188,7 +189,8 @@ class EventsData:
Returns: Returns:
int, the number of items removed. int, the number of items removed.
""" """
cnt_out_of_order = tensor_reservoir.remove_sample(lambda x: x.step < start_step) cnt_out_of_order = tensor_reservoir.remove_sample(
lambda x: x.step < start_step or (x.step > start_step and x.filename == filename))
return cnt_out_of_order return cnt_out_of_order
......
...@@ -223,7 +223,8 @@ class MSDataLoader: ...@@ -223,7 +223,8 @@ class MSDataLoader:
step=event.step, step=event.step,
tag=tag, tag=tag,
plugin_name=PluginNameEnum.SCALAR.value, plugin_name=PluginNameEnum.SCALAR.value,
value=value.scalar_value) value=value.scalar_value,
filename=self._latest_summary_filename)
self._events_data.add_tensor_event(tensor_event) self._events_data.add_tensor_event(tensor_event)
if value.HasField('image'): if value.HasField('image'):
...@@ -232,7 +233,8 @@ class MSDataLoader: ...@@ -232,7 +233,8 @@ class MSDataLoader:
step=event.step, step=event.step,
tag=tag, tag=tag,
plugin_name=PluginNameEnum.IMAGE.value, plugin_name=PluginNameEnum.IMAGE.value,
value=value.image) value=value.image,
filename=self._latest_summary_filename)
self._events_data.add_tensor_event(tensor_event) self._events_data.add_tensor_event(tensor_event)
if value.HasField('histogram'): if value.HasField('histogram'):
...@@ -242,7 +244,8 @@ class MSDataLoader: ...@@ -242,7 +244,8 @@ class MSDataLoader:
step=event.step, step=event.step,
tag=tag, tag=tag,
plugin_name=PluginNameEnum.HISTOGRAM.value, plugin_name=PluginNameEnum.HISTOGRAM.value,
value=histogram_msg) value=histogram_msg,
filename=self._latest_summary_filename)
self._events_data.add_tensor_event(tensor_event) self._events_data.add_tensor_event(tensor_event)
if event.HasField('graph_def'): if event.HasField('graph_def'):
...@@ -253,7 +256,8 @@ class MSDataLoader: ...@@ -253,7 +256,8 @@ class MSDataLoader:
step=event.step, step=event.step,
tag=self._latest_summary_filename, tag=self._latest_summary_filename,
plugin_name=PluginNameEnum.GRAPH.value, plugin_name=PluginNameEnum.GRAPH.value,
value=graph) value=graph,
filename=self._latest_summary_filename)
try: try:
graph_tags = self._events_data.list_tags_by_plugin(PluginNameEnum.GRAPH.value) graph_tags = self._events_data.list_tags_by_plugin(PluginNameEnum.GRAPH.value)
...@@ -436,7 +440,8 @@ class _PbParser: ...@@ -436,7 +440,8 @@ class _PbParser:
step=0, step=0,
tag=filename, tag=filename,
plugin_name=PluginNameEnum.GRAPH.value, plugin_name=PluginNameEnum.GRAPH.value,
value=graph) value=graph,
filename=filename)
logger.info("Build graph success, file path: %s.", file_path) logger.info("Build graph success, file path: %s.", file_path)
return tensor_event return tensor_event
...@@ -23,6 +23,24 @@ from mindinsight.utils.exceptions import ParamValueError ...@@ -23,6 +23,24 @@ from mindinsight.utils.exceptions import ParamValueError
from mindinsight.datavisual.utils.utils import calc_histogram_bins from mindinsight.datavisual.utils.utils import calc_histogram_bins
def binary_search(samples, target):
"""Binary search target in samples."""
left = 0
right = len(samples) - 1
while left <= right:
mid = (left + right) // 2
if target < samples[mid].step:
right = mid - 1
elif target > samples[mid].step:
left = mid + 1
else:
return mid
# if right is -1, it is less than the first one.
# if list is [1, 2, 4], target is 3, right will be 1, so wo will insert by 2.
return right + 1
class Reservoir: class Reservoir:
""" """
A container based on Reservoir Sampling algorithm. A container based on Reservoir Sampling algorithm.
...@@ -68,18 +86,28 @@ class Reservoir: ...@@ -68,18 +86,28 @@ class Reservoir:
""" """
with self._mutex: with self._mutex:
if len(self._samples) < self._samples_max_size or self._samples_max_size == 0: if len(self._samples) < self._samples_max_size or self._samples_max_size == 0:
self._samples.append(sample) self._add_sample(sample)
else: else:
# Use the Reservoir Sampling algorithm to replace the old sample. # Use the Reservoir Sampling algorithm to replace the old sample.
rand_int = self._sample_selector.randint( rand_int = self._sample_selector.randint(0, self._sample_counter)
0, self._sample_counter)
if rand_int < self._samples_max_size: if rand_int < self._samples_max_size:
self._samples.pop(rand_int) self._samples.pop(rand_int)
self._samples.append(sample)
else: else:
self._samples[-1] = sample self._samples = self._samples[:-1]
self._add_sample(sample)
self._sample_counter += 1 self._sample_counter += 1
def _add_sample(self, sample):
"""Search the index and add sample."""
if not self._samples or sample.step > self._samples[-1].step:
self._samples.append(sample)
return
index = binary_search(self._samples, sample.step)
if index == len(self._samples):
self._samples.append(sample)
else:
self._samples.insert(index, sample)
def remove_sample(self, filter_fun): def remove_sample(self, filter_fun):
""" """
Remove the samples from Reservoir that do not meet the filter criteria. Remove the samples from Reservoir that do not meet the filter criteria.
......
...@@ -36,9 +36,9 @@ class MockReservoir: ...@@ -36,9 +36,9 @@ class MockReservoir:
def __init__(self, size): def __init__(self, size):
self.size = size self.size = size
self._samples = [ self._samples = [
_Tensor('wall_time1', 1, 'value1'), _Tensor('wall_time1', 1, 'value1', 'filename1'),
_Tensor('wall_time2', 2, 'value2'), _Tensor('wall_time2', 2, 'value2', 'filename2'),
_Tensor('wall_time3', 3, 'value3') _Tensor('wall_time3', 3, 'value3', 'filename3')
] ]
def samples(self): def samples(self):
...@@ -107,7 +107,8 @@ class TestEventsData: ...@@ -107,7 +107,8 @@ class TestEventsData:
"""Test add_tensor_event success.""" """Test add_tensor_event success."""
ev_data = self.get_ev_data() ev_data = self.get_ev_data()
t_event = TensorEvent(wall_time=1, step=4, tag='new_tag', plugin_name='plugin_name1', value='value1') t_event = TensorEvent(wall_time=1, step=4, tag='new_tag',
plugin_name='plugin_name1', value='value1', filename='filename')
ev_data.add_tensor_event(t_event) ev_data.add_tensor_event(t_event)
assert 'tag0' not in ev_data._tags assert 'tag0' not in ev_data._tags
...@@ -116,4 +117,54 @@ class TestEventsData: ...@@ -116,4 +117,54 @@ class TestEventsData:
assert 'tag0' not in ev_data._reservoir_by_tag assert 'tag0' not in ev_data._reservoir_by_tag
assert 'new_tag' in ev_data._tags_by_plugin['plugin_name1'] assert 'new_tag' in ev_data._tags_by_plugin['plugin_name1']
assert ev_data._reservoir_by_tag['new_tag'].samples()[-1] == _Tensor(t_event.wall_time, t_event.step, assert ev_data._reservoir_by_tag['new_tag'].samples()[-1] == _Tensor(t_event.wall_time, t_event.step,
t_event.value) t_event.value, 'filename')
def test_add_tensor_event_out_of_order(self):
"""Test add_tensor_event success for out_of_order summaries."""
wall_time = 1
value = '1'
tag = 'tag'
plugin_name = 'scalar'
file1 = 'file1'
ev_data = EventsData()
steps = [i for i in range(2, 10)]
for step in steps:
t_event = TensorEvent(wall_time=1, step=step, tag=tag,
plugin_name=plugin_name, value=value, filename=file1)
ev_data.add_tensor_event(t_event)
t_event = TensorEvent(wall_time=1, step=1, tag=tag,
plugin_name=plugin_name, value=value, filename=file1)
ev_data.add_tensor_event(t_event)
# Current steps should be: [1, 2, 3, 4, 5, 6, 7, 8, 9]
assert len(ev_data._reservoir_by_tag[tag].samples()) == len(steps) + 1
file2 = 'file2'
new_steps_1 = [5, 10]
for step in new_steps_1:
t_event = TensorEvent(wall_time=1, step=step, tag=tag,
plugin_name=plugin_name, value=value, filename=file2)
ev_data.add_tensor_event(t_event)
assert ev_data._reservoir_by_tag[tag].samples()[-1] == _Tensor(wall_time, step, value, file2)
# Current steps should be: [1, 2, 3, 4, 5, 10]
steps = [1, 2, 3, 4, 5, 10]
samples = ev_data._reservoir_by_tag[tag].samples()
for step, sample in zip(steps, samples):
filename = file1 if sample.step < 5 else file2
assert sample == _Tensor(wall_time, step, value, filename)
new_steps_2 = [7, 11, 3]
for step in new_steps_2:
t_event = TensorEvent(wall_time=1, step=step, tag=tag,
plugin_name=plugin_name, value=value, filename=file2)
ev_data.add_tensor_event(t_event)
# Current steps should be: [1, 2, 3, 5, 7, 10, 11], file2: [3, 5, 7, 10, 11]
steps = [1, 2, 3, 5, 7, 10, 11]
new_steps_2.extend(new_steps_1)
samples = ev_data._reservoir_by_tag[tag].samples()
for step, sample in zip(steps, samples):
filename = file2 if sample.step in new_steps_2 else file1
assert sample == _Tensor(wall_time, step, value, filename)
...@@ -27,10 +27,14 @@ class TestHistogramReservoir: ...@@ -27,10 +27,14 @@ class TestHistogramReservoir:
sample1.value.count = 1 sample1.value.count = 1
sample1.value.max = 102 sample1.value.max = 102
sample1.value.min = 101 sample1.value.min = 101
sample1.step = 2
sample1.filename = 'filename'
sample2 = mock.MagicMock() sample2 = mock.MagicMock()
sample2.value.count = 2 sample2.value.count = 2
sample2.value.max = 102 sample2.value.max = 102
sample2.value.min = 101 sample2.value.min = 101
sample2.step = 1
sample2.filename = 'filename'
my_reservoir.add_sample(sample1) my_reservoir.add_sample(sample1)
my_reservoir.add_sample(sample2) my_reservoir.add_sample(sample2)
samples = my_reservoir.samples() samples = my_reservoir.samples()
......
...@@ -216,9 +216,8 @@ class TestImagesProcessor: ...@@ -216,9 +216,8 @@ class TestImagesProcessor:
""" """
Test removing sample in reservoir. Test removing sample in reservoir.
If step list is [1, 3, 5, 7, 9, 2, 3, 4, 15], If step list is [1, 3, 5, 7, 9, 2, 3, 4, 15] in one summary,
and then [3, 5, 7, 9] will be deleted. Results will be [1, 2, 3, 4, 5, 7, 9, 15].
Results will be [1, 2, 3, 4, 15].
""" """
test_tag_name = self._complete_tag_name test_tag_name = self._complete_tag_name
...@@ -237,5 +236,4 @@ class TestImagesProcessor: ...@@ -237,5 +236,4 @@ class TestImagesProcessor:
except ImageNotExistError: except ImageNotExistError:
not_found_step_list.append(test_step) not_found_step_list.append(test_step)
assert current_step_list == [1, 2, 3, 4, 15] assert current_step_list == [1, 2, 3, 4, 5, 7, 9, 15]
assert not_found_step_list == [5, 7, 9]
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册