# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import tempfile
import unittest
from functools import partial

import numpy as np

import paddle
from paddle.fluid.dygraph.dygraph_to_static.program_translator import ProgramCache
from paddle.fluid.tests.unittests.ipu.op_test_ipu import IPUOpTest
from paddle.jit import to_static
from paddle.optimizer.lr import LRScheduler

SEED = 2022


class SimpleLayer(paddle.nn.Layer):

    def __init__(self,
                 loss_op=None,
                 use_softmax=True,
                 use_reduction=True,
                 use_identity_loss=True):
        super(SimpleLayer, self).__init__()
        self.loss_op = loss_op
        self.conv = paddle.nn.Conv2D(in_channels=3,
                                     out_channels=1,
                                     kernel_size=2,
                                     stride=1)
        self.use_softmax = use_softmax
        self.use_reduction = use_reduction
        self.use_identity_loss = use_identity_loss

    @to_static()
    def forward(self, x, target=None):
        x = self.conv(x)
        x = paddle.fluid.layers.flatten(x, axis=1)
        if target is not None:
            if self.use_softmax:
                x = paddle.fluid.layers.softmax(x)
            if self.loss_op:
                loss = self.loss_op(x, target)
            else:
                loss = paddle.fluid.layers.cross_entropy(x, target)
            if self.use_reduction:
                loss = paddle.mean(loss)
            if self.use_identity_loss:
                loss = paddle.incubate.identity_loss(loss, 1)
            return x, loss
        return x


@unittest.skipIf(not paddle.is_compiled_with_ipu(),
                 "core is not compiled with IPU")
class TestBase(IPUOpTest):

    def setUp(self):
        paddle.disable_static()
        self.set_op_attrs()
        self.set_data_feed()

    def set_op_attrs(self):
        self.loss_op = paddle.fluid.layers.cross_entropy

    def set_data_feed(self):
        self.data = paddle.uniform((32, 3, 10, 10), dtype='float32')
        self.label = paddle.randint(0, 10, shape=[32], dtype='int64')

    def create_model(self, use_ipu=False):
        return SimpleLayer(loss_op=self.loss_op,
                           use_softmax=True,
                           use_reduction=not use_ipu,
                           use_identity_loss=use_ipu)

    def _test(self, use_ipu=False):
        paddle.seed(SEED)
        np.random.seed(SEED)
        model = self.create_model(use_ipu)
        optim = paddle.optimizer.Adam(learning_rate=0.01,
                                      parameters=model.parameters())

        if use_ipu:
            device = paddle.set_device('ipu')
            ipu_strategy = paddle.static.IpuStrategy()
            ipu_strategy.set_graph_config(num_ipus=1,
                                          is_training=True,
                                          micro_batch_size=1,
                                          enable_manual_shard=False)
            ipu_strategy.set_optimizer(optim)

        result = []
        for epoch in range(100):
            # ipu only needs call model() to do forward/backward/grad_update
            pred, loss = model(self.data, self.label)
            if not use_ipu:
                loss.backward()
                optim.step()
                optim.clear_grad()
            result.append(loss)

        if use_ipu:
            ipu_strategy.release_patch()

        return np.array(result)

    def test_training(self):
        ipu_loss = self._test(True).flatten()
        cpu_loss = self._test(False).flatten()
        self.assertTrue(np.allclose(ipu_loss, cpu_loss, atol=1e-4))


class TestSaveLoad(TestBase):

    @classmethod
    def setUpClass(cls):
        cls.save_path = tempfile.TemporaryDirectory()

    @classmethod
    def tearDownClass(cls):
        cls.save_path.cleanup()

    def _test(self, use_ipu=False):
        paddle.seed(SEED)
        np.random.seed(SEED)
        model = self.create_model(use_ipu)
        optim = paddle.optimizer.Adam(learning_rate=0.01,
                                      parameters=model.parameters())
        model_path = '{}/model_state_dict_{}.pdparams'.format(
            self.save_path.name, 'ipu' if use_ipu else 'cpu')
        optim_path = '{}/optim_state_dict_{}.pdopt'.format(
            self.save_path.name, 'ipu' if use_ipu else 'cpu')

        if use_ipu:
            device = paddle.set_device('ipu')
            ipu_strategy = paddle.static.IpuStrategy()
            ipu_strategy.set_graph_config(num_ipus=1,
                                          is_training=True,
                                          micro_batch_size=1,
                                          enable_manual_shard=False)
            ipu_strategy.set_optimizer(optim)

        result = []
        for epoch in range(100):
            # ipu only needs call model() to do forward/backward/grad_update
            pred, loss = model(self.data, self.label)
            if not use_ipu:
                loss.backward()
                optim.step()
                optim.clear_grad()
            result.append(loss)

        if use_ipu:
            paddle.fluid.core.IpuBackend.get_instance().weights_to_host()

        paddle.save(model.state_dict(), model_path)
        paddle.save(optim.state_dict(), optim_path)
        model.set_state_dict(paddle.load(model_path))
        optim.set_state_dict(paddle.load(optim_path))

        for epoch in range(100):
            # ipu only needs call model() to do forward/backward/grad_update
            pred, loss = model(self.data, self.label)
            if not use_ipu:
                loss.backward()
                optim.step()
                optim.clear_grad()
            result.append(loss)

        if use_ipu:
            ipu_strategy.release_patch()

        return np.array(result)


@unittest.skipIf(not paddle.is_compiled_with_ipu(),
                 "core is not compiled with IPU")
class TestPatch(IPUOpTest):

    def setUp(self):
        paddle.disable_static()

    def test(self, use_ipu=False):
        old_getter = ProgramCache.__getitem__
        old_step = LRScheduler.step

        ipu_strategy = paddle.static.IpuStrategy()
        ipu_strategy.release_patch()

        reset_getter = ProgramCache.__getitem__
        reset_step = LRScheduler.step

        self.assertTrue(reset_getter is old_getter)
        self.assertTrue(reset_step is old_step)


class TestWithoutIdentityLoss1(TestBase):

    def create_model(self, use_ipu=False):
        return SimpleLayer(loss_op=self.loss_op,
                           use_softmax=True,
                           use_reduction=True,
                           use_identity_loss=False)


class TestWithoutIdentityLoss2(TestBase):

    def set_op_attrs(self):
        self.loss_op = paddle.fluid.layers.softmax_with_cross_entropy

    def set_data_feed(self):
        self.data = paddle.uniform((32, 3, 10, 10), dtype='float32')
        self.label = paddle.randint(0, 10, shape=[32, 1], dtype='int64')

    def create_model(self, use_ipu=False):
        return SimpleLayer(loss_op=self.loss_op,
                           use_softmax=False,
                           use_reduction=True,
                           use_identity_loss=False)


class TestWithoutIdentityLoss3(TestBase):

    def set_op_attrs(self):
        self.loss_op = partial(paddle.fluid.layers.kldiv_loss,
                               reduction="none")

    def set_data_feed(self):
        self.data = paddle.uniform((32, 3, 10, 10), dtype='float32')
        self.label = paddle.rand(shape=[32, 81], dtype='float32')

    def create_model(self, use_ipu=False):
        return SimpleLayer(loss_op=self.loss_op,
                           use_softmax=True,
                           use_reduction=True,
                           use_identity_loss=False)


if __name__ == "__main__":
    unittest.main()