# Copyright 2019 Huawei Technologies Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================ """Unittest for model_lineage.py""" import os import shutil import unittest from unittest import mock, TestCase from unittest.mock import MagicMock from mindinsight.lineagemgr.collection.model.model_lineage import TrainLineage, EvalLineage, \ AnalyzeObject from mindinsight.lineagemgr.common.exceptions.exceptions import \ LineageLogError, LineageGetModelFileError, MindInsightException from mindspore.common.tensor import Tensor from mindspore.dataset.engine import MindDataset, Dataset from mindspore.nn import Optimizer, WithLossCell, TrainOneStepWithLossScaleCell, \ SoftmaxCrossEntropyWithLogits from mindspore.train.callback import RunContext, ModelCheckpoint, SummaryStep from mindspore.train.summary import SummaryRecord @mock.patch('builtins.open') @mock.patch('os.makedirs') class TestModelLineage(TestCase): """Test TrainLineage and EvalLineage class in model_lineage.py.""" @classmethod def setUpClass(cls): cls.lineage_list = ['train_network', 'loss_fn', 'optimizer', 'train_dataset', 'valid_dataset', 'epoch', 'valid_step', 'hybrid_parallel', 'data_parallel_size', 'auto_parallel', 'device_number', 'batch_num', 'summary_log_path', 'model_ckpt'] cls.run_context = {key: None for key in cls.lineage_list} cls.run_context['net_outputs'] = Tensor() cls.my_run_context = RunContext cls.my_train_module = TrainLineage cls.my_eval_module = EvalLineage cls.my_analyze_module = AnalyzeObject cls.my_summary_record = SummaryRecord cls.summary_log_path = '/path/to/summary_log' @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_summary_record') def test_summary_record_exception(self, *args): """Test SummaryRecord with exception.""" args[0].return_value = None summary_record = self.my_summary_record(self.summary_log_path) with self.assertRaises(MindInsightException) as context: self.my_train_module(summary_record=summary_record, raise_exception=1) self.assertTrue(f'Invalid value for raise_exception.' in str(context.exception)) @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.ds') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.' 'LineageSummary.record_dataset_graph') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.' 'validate_summary_record') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.' 'AnalyzeObject.get_optimizer_by_network') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.' 'AnalyzeObject.analyze_optimizer') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_network') def test_begin(self, *args): """Test TrainLineage.begin method.""" args[1].return_value = None args[2].return_value = Optimizer(Tensor(0.1)) args[3].return_value = None args[5].serialize.return_value = {} run_context = {'optimizer': Optimizer(Tensor(0.1)), 'epoch_num': 10} train_lineage = self.my_train_module(self.my_summary_record(self.summary_log_path)) train_lineage.begin(self.my_run_context(run_context)) args[4].assert_called() @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.ds') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.' 'LineageSummary.record_dataset_graph') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.' 'validate_summary_record') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.' 'AnalyzeObject.get_optimizer_by_network') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.' 'AnalyzeObject.analyze_optimizer') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_network') def test_begin_error(self, *args): """Test TrainLineage.begin method.""" args[1].return_value = None args[2].return_value = Optimizer(Tensor(0.1)) args[3].return_value = None args[4].side_effect = Exception args[5].serialize.return_value = {} run_context = {'optimizer': Optimizer(Tensor(0.1)), 'epoch_num': 10} train_lineage = self.my_train_module(self.my_summary_record(self.summary_log_path), True) with self.assertRaisesRegex(LineageLogError, 'Dataset graph log error'): train_lineage.begin(self.my_run_context(run_context)) train_lineage = self.my_train_module(self.my_summary_record(self.summary_log_path)) train_lineage.begin(self.my_run_context(run_context)) args[4].assert_called() @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_summary_record') def test_begin_exception(self, *args): """Test TrainLineage.begin method with exception.""" args[0].return_value = None train_lineage = self.my_train_module(self.my_summary_record(self.summary_log_path), True) with self.assertRaises(Exception) as context: train_lineage.begin(self.run_context) self.assertTrue('Invalid TrainLineage run_context.' in str(context.exception)) run_context = {key: None for key in self.lineage_list} run_context['optimizer'] = 1 with self.assertRaises(Exception) as context: train_lineage.begin(self.my_run_context(run_context)) self.assertTrue('The parameter optimizer is invalid.' in str(context.exception)) @mock.patch( 'mindinsight.lineagemgr.collection.model.model_lineage.AnalyzeObject.get_model_size') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.AnalyzeObject.get_file_path') @mock.patch( 'mindinsight.lineagemgr.collection.model.model_lineage.LineageSummary.record_train_lineage') @mock.patch( 'mindinsight.lineagemgr.collection.model.model_lineage.AnalyzeObject.analyze_dataset') @mock.patch( 'mindinsight.lineagemgr.collection.model.model_lineage.AnalyzeObject.analyze_optimizer') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_network') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_train_run_context') @mock.patch('builtins.float') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_summary_record') def test_train_end(self, *args): """Test TrainLineage.end method.""" args[1].return_value = 2.0 args[2].return_value = True args[3].return_value = True args[4].return_value = None args[5].return_value = None args[6].return_value = None args[7].return_value = (None, None) args[8].return_value = 10 train_lineage = self.my_train_module(self.my_summary_record(self.summary_log_path), True) train_lineage.end(self.my_run_context(self.run_context)) args[6].assert_called() @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_summary_record') def test_train_end_exception(self, *args): """Test TrainLineage.end method when exception.""" args[0].return_value = True train_lineage = self.my_train_module(self.my_summary_record(self.summary_log_path), True) with self.assertRaises(Exception) as context: train_lineage.end(self.run_context) self.assertTrue('Invalid TrainLineage run_context.' in str(context.exception)) @mock.patch( 'mindinsight.lineagemgr.collection.model.model_lineage.AnalyzeObject.get_model_size') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.AnalyzeObject.get_file_path') @mock.patch( 'mindinsight.lineagemgr.collection.model.model_lineage.LineageSummary.record_train_lineage') @mock.patch( 'mindinsight.lineagemgr.collection.model.model_lineage.AnalyzeObject.analyze_dataset') @mock.patch( 'mindinsight.lineagemgr.collection.model.model_lineage.AnalyzeObject.analyze_optimizer') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_network') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_train_run_context') @mock.patch('builtins.float') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_summary_record') def test_train_end_exception_log_error(self, *args): """Test TrainLineage.end method with logging errors.""" args[1].return_value = 2.0 args[2].return_value = True args[3].return_value = True args[4].return_value = None args[5].return_value = None args[6].side_effect = Exception args[7].return_value = (None, None) args[8].return_value = 10 train_lineage = self.my_train_module(self.my_summary_record(self.summary_log_path), True) with self.assertRaises(LineageLogError) as context: train_lineage.end(self.my_run_context(self.run_context)) self.assertTrue('End error in TrainLineage:' in str(context.exception)) @mock.patch( 'mindinsight.lineagemgr.collection.model.model_lineage.AnalyzeObject.get_model_size') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.AnalyzeObject.get_file_path') @mock.patch( 'mindinsight.lineagemgr.collection.model.model_lineage.LineageSummary.record_train_lineage') @mock.patch( 'mindinsight.lineagemgr.collection.model.model_lineage.AnalyzeObject.analyze_dataset') @mock.patch( 'mindinsight.lineagemgr.collection.model.model_lineage.AnalyzeObject.analyze_optimizer') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_network') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_train_run_context') @mock.patch('builtins.float') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_summary_record') def test_train_end_exception_log_error2(self, *args): """Test TrainLineage.end method with logging errors.""" args[1].return_value = 2.0 args[2].return_value = True args[3].return_value = True args[4].return_value = None args[5].return_value = None args[6].side_effect = IOError args[7].return_value = (None, None) args[8].return_value = 10 run_context = {key: None for key in self.lineage_list} run_context['loss_fn'] = MagicMock() run_context['net_outputs'] = Tensor(0.11) train_lineage = self.my_train_module(self.my_summary_record(self.summary_log_path), True) with self.assertRaises(LineageLogError) as context: train_lineage.end(self.my_run_context(run_context)) self.assertTrue('End error in TrainLineage:' in str(context.exception)) @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_summary_record') def test_eval_exception_train_id_none(self, *args): """Test EvalLineage.end method with initialization error.""" args[0].return_value = True with self.assertRaises(MindInsightException) as context: self.my_eval_module(self.my_summary_record(self.summary_log_path), raise_exception=2) self.assertTrue('Invalid value for raise_exception.' in str(context.exception)) @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.' 'AnalyzeObject.analyze_dataset') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_summary_record') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_eval_run_context') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.' 'LineageSummary.record_evaluation_lineage') def test_eval_end(self, *args): """Test EvalLineage.end method.""" args[1].return_value = True args[2].return_value = True args[3].return_value = None args[0].return_value = None eval_lineage = self.my_eval_module(self.my_summary_record(self.summary_log_path)) eval_lineage.end(self.my_run_context(self.run_context)) args[0].assert_called() @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_summary_record') def test_eval_end_except_run_context(self, *args): """Test EvalLineage.end method when run_context is invalid..""" args[0].return_value = True eval_lineage = self.my_eval_module(self.my_summary_record(self.summary_log_path), True) with self.assertRaises(Exception) as context: eval_lineage.end(self.run_context) self.assertTrue('Invalid EvalLineage run_context.' in str(context.exception)) @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.' 'AnalyzeObject.analyze_dataset') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_summary_record') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_eval_run_context') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.' 'LineageSummary.record_evaluation_lineage') def test_eval_end_except_log_error(self, *args): """Test EvalLineage.end method with logging error.""" args[0].side_effect = Exception args[1].return_value = True args[2].return_value = True args[3].return_value = None eval_lineage = self.my_eval_module(self.my_summary_record(self.summary_log_path), True) with self.assertRaises(LineageLogError) as context: eval_lineage.end(self.my_run_context(self.run_context)) self.assertTrue('End error in EvalLineage' in str(context.exception)) @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.' 'AnalyzeObject.analyze_dataset') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_summary_record') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.validate_eval_run_context') @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.' 'LineageSummary.record_evaluation_lineage') def test_eval_end_except_log_error2(self, *args): """Test EvalLineage.end method with logging error.""" args[0].side_effect = IOError args[1].return_value = True args[2].return_value = True args[3].return_value = None eval_lineage = self.my_eval_module(self.my_summary_record(self.summary_log_path), True) with self.assertRaises(LineageLogError) as context: eval_lineage.end(self.my_run_context(self.run_context)) self.assertTrue('End error in EvalLineage' in str(context.exception)) def test_epoch_is_zero(self, *args): """Test TrainLineage.end method.""" args[0].return_value = None run_context = self.run_context run_context['epoch_num'] = 0 with self.assertRaises(MindInsightException): train_lineage = self.my_train_module(self.my_summary_record(self.summary_log_path), True) train_lineage.end(self.my_run_context(run_context)) def tearDown(self): """Teardown.""" if os.path.exists(self.summary_log_path): try: shutil.rmtree(self.summary_log_path) except IOError: pass class TestAnalyzer(TestCase): """Test Analyzer class in model_lineage.py.""" def setUp(self): """SetUp config.""" self.analyzer = AnalyzeObject() def test_analyze_optimizer(self): """Test analyze_optimizer method.""" optimizer = Optimizer(Tensor(0.12)) res = self.analyzer.analyze_optimizer(optimizer) assert res == 0.12 def test_get_dataset_path(self): """Test get_dataset_path method.""" dataset = MindDataset( dataset_file='/path/to/mindrecord' ) res = self.analyzer.get_dataset_path(dataset) assert res == '/path/to/mindrecord' def test_get_dataset_path_wrapped(self): """Test get_dataset_path_wrapped method.""" dataset = Dataset() dataset.input.append( MindDataset( dataset_size=10, dataset_file='/path/to/cifar10' )) res = self.analyzer.get_dataset_path_wrapped(dataset) assert res == '/path/to/cifar10' @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.' 'AnalyzeObject.get_dataset_path_wrapped') def test_analyze_dataset(self, mock_get_path): """Test analyze_dataset method.""" mock_get_path.return_value = '/path/to/mindinsightset' dataset = MindDataset( dataset_size=10, dataset_file='/path/to/mindinsightset' ) res1 = self.analyzer.analyze_dataset(dataset, {'step_num': 10, 'epoch': 2}, 'train') res2 = self.analyzer.analyze_dataset(dataset, {'step_num': 5}, 'valid') assert res1 == {'step_num': 10, 'train_dataset_path': '/path/to', 'train_dataset_size': 50, 'epoch': 2} assert res2 == {'step_num': 5, 'valid_dataset_path': '/path/to', 'valid_dataset_size': 50} def test_get_dataset_path_dataset(self): """Test get_dataset_path method with Dataset.""" dataset = Dataset( dataset_size=10, dataset_path='/path/to/cifar10' ) with self.assertRaises(IndexError): self.analyzer.get_dataset_path(output_dataset=dataset) def test_get_dataset_path_mindrecord(self): """Test get_dataset_path method with MindDataset.""" dataset = MindDataset( dataset_file='/path/to/cifar10' ) dataset_path = self.analyzer.get_dataset_path(output_dataset=dataset) self.assertEqual(dataset_path, '/path/to/cifar10') def test_get_file_path(self): """Test get_file_path method.""" model_ckpt = ModelCheckpoint(prefix='', directory='/path/to') summary_step = SummaryStep(MagicMock(full_file_name='/path/to/summary.log')) list_callback = [model_ckpt, summary_step] ckpt_file_path, _ = AnalyzeObject.get_file_path(list_callback) self.assertEqual(ckpt_file_path, '/path/to/test_model.ckpt') @mock.patch('os.path.getsize') def test_get_file_size(self, os_get_size_mock): """Test get_file_size method.""" os_get_size_mock.return_value = 128 file_size = AnalyzeObject.get_file_size('/file/path') self.assertEqual(file_size, 128) @mock.patch('os.path.getsize') def test_get_file_size_except(self, os_get_size_mock): """Test failed to get the size of file.""" os_get_size_mock.side_effect = OSError analyzer = AnalyzeObject with self.assertRaises(LineageGetModelFileError) as context: analyzer.get_file_size('/file/path') self.assertTrue('Error when get model file size:' in str(context.exception)) @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.AnalyzeObject.get_file_size') def test_get_model_size(self, get_file_size_mock): """Test get_model_size method.""" get_file_size_mock.return_value = 128 analyzer = AnalyzeObject file_size = analyzer.get_model_size(ckpt_file_path='/file/path') self.assertEqual(file_size, 128) @mock.patch('mindinsight.lineagemgr.collection.model.model_lineage.AnalyzeObject.get_file_size') def test_get_model_size_no_ckpt(self, get_file_size_mock): """Test get_model_size method without ckpt file.""" get_file_size_mock.return_value = 0 analyzer = AnalyzeObject file_size = analyzer.get_model_size(ckpt_file_path='') self.assertEqual(file_size, 0) @mock.patch('builtins.vars') def test_get_optimizer_by_network(self, mock_vars): """Test get_optimizer_by_network.""" mock_optimizer = Optimizer(Tensor(0.1)) mock_cells = MagicMock() mock_cells.items.return_value = [{'key': mock_optimizer}] mock_vars.return_value = { '_cells': { 'key': mock_optimizer } } res = AnalyzeObject.get_optimizer_by_network(MagicMock()) self.assertEqual(res, mock_optimizer) @mock.patch('builtins.vars') def test_get_loss_fn_by_network(self, mock_vars): """Test get_loss_fn_by_network.""" mock_cell1 = {'_cells': {'key': SoftmaxCrossEntropyWithLogits(0.2)}} mock_cell2 = {'_cells': {'opt': Optimizer(Tensor(0.1))}} mock_cell3 = {'_cells': {'loss': SoftmaxCrossEntropyWithLogits(0.1)}} mock_vars.side_effect = [mock_cell1, mock_cell2, mock_cell3] res = AnalyzeObject.get_loss_fn_by_network(MagicMock()) self.assertEqual(res, mock_cell3['_cells']['loss']) @mock.patch('builtins.vars') def test_get_backbone_network_with_loss_cell(self, mock_vars): """Test get_backbone_network with loss_cell.""" mock_cell = {'_cells': {'key': WithLossCell(MagicMock(), SoftmaxCrossEntropyWithLogits(0.1))} } mock_vars.return_value = mock_cell res = AnalyzeObject.get_backbone_network(MagicMock()) self.assertEqual(res, 'MagicMock') @mock.patch('builtins.vars') def test_get_backbone_network(self, mock_vars): """Test get_backbone_network.""" mock_net = TrainOneStepWithLossScaleCell() mock_net.network = MagicMock() mock_cell = { '_cells': { 'key': mock_net } } mock_vars.return_value = mock_cell res = AnalyzeObject.get_backbone_network(MagicMock()) self.assertEqual(res, 'MagicMock') if __name__ == '__main__': unittest.main(verbosity=2)