diff --git a/mindinsight/datavisual/data_transform/events_data.py b/mindinsight/datavisual/data_transform/events_data.py index f6be42f2612808ea229396452130d25ff798e77c..5c4797a934d42f8b05f9b6d34d4aa786d896f1fe 100644 --- a/mindinsight/datavisual/data_transform/events_data.py +++ b/mindinsight/datavisual/data_transform/events_data.py @@ -23,9 +23,9 @@ from mindinsight.conf import settings # Type of the tensor event from external component -_Tensor = collections.namedtuple('_Tensor', ['wall_time', 'step', 'value']) +_Tensor = collections.namedtuple('_Tensor', ['wall_time', 'step', 'value', 'filename']) TensorEvent = collections.namedtuple( - 'TensorEvent', ['wall_time', 'step', 'tag', 'plugin_name', 'value']) + 'TensorEvent', ['wall_time', 'step', 'tag', 'plugin_name', 'value', 'filename']) # config for `EventsData` _DEFAULT_STEP_SIZES_PER_TAG = settings.DEFAULT_STEP_SIZES_PER_TAG @@ -99,10 +99,11 @@ class EventsData: tensor = _Tensor(wall_time=tensor_event.wall_time, step=tensor_event.step, - value=tensor_event.value) + value=tensor_event.value, + filename=tensor_event.filename) if self._is_out_of_order_step(tensor_event.step, tensor_event.tag): - self.purge_reservoir_data(tensor_event.step, self._reservoir_by_tag[tag]) + self.purge_reservoir_data(tensor_event.filename, tensor_event.step, self._reservoir_by_tag[tag]) self._reservoir_by_tag[tag].add_sample(tensor) @@ -176,7 +177,7 @@ class EventsData: return False @staticmethod - def purge_reservoir_data(start_step, tensor_reservoir): + def purge_reservoir_data(filename, start_step, tensor_reservoir): """ Purge all tensor event that are out-of-order step after the given start step. @@ -188,7 +189,8 @@ class EventsData: Returns: int, the number of items removed. """ - cnt_out_of_order = tensor_reservoir.remove_sample(lambda x: x.step < start_step) + cnt_out_of_order = tensor_reservoir.remove_sample( + lambda x: x.step < start_step or (x.step > start_step and x.filename == filename)) return cnt_out_of_order diff --git a/mindinsight/datavisual/data_transform/ms_data_loader.py b/mindinsight/datavisual/data_transform/ms_data_loader.py index c041357e09ab07881ec0443df4a6092119e15488..4ba246021aa6fa603dc4b906a34a1fc3a0e6b85b 100644 --- a/mindinsight/datavisual/data_transform/ms_data_loader.py +++ b/mindinsight/datavisual/data_transform/ms_data_loader.py @@ -223,7 +223,8 @@ class MSDataLoader: step=event.step, tag=tag, plugin_name=PluginNameEnum.SCALAR.value, - value=value.scalar_value) + value=value.scalar_value, + filename=self._latest_summary_filename) self._events_data.add_tensor_event(tensor_event) if value.HasField('image'): @@ -232,7 +233,8 @@ class MSDataLoader: step=event.step, tag=tag, plugin_name=PluginNameEnum.IMAGE.value, - value=value.image) + value=value.image, + filename=self._latest_summary_filename) self._events_data.add_tensor_event(tensor_event) if value.HasField('histogram'): @@ -242,7 +244,8 @@ class MSDataLoader: step=event.step, tag=tag, plugin_name=PluginNameEnum.HISTOGRAM.value, - value=histogram_msg) + value=histogram_msg, + filename=self._latest_summary_filename) self._events_data.add_tensor_event(tensor_event) if event.HasField('graph_def'): @@ -253,7 +256,8 @@ class MSDataLoader: step=event.step, tag=self._latest_summary_filename, plugin_name=PluginNameEnum.GRAPH.value, - value=graph) + value=graph, + filename=self._latest_summary_filename) try: graph_tags = self._events_data.list_tags_by_plugin(PluginNameEnum.GRAPH.value) @@ -436,7 +440,8 @@ class _PbParser: step=0, tag=filename, plugin_name=PluginNameEnum.GRAPH.value, - value=graph) + value=graph, + filename=filename) logger.info("Build graph success, file path: %s.", file_path) return tensor_event diff --git a/mindinsight/datavisual/data_transform/reservoir.py b/mindinsight/datavisual/data_transform/reservoir.py index d415ed42f8de689df64a49cb48f8c85b76936108..5070092a911ae98031e2615382d44e841fe2b0f1 100644 --- a/mindinsight/datavisual/data_transform/reservoir.py +++ b/mindinsight/datavisual/data_transform/reservoir.py @@ -23,6 +23,24 @@ from mindinsight.utils.exceptions import ParamValueError from mindinsight.datavisual.utils.utils import calc_histogram_bins +def binary_search(samples, target): + """Binary search target in samples.""" + left = 0 + right = len(samples) - 1 + while left <= right: + mid = (left + right) // 2 + if target < samples[mid].step: + right = mid - 1 + elif target > samples[mid].step: + left = mid + 1 + else: + return mid + + # if right is -1, it is less than the first one. + # if list is [1, 2, 4], target is 3, right will be 1, so wo will insert by 2. + return right + 1 + + class Reservoir: """ A container based on Reservoir Sampling algorithm. @@ -68,18 +86,28 @@ class Reservoir: """ with self._mutex: if len(self._samples) < self._samples_max_size or self._samples_max_size == 0: - self._samples.append(sample) + self._add_sample(sample) else: # Use the Reservoir Sampling algorithm to replace the old sample. - rand_int = self._sample_selector.randint( - 0, self._sample_counter) + rand_int = self._sample_selector.randint(0, self._sample_counter) if rand_int < self._samples_max_size: self._samples.pop(rand_int) - self._samples.append(sample) else: - self._samples[-1] = sample + self._samples = self._samples[:-1] + self._add_sample(sample) self._sample_counter += 1 + def _add_sample(self, sample): + """Search the index and add sample.""" + if not self._samples or sample.step > self._samples[-1].step: + self._samples.append(sample) + return + index = binary_search(self._samples, sample.step) + if index == len(self._samples): + self._samples.append(sample) + else: + self._samples.insert(index, sample) + def remove_sample(self, filter_fun): """ Remove the samples from Reservoir that do not meet the filter criteria. diff --git a/tests/ut/datavisual/data_transform/test_events_data.py b/tests/ut/datavisual/data_transform/test_events_data.py index cd3167dadf8086c1b16de887fd20fff35f518e1e..73983612df473b8fd3780ed4e6a5d96d7b38620f 100644 --- a/tests/ut/datavisual/data_transform/test_events_data.py +++ b/tests/ut/datavisual/data_transform/test_events_data.py @@ -36,9 +36,9 @@ class MockReservoir: def __init__(self, size): self.size = size self._samples = [ - _Tensor('wall_time1', 1, 'value1'), - _Tensor('wall_time2', 2, 'value2'), - _Tensor('wall_time3', 3, 'value3') + _Tensor('wall_time1', 1, 'value1', 'filename1'), + _Tensor('wall_time2', 2, 'value2', 'filename2'), + _Tensor('wall_time3', 3, 'value3', 'filename3') ] def samples(self): @@ -107,7 +107,8 @@ class TestEventsData: """Test add_tensor_event success.""" ev_data = self.get_ev_data() - t_event = TensorEvent(wall_time=1, step=4, tag='new_tag', plugin_name='plugin_name1', value='value1') + t_event = TensorEvent(wall_time=1, step=4, tag='new_tag', + plugin_name='plugin_name1', value='value1', filename='filename') ev_data.add_tensor_event(t_event) assert 'tag0' not in ev_data._tags @@ -116,4 +117,54 @@ class TestEventsData: assert 'tag0' not in ev_data._reservoir_by_tag assert 'new_tag' in ev_data._tags_by_plugin['plugin_name1'] assert ev_data._reservoir_by_tag['new_tag'].samples()[-1] == _Tensor(t_event.wall_time, t_event.step, - t_event.value) + t_event.value, 'filename') + + def test_add_tensor_event_out_of_order(self): + """Test add_tensor_event success for out_of_order summaries.""" + wall_time = 1 + value = '1' + tag = 'tag' + plugin_name = 'scalar' + file1 = 'file1' + ev_data = EventsData() + steps = [i for i in range(2, 10)] + for step in steps: + t_event = TensorEvent(wall_time=1, step=step, tag=tag, + plugin_name=plugin_name, value=value, filename=file1) + ev_data.add_tensor_event(t_event) + + t_event = TensorEvent(wall_time=1, step=1, tag=tag, + plugin_name=plugin_name, value=value, filename=file1) + ev_data.add_tensor_event(t_event) + + # Current steps should be: [1, 2, 3, 4, 5, 6, 7, 8, 9] + assert len(ev_data._reservoir_by_tag[tag].samples()) == len(steps) + 1 + + file2 = 'file2' + new_steps_1 = [5, 10] + for step in new_steps_1: + t_event = TensorEvent(wall_time=1, step=step, tag=tag, + plugin_name=plugin_name, value=value, filename=file2) + ev_data.add_tensor_event(t_event) + assert ev_data._reservoir_by_tag[tag].samples()[-1] == _Tensor(wall_time, step, value, file2) + + # Current steps should be: [1, 2, 3, 4, 5, 10] + steps = [1, 2, 3, 4, 5, 10] + samples = ev_data._reservoir_by_tag[tag].samples() + for step, sample in zip(steps, samples): + filename = file1 if sample.step < 5 else file2 + assert sample == _Tensor(wall_time, step, value, filename) + + new_steps_2 = [7, 11, 3] + for step in new_steps_2: + t_event = TensorEvent(wall_time=1, step=step, tag=tag, + plugin_name=plugin_name, value=value, filename=file2) + ev_data.add_tensor_event(t_event) + + # Current steps should be: [1, 2, 3, 5, 7, 10, 11], file2: [3, 5, 7, 10, 11] + steps = [1, 2, 3, 5, 7, 10, 11] + new_steps_2.extend(new_steps_1) + samples = ev_data._reservoir_by_tag[tag].samples() + for step, sample in zip(steps, samples): + filename = file2 if sample.step in new_steps_2 else file1 + assert sample == _Tensor(wall_time, step, value, filename) diff --git a/tests/ut/datavisual/data_transform/test_reservoir.py b/tests/ut/datavisual/data_transform/test_reservoir.py index e9b117a06ee9e25fe24581d900e8531818c43587..904ef697a04eed59b55ab87e2f0d6b86aa7215c4 100644 --- a/tests/ut/datavisual/data_transform/test_reservoir.py +++ b/tests/ut/datavisual/data_transform/test_reservoir.py @@ -27,10 +27,14 @@ class TestHistogramReservoir: sample1.value.count = 1 sample1.value.max = 102 sample1.value.min = 101 + sample1.step = 2 + sample1.filename = 'filename' sample2 = mock.MagicMock() sample2.value.count = 2 sample2.value.max = 102 sample2.value.min = 101 + sample2.step = 1 + sample2.filename = 'filename' my_reservoir.add_sample(sample1) my_reservoir.add_sample(sample2) samples = my_reservoir.samples() diff --git a/tests/ut/datavisual/processors/test_images_processor.py b/tests/ut/datavisual/processors/test_images_processor.py index dc122306b813108cef95fc2e15679860977a7521..36052d4954629e424d2e7750c43f7ffb2193a0d1 100644 --- a/tests/ut/datavisual/processors/test_images_processor.py +++ b/tests/ut/datavisual/processors/test_images_processor.py @@ -216,9 +216,8 @@ class TestImagesProcessor: """ Test removing sample in reservoir. - If step list is [1, 3, 5, 7, 9, 2, 3, 4, 15], - and then [3, 5, 7, 9] will be deleted. - Results will be [1, 2, 3, 4, 15]. + If step list is [1, 3, 5, 7, 9, 2, 3, 4, 15] in one summary, + Results will be [1, 2, 3, 4, 5, 7, 9, 15]. """ test_tag_name = self._complete_tag_name @@ -237,5 +236,4 @@ class TestImagesProcessor: except ImageNotExistError: not_found_step_list.append(test_step) - assert current_step_list == [1, 2, 3, 4, 15] - assert not_found_step_list == [5, 7, 9] + assert current_step_list == [1, 2, 3, 4, 5, 7, 9, 15]