# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import paddle.fluid as fluid
from paddle.fluid.wrapped_decorator import wrap_decorator
import unittest
from unittest import TestCase
import numpy as np
from paddle.fluid.dygraph.base import grad


def _dygraph_guard_(func):
    def __impl__(*args, **kwargs):
        if fluid.in_dygraph_mode():
            return func(*args, **kwargs)
        else:
            with fluid.dygraph.guard():
                return func(*args, **kwargs)

    return __impl__


dygraph_guard = wrap_decorator(_dygraph_guard_)


def random_var(size, low=-1, high=1, dtype='float32'):
    x_np = np.random.uniform(low=low, high=high, size=size).astype(dtype)
    return fluid.dygraph.to_variable(x_np)


class TestDygraphDoubleGrad(TestCase):
    def setUp(self):
        self.sort_sum_gradient = False
        self.shape = [5, 10]

    def grad(self,
             outputs,
             inputs,
             grad_outputs=None,
             no_grad_set=None,
             create_graph=False):
        backward_strategy = fluid.dygraph.BackwardStrategy()
        backward_strategy.sort_sum_gradient = self.sort_sum_gradient
        return grad(
            outputs=outputs,
            inputs=inputs,
            grad_outputs=grad_outputs,
            no_grad_set=no_grad_set,
            create_graph=create_graph,
            backward_strategy=backward_strategy)

    @dygraph_guard
    def test_exception(self):
        with self.assertRaises(AssertionError):
            self.grad(None, None)

        shape = self.shape

        with self.assertRaises(AssertionError):
            self.grad(1, random_var(shape))

        with self.assertRaises(AssertionError):
            self.grad(random_var(shape), 1)

        with self.assertRaises(AssertionError):
            self.grad([1], [random_var(shape)])

        with self.assertRaises(AssertionError):
            self.grad([random_var(shape)], [1])

        with self.assertRaises(AssertionError):
            self.grad([random_var(shape), random_var(shape)],
                      [random_var(shape)], [random_var(shape)])

        with self.assertRaises(AssertionError):
            self.grad(
                [random_var(shape)], [random_var(shape)], no_grad_set=[1])

        with self.assertRaises(AssertionError):
            self.grad([random_var(shape)], [random_var(shape)], no_grad_set=1)

    @dygraph_guard
    def test_simple_example(self):
        x = random_var(self.shape)
        x.stop_gradient = False
        y = x + 1

        for create_graph in [False, True]:
            dx, = self.grad([x], [x], create_graph=create_graph)
            self.assertEqual(dx.shape, x.shape)
            self.assertTrue(np.all(dx.numpy() == 1))
            self.assertNotEqual(dx.stop_gradient, create_graph)

            dx_mul_2, = self.grad([y, x], [x], create_graph=create_graph)
            self.assertEqual(dx_mul_2.shape, x.shape)
            self.assertTrue(np.all(dx_mul_2.numpy() == 2))
            self.assertNotEqual(dx_mul_2.stop_gradient, create_graph)

            none_grad, = self.grad([x], [y], create_graph=create_graph)
            self.assertTrue(none_grad is None)

            grad_with_none_and_not_none, = self.grad(
                [x, y], [y], create_graph=create_graph)
            self.assertEqual(grad_with_none_and_not_none.shape, x.shape)
            self.assertTrue(np.all(grad_with_none_and_not_none.numpy() == 1))
            self.assertNotEqual(grad_with_none_and_not_none.stop_gradient,
                                create_graph)

    @dygraph_guard
    def test_none_one_initial_gradient(self):
        x = random_var(self.shape)
        x.stop_gradient = False

        y = fluid.layers.relu(x)
        y = y * y
        z = y * y

        x_np = x.numpy()
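        # Analytic reference: with y = relu(x)^2 and z = relu(x)^4, the
        # elementwise gradients are dy/dx = 2 * relu(x) * relu'(x) and
        # dz/dx = 4 * relu(x)^3 * relu'(x). dy_expected and dz_expected
        # below compute these; grad_outputs then scale them.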
        relu_x_np = np.maximum(x_np, 0).astype('float32')
        relu_x_grad_np = (x_np > 0).astype('float32')
        dy_expected = (relu_x_np * relu_x_grad_np * 2).astype('float32')
        dz_expected = (np.power(relu_x_np, 3) * relu_x_grad_np *
                       4).astype('float32')

        random_grad_y = random_var(y.shape)
        random_grad_z = random_var(z.shape)
        ones_grad_y = np.ones(y.shape).astype('float32')
        ones_grad_z = np.ones(z.shape).astype('float32')

        original_random_grad_y = random_grad_y.numpy()
        original_random_grad_z = random_grad_z.numpy()

        for grad_y in [random_grad_y]:
            for grad_z in [random_grad_z]:
                for create_graph in [False, True]:
                    dx_actual, = self.grad(
                        outputs=[y, z],
                        inputs=[x],
                        grad_outputs=[grad_y, grad_z],
                        create_graph=create_graph)

                    grad_y_np = (ones_grad_y
                                 if grad_y is None else grad_y.numpy())
                    grad_z_np = (ones_grad_z
                                 if grad_z is None else grad_z.numpy())

                    dx_expected = (dy_expected * grad_y_np +
                                   dz_expected * grad_z_np)
                    self.assertTrue(
                        np.allclose(dx_actual.numpy(), dx_expected))

                    if grad_y is not None:
                        self.assertTrue(grad_y.stop_gradient)
                        self.assertTrue(
                            np.array_equal(grad_y.numpy(),
                                           original_random_grad_y))

                    if grad_z is not None:
                        self.assertTrue(grad_z.stop_gradient)
                        self.assertTrue(
                            np.array_equal(grad_z.numpy(),
                                           original_random_grad_z))

    @dygraph_guard
    def test_example_with_gradient_accumulation_and_create_graph(self):
        x = random_var(self.shape)
        x_np = x.numpy()
        numel = x_np.size
        x.stop_gradient = False

        y = fluid.layers.relu(x)
        z = y + 1
        w = z * z

        w_mean = fluid.layers.reduce_mean(w)
        del y, z, w

        dx_actual, = self.grad([w_mean], [x], create_graph=True)
        del w_mean

        self.assertFalse(dx_actual.stop_gradient)

        # Theoretical result based on math calculation
        dx_expected = (1.0 / float(numel) * (np.maximum(x_np, 0) + 1) *
                       (x_np > 0) * 2).astype('float32')
        self.assertTrue(np.allclose(dx_actual.numpy(), dx_expected))

        loss = fluid.layers.reduce_mean(dx_actual * dx_actual + x * x)
        loss.backward()

        x_grad_actual = x.gradient()
        # Double backward: loss = mean(dx^2 + x^2) and, because
        # create_graph=True, dx = 2 * (relu(x) + 1) * 1_{x>0} / numel is
        # itself a function of x, so
        # dloss/dx = 2 / numel * (x + dx * 1_{x>0} * 2 / numel).
        x_grad_expected = (2.0 / float(numel) *
                           (x_np + dx_expected *
                            (x_np > 0) * 2 / float(numel))).astype('float32')
        self.assertTrue(np.allclose(x_grad_actual, x_grad_expected))

    @dygraph_guard
    def test_example_with_gradient_accumulation_and_no_grad_set(self):
        x = random_var(self.shape)
        x_np = x.numpy()
        numel = x_np.size
        x.stop_gradient = False

        y1 = fluid.layers.relu(x)
        y2 = fluid.layers.relu(x)
        z = y1 + y2
        w = z * z

        w_mean = fluid.layers.reduce_mean(w)
        del y1, z, w

        dx_actual, = self.grad(
            [w_mean], [x], create_graph=True, no_grad_set=[y2])

        self.assertFalse(y2.stop_gradient)
        self.assertFalse(dx_actual.stop_gradient)

        dx_expected = (1.0 / float(numel) *
                       (np.maximum(x_np, 0) + y2.numpy()) *
                       (x_np > 0) * 2).astype('float32')
        self.assertTrue(np.allclose(dx_actual.numpy(), dx_expected))

        loss = fluid.layers.reduce_mean(dx_actual * dx_actual + x * x)
        loss.backward()

        x_grad_actual = x.gradient()
        x_grad_expected = (2.0 / float(numel) *
                           (x_np + dx_expected *
                            (x_np > 0) * 4 / float(numel))).astype('float32')
        self.assertTrue(np.allclose(x_grad_actual, x_grad_expected))

    @dygraph_guard
    def test_example_with_gradient_accumulation_and_not_create_graph(self):
        x = random_var(self.shape)
        x_np = x.numpy()
        numel = x_np.size
        x.stop_gradient = False

        y = fluid.layers.relu(x)
        z = y + 1
        w = z * z

        w_mean = fluid.layers.reduce_mean(w)
        del y, z, w

        dx_actual, = self.grad([w_mean], [x], create_graph=False)
        del w_mean

        self.assertTrue(dx_actual.stop_gradient)

        dx_expected = (1.0 / float(numel) * (np.maximum(x_np, 0) + 1) *
                       (x_np > 0) * 2).astype('float32')
        self.assertTrue(np.allclose(dx_actual.numpy(), dx_expected))

        loss = fluid.layers.reduce_mean(dx_actual * dx_actual + x * x)
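        # With create_graph=False, dx_actual is detached from the graph
        # (stop_gradient is True, as asserted above), so the
        # dx_actual * dx_actual term contributes nothing to x.gradient();
        # only d(mean(x * x))/dx = 2 * x / numel remains.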
        loss.backward()

        x_grad_actual = x.gradient()
        x_grad_expected = (2.0 * x_np / float(numel)).astype('float32')
        self.assertTrue(np.allclose(x_grad_actual, x_grad_expected))


class TestDygraphDoubleGradSortGradient(TestDygraphDoubleGrad):
    def setUp(self):
        self.sort_sum_gradient = True
        self.shape = [5, 10]


if __name__ == '__main__':
    unittest.main()