提交 660a8dbf 编写于 作者: L luopengting

optimize parsing for out-of-order events

上级 09fb9f16
......@@ -23,9 +23,9 @@ from mindinsight.conf import settings
# Type of the tensor event from external component
_Tensor = collections.namedtuple('_Tensor', ['wall_time', 'step', 'value'])
_Tensor = collections.namedtuple('_Tensor', ['wall_time', 'step', 'value', 'filename'])
TensorEvent = collections.namedtuple(
'TensorEvent', ['wall_time', 'step', 'tag', 'plugin_name', 'value'])
'TensorEvent', ['wall_time', 'step', 'tag', 'plugin_name', 'value', 'filename'])
# config for `EventsData`
_DEFAULT_STEP_SIZES_PER_TAG = settings.DEFAULT_STEP_SIZES_PER_TAG
......@@ -99,10 +99,11 @@ class EventsData:
tensor = _Tensor(wall_time=tensor_event.wall_time,
step=tensor_event.step,
value=tensor_event.value)
value=tensor_event.value,
filename=tensor_event.filename)
if self._is_out_of_order_step(tensor_event.step, tensor_event.tag):
self.purge_reservoir_data(tensor_event.step, self._reservoir_by_tag[tag])
self.purge_reservoir_data(tensor_event.filename, tensor_event.step, self._reservoir_by_tag[tag])
self._reservoir_by_tag[tag].add_sample(tensor)
......@@ -176,7 +177,7 @@ class EventsData:
return False
@staticmethod
def purge_reservoir_data(start_step, tensor_reservoir):
def purge_reservoir_data(filename, start_step, tensor_reservoir):
"""
Purge all tensor event that are out-of-order step after the given start step.
......@@ -188,7 +189,8 @@ class EventsData:
Returns:
int, the number of items removed.
"""
cnt_out_of_order = tensor_reservoir.remove_sample(lambda x: x.step < start_step)
cnt_out_of_order = tensor_reservoir.remove_sample(
lambda x: x.step < start_step or (x.step > start_step and x.filename == filename))
return cnt_out_of_order
......
......@@ -223,7 +223,8 @@ class MSDataLoader:
step=event.step,
tag=tag,
plugin_name=PluginNameEnum.SCALAR.value,
value=value.scalar_value)
value=value.scalar_value,
filename=self._latest_summary_filename)
self._events_data.add_tensor_event(tensor_event)
if value.HasField('image'):
......@@ -232,7 +233,8 @@ class MSDataLoader:
step=event.step,
tag=tag,
plugin_name=PluginNameEnum.IMAGE.value,
value=value.image)
value=value.image,
filename=self._latest_summary_filename)
self._events_data.add_tensor_event(tensor_event)
if value.HasField('histogram'):
......@@ -242,7 +244,8 @@ class MSDataLoader:
step=event.step,
tag=tag,
plugin_name=PluginNameEnum.HISTOGRAM.value,
value=histogram_msg)
value=histogram_msg,
filename=self._latest_summary_filename)
self._events_data.add_tensor_event(tensor_event)
if event.HasField('graph_def'):
......@@ -253,7 +256,8 @@ class MSDataLoader:
step=event.step,
tag=self._latest_summary_filename,
plugin_name=PluginNameEnum.GRAPH.value,
value=graph)
value=graph,
filename=self._latest_summary_filename)
try:
graph_tags = self._events_data.list_tags_by_plugin(PluginNameEnum.GRAPH.value)
......@@ -436,7 +440,8 @@ class _PbParser:
step=0,
tag=filename,
plugin_name=PluginNameEnum.GRAPH.value,
value=graph)
value=graph,
filename=filename)
logger.info("Build graph success, file path: %s.", file_path)
return tensor_event
......@@ -23,6 +23,24 @@ from mindinsight.utils.exceptions import ParamValueError
from mindinsight.datavisual.utils.utils import calc_histogram_bins
def binary_search(samples, target):
"""Binary search target in samples."""
left = 0
right = len(samples) - 1
while left <= right:
mid = (left + right) // 2
if target < samples[mid].step:
right = mid - 1
elif target > samples[mid].step:
left = mid + 1
else:
return mid
# if right is -1, it is less than the first one.
# if list is [1, 2, 4], target is 3, right will be 1, so wo will insert by 2.
return right + 1
class Reservoir:
"""
A container based on Reservoir Sampling algorithm.
......@@ -68,18 +86,28 @@ class Reservoir:
"""
with self._mutex:
if len(self._samples) < self._samples_max_size or self._samples_max_size == 0:
self._samples.append(sample)
self._add_sample(sample)
else:
# Use the Reservoir Sampling algorithm to replace the old sample.
rand_int = self._sample_selector.randint(
0, self._sample_counter)
rand_int = self._sample_selector.randint(0, self._sample_counter)
if rand_int < self._samples_max_size:
self._samples.pop(rand_int)
self._samples.append(sample)
else:
self._samples[-1] = sample
self._samples = self._samples[:-1]
self._add_sample(sample)
self._sample_counter += 1
def _add_sample(self, sample):
"""Search the index and add sample."""
if not self._samples or sample.step > self._samples[-1].step:
self._samples.append(sample)
return
index = binary_search(self._samples, sample.step)
if index == len(self._samples):
self._samples.append(sample)
else:
self._samples.insert(index, sample)
def remove_sample(self, filter_fun):
"""
Remove the samples from Reservoir that do not meet the filter criteria.
......
......@@ -36,9 +36,9 @@ class MockReservoir:
def __init__(self, size):
self.size = size
self._samples = [
_Tensor('wall_time1', 1, 'value1'),
_Tensor('wall_time2', 2, 'value2'),
_Tensor('wall_time3', 3, 'value3')
_Tensor('wall_time1', 1, 'value1', 'filename1'),
_Tensor('wall_time2', 2, 'value2', 'filename2'),
_Tensor('wall_time3', 3, 'value3', 'filename3')
]
def samples(self):
......@@ -107,7 +107,8 @@ class TestEventsData:
"""Test add_tensor_event success."""
ev_data = self.get_ev_data()
t_event = TensorEvent(wall_time=1, step=4, tag='new_tag', plugin_name='plugin_name1', value='value1')
t_event = TensorEvent(wall_time=1, step=4, tag='new_tag',
plugin_name='plugin_name1', value='value1', filename='filename')
ev_data.add_tensor_event(t_event)
assert 'tag0' not in ev_data._tags
......@@ -116,4 +117,54 @@ class TestEventsData:
assert 'tag0' not in ev_data._reservoir_by_tag
assert 'new_tag' in ev_data._tags_by_plugin['plugin_name1']
assert ev_data._reservoir_by_tag['new_tag'].samples()[-1] == _Tensor(t_event.wall_time, t_event.step,
t_event.value)
t_event.value, 'filename')
def test_add_tensor_event_out_of_order(self):
"""Test add_tensor_event success for out_of_order summaries."""
wall_time = 1
value = '1'
tag = 'tag'
plugin_name = 'scalar'
file1 = 'file1'
ev_data = EventsData()
steps = [i for i in range(2, 10)]
for step in steps:
t_event = TensorEvent(wall_time=1, step=step, tag=tag,
plugin_name=plugin_name, value=value, filename=file1)
ev_data.add_tensor_event(t_event)
t_event = TensorEvent(wall_time=1, step=1, tag=tag,
plugin_name=plugin_name, value=value, filename=file1)
ev_data.add_tensor_event(t_event)
# Current steps should be: [1, 2, 3, 4, 5, 6, 7, 8, 9]
assert len(ev_data._reservoir_by_tag[tag].samples()) == len(steps) + 1
file2 = 'file2'
new_steps_1 = [5, 10]
for step in new_steps_1:
t_event = TensorEvent(wall_time=1, step=step, tag=tag,
plugin_name=plugin_name, value=value, filename=file2)
ev_data.add_tensor_event(t_event)
assert ev_data._reservoir_by_tag[tag].samples()[-1] == _Tensor(wall_time, step, value, file2)
# Current steps should be: [1, 2, 3, 4, 5, 10]
steps = [1, 2, 3, 4, 5, 10]
samples = ev_data._reservoir_by_tag[tag].samples()
for i in range(len(samples)):
filename = file1 if samples[i].step < 5 else file2
assert samples[i] == _Tensor(wall_time, steps[i], value, filename)
new_steps_2 = [7, 11, 3]
for step in new_steps_2:
t_event = TensorEvent(wall_time=1, step=step, tag=tag,
plugin_name=plugin_name, value=value, filename=file2)
ev_data.add_tensor_event(t_event)
# Current steps should be: [1, 2, 3, 5, 7, 10, 11], file2: [3, 5, 7, 10, 11]
steps = [1, 2, 3, 5, 7, 10, 11]
new_steps_2.extend(new_steps_1)
samples = ev_data._reservoir_by_tag[tag].samples()
for i in range(len(samples)):
filename = file2 if samples[i].step in new_steps_2 else file1
assert samples[i] == _Tensor(wall_time, steps[i], value, filename)
......@@ -27,10 +27,14 @@ class TestHistogramReservoir:
sample1.value.count = 1
sample1.value.max = 102
sample1.value.min = 101
sample1.step = 2
sample1.filename = 'filename'
sample2 = mock.MagicMock()
sample2.value.count = 2
sample2.value.max = 102
sample2.value.min = 101
sample2.step = 1
sample2.filename = 'filename'
my_reservoir.add_sample(sample1)
my_reservoir.add_sample(sample2)
samples = my_reservoir.samples()
......
......@@ -216,9 +216,8 @@ class TestImagesProcessor:
"""
Test removing sample in reservoir.
If step list is [1, 3, 5, 7, 9, 2, 3, 4, 15],
and then [3, 5, 7, 9] will be deleted.
Results will be [1, 2, 3, 4, 15].
If step list is [1, 3, 5, 7, 9, 2, 3, 4, 15] in one summary,
Results will be [1, 2, 3, 4, 5, 7, 9, 15].
"""
test_tag_name = self._complete_tag_name
......@@ -237,5 +236,4 @@ class TestImagesProcessor:
except ImageNotExistError:
not_found_step_list.append(test_step)
assert current_step_list == [1, 2, 3, 4, 15]
assert not_found_step_list == [5, 7, 9]
assert current_step_list == [1, 2, 3, 4, 5, 7, 9, 15]
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册