# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import unittest import numpy as np from simple_nets import batchnorm_fc_with_inputs, simple_fc_net_with_inputs import paddle import paddle.fluid as fluid import paddle.fluid.core as core import paddle.fluid.framework as framework import paddle.fluid.layers as layers from paddle.fluid.backward import append_backward from paddle.fluid.framework import Program, program_guard np.random.seed(123) class TestCondInputOutput(unittest.TestCase): def test_return_single_var(self): """ pseudocode: if 0.23 < 0.1: return 2 else: return -1 """ paddle.enable_static() def true_func(): return layers.fill_constant(shape=[2, 3], dtype='int32', value=2) def false_func(): return layers.fill_constant(shape=[3, 2], dtype='int32', value=-1) main_program = Program() startup_program = Program() with program_guard(main_program, startup_program): x = layers.fill_constant(shape=[1], dtype='float32', value=0.1) y = layers.fill_constant(shape=[1], dtype='float32', value=0.23) pred = paddle.less_than(y, x) out = layers.cond(pred, true_func, false_func) # out is one tensor place = ( fluid.CUDAPlace(0) if core.is_compiled_with_cuda() else fluid.CPUPlace() ) exe = fluid.Executor(place) (ret,) = exe.run(main_program, fetch_list=[out.name]) np.testing.assert_allclose( np.asarray(ret), np.full((3, 2), -1, np.int32), rtol=1e-05 ) def test_return_var_tuple(self): """ pseudocode: if True: return 1, True else: return 3, 2 """ paddle.enable_static() def true_func(): return layers.fill_constant( shape=[1, 2], dtype='int32', value=1 ), layers.fill_constant(shape=[2, 3], dtype='bool', value=True) def false_func(): return layers.fill_constant( shape=[3, 4], dtype='float32', value=3 ), layers.fill_constant(shape=[4, 5], dtype='int64', value=2) main_program = Program() startup_program = Program() with program_guard(main_program, startup_program): pred = layers.fill_constant(shape=[1], dtype='bool', value=True) out = layers.cond(pred, true_func, false_func) # out is a tuple containing 2 tensors place = ( fluid.CUDAPlace(0) if core.is_compiled_with_cuda() else fluid.CPUPlace() ) exe = fluid.Executor(place) ret = exe.run(main_program, fetch_list=out) np.testing.assert_allclose( np.asarray(ret[0]), np.full((1, 2), 1, np.int32), rtol=1e-05 ) np.testing.assert_allclose( np.asarray(ret[1]), np.full((2, 3), True, bool), rtol=1e-05 ) def test_pass_and_modify_var(self): """ pseudocode: for i in range(5): a = 7 if i % 2 == 0: a = a * (i + 1) else: a = a - (i - 1) """ paddle.enable_static() def true_func(a, i): a = a * (i + 1) return a def false_func(a, i): a = a - (i - 1) return a main_program = Program() startup_program = Program() with program_guard(main_program, startup_program): a = layers.fill_constant(shape=[3, 2, 1], dtype='int32', value=7) i = fluid.data(name="i", shape=[1], dtype='int32') pred = (i % 2) == 0 a = layers.cond( pred, lambda: true_func(a, i), lambda: false_func(a, i) ) place = ( fluid.CUDAPlace(0) if core.is_compiled_with_cuda() else fluid.CPUPlace() ) exe = fluid.Executor(place) for feed_i in range(5): expected_a = 7 * (feed_i + 1) if feed_i % 2 == 0 else 8 - feed_i (ret,) = exe.run( main_program, feed={'i': np.full((1), feed_i, np.int32)}, fetch_list=[a], ) np.testing.assert_allclose( np.asarray(ret), np.full((3, 2, 1), expected_a, np.int32), rtol=1e-05, ) def test_return_none(self): """ pseudocode: test doing nothing in branches for i in range(5): if i % 2 == 0: pass else: pass """ paddle.enable_static() def true_func(): pass def false_func(): return None main_program = Program() startup_program = Program() with program_guard(main_program, startup_program): i = fluid.data(name="i", shape=[1], dtype='int32') pred = (i % 2) == 0 out1 = layers.cond(pred, true_func, false_func) out2 = layers.cond(pred, None, false_func) out3 = layers.cond(pred, true_func, None) place = ( fluid.CUDAPlace(0) if core.is_compiled_with_cuda() else fluid.CPUPlace() ) exe = fluid.Executor(place) for feed_i in range(5): # Test that output is None is runnable exe.run(main_program, feed={'i': np.full((1), feed_i, np.int32)}) self.assertIsNone(out1) self.assertIsNone(out2) self.assertIsNone(out3) def test_wrong_structure_exception(self): """ test returning different number of tensors cannot merge into output """ paddle.enable_static() def func_return_none(): return None def func_return_one_tensor(): return layers.fill_constant(shape=[2, 7], dtype='int32', value=3) def func_return_two_tensors(): return layers.fill_constant( shape=[3, 1], dtype='int32', value=7 ), layers.fill_constant(shape=[3, 1], dtype='int32', value=8) main_program = Program() startup_program = Program() with program_guard(main_program, startup_program): i = fluid.data(name="i", shape=[1], dtype='int32') pred = (i % 2) == 0 with self.assertRaises(TypeError): out = layers.cond(pred, i, func_return_one_tensor) with self.assertRaises(TypeError): out = layers.cond(pred, func_return_one_tensor, np.asarray([3])) with self.assertRaises(Exception) as e: out = layers.cond( pred, func_return_none, func_return_one_tensor ) self.assertTrue( "Incompatible return values of true_fn and false_fn in cond" in str(e.exception) ) with self.assertRaises(Exception) as e: out = layers.cond( pred, func_return_two_tensors, func_return_none ) self.assertTrue( "Incompatible return values of true_fn and false_fn in cond" in str(e.exception) ) with self.assertRaises(Exception) as e: out = layers.cond( pred, func_return_one_tensor, func_return_two_tensors ) self.assertTrue( "true fn returns 1 vars, but false fn returns 2 vars, which is not equals" in str(e.exception) ) def test_extremely_simple_net_with_op_in_condition(self): paddle.enable_static() main_program = fluid.Program() startup_program = fluid.Program() with fluid.program_guard(main_program, startup_program): a = fluid.layers.fill_constant( shape=[1], dtype='float32', value=1.23 ) a.stop_gradient = False b = fluid.layers.fill_constant( shape=[1], dtype='float32', value=1.25 ) b.stop_gradient = False out = layers.cond(a - b < -1.0, lambda: a, lambda: b) append_backward(out) place = ( fluid.CUDAPlace(0) if core.is_compiled_with_cuda() else fluid.CPUPlace() ) exe = fluid.Executor(place) ret = exe.run( main_program, fetch_list=[out, b, a.grad_name, b.grad_name] ) # Note: fill_constant has loss of precision, you have to assertEqual # with values doens't lose precision in float-point number. self.assertEqual(ret[0][0], ret[1][0]) self.assertEqual(ret[2][0], 0.0) self.assertEqual(ret[3][0], 1.0) class TestCondNestedControlFlow(unittest.TestCase): def test_cond_inside_cond(self): """ pseudocode: for i in range(1, 10): a = 2 * i if i < 5: if i >= 3: return a + a else: return a - a else: if i < 8: return a * a else: return a / a """ paddle.enable_static() def less_than_branch(i, a): return layers.cond( i >= 3.0, lambda: paddle.add(a, a), lambda: paddle.subtract(a, a), ) def greater_equal_branch(i, a): return layers.cond( i < 8.0, lambda: paddle.multiply(a, a), lambda: paddle.divide(a, a), ) main_program = Program() startup_program = Program() with program_guard(main_program, startup_program): i = fluid.data(name="i", shape=[1], dtype='float32') a = 2.0 * i out = layers.cond( i < 5.0, lambda: less_than_branch(i, a), lambda: greater_equal_branch(i, a), ) mean = paddle.mean(out) append_backward(mean) place = ( fluid.CUDAPlace(0) if core.is_compiled_with_cuda() else fluid.CPUPlace() ) exe = fluid.Executor(place) for feed_i in range(0, 10): expected_a = 2.0 * feed_i if feed_i < 5: expected_ret = expected_a + expected_a if feed_i >= 3 else 0.0 expected_a_grad = 2.0 if feed_i >= 3 else 0.0 else: expected_ret = expected_a * expected_a if feed_i < 8 else 1.0 expected_a_grad = 2.0 * expected_a if feed_i < 8 else 0.0 ret = exe.run( main_program, feed={'i': np.full((1), feed_i, np.float32)}, fetch_list=[out.name, a.grad_name], ) self.assertEqual(ret[0][0], expected_ret) self.assertEqual(ret[1][0], expected_a_grad) def test_cond_op_in_condition(self): paddle.enable_static() main_program = fluid.Program() startup_program = fluid.Program() with fluid.program_guard(main_program, startup_program): a = fluid.layers.fill_constant( shape=[1], dtype='float32', value=1.23 ) a.stop_gradient = False b = fluid.layers.fill_constant( shape=[1], dtype='float32', value=1.24 ) b.stop_gradient = False out = fluid.layers.cond( a < b, lambda: fluid.layers.cond( a - b < -1.0, lambda: paddle.add(a, b), lambda: paddle.multiply(a, b), ), lambda: fluid.layers.cond( a == b, lambda: paddle.subtract(a, b), lambda: paddle.pow(a, b), ), ) append_backward(out) place = ( fluid.CUDAPlace(0) if core.is_compiled_with_cuda() else fluid.CPUPlace() ) exe = fluid.Executor(place) ret = exe.run(main_program, fetch_list=[out, a.grad_name, b.grad_name]) # Note: fill_constant has loss of precision, so we assertAlmostEqual. self.assertAlmostEqual(ret[0][0], 1.5252) self.assertAlmostEqual(ret[1][0], 1.24) self.assertAlmostEqual(ret[2][0], 1.23) class TestCondBackward(unittest.TestCase): def backward_value_helper(self, cond_func, use_cuda, use_parallel_exe): """ Helper function that compares calculated backward value is close to dy/dx """ paddle.enable_static() main_program = Program() main_program.random_seed = 123 startup_program = Program() startup_program.random_seed = 123 with program_guard(main_program, startup_program): img = fluid.data(name='image', shape=[-1, 9], dtype='float32') img.stop_gradient = False label = fluid.data(name='label', shape=[-1, 1], dtype='int64') i = fluid.data(name="i", shape=[1], dtype='int32') loss = cond_func(i, img, label) append_backward(loss) place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() exe = fluid.Executor(place) exe.run(startup_program) num_devices = 1 if use_parallel_exe: os.environ['CPU_NUM'] = str(2) exe = fluid.ParallelExecutor( use_cuda=use_cuda, main_program=main_program, loss_name=loss.name, ) num_devices = exe.device_count delta = 0.005 for feed_i in range(0, 10): feed_img = np.random.random(size=[1, 9]).astype(np.float32) feed_label = np.random.randint( low=0, high=10, size=[1, 1], dtype=np.int64 ) if use_parallel_exe: img_grad, loss_value = exe.run( feed={ 'i': np.full((num_devices), feed_i, np.int32), 'image': np.repeat(feed_img, num_devices, axis=0), 'label': np.repeat(feed_label, num_devices, axis=0), }, fetch_list=[img.grad_name, loss.name], ) else: img_grad, loss_value = exe.run( main_program, feed={ 'i': np.full((1), feed_i, np.int32), 'image': feed_img, 'label': feed_label, }, fetch_list=[img.grad_name, loss.name], ) numerical_grad = np.zeros(shape=[num_devices, 9], dtype=np.float32) feed_img_delta = np.copy(feed_img) for j in range(9): feed_img_delta[0][j] = feed_img[0][j] + delta if use_parallel_exe: loss_delta = exe.run( feed={ 'i': np.full((num_devices), feed_i, np.int32), 'image': np.repeat( feed_img_delta, num_devices, axis=0 ), 'label': np.repeat(feed_label, num_devices, axis=0), }, fetch_list=[loss.name], ) multi_device_grad = ( (loss_delta[0] - loss_value[0]) / delta / num_devices ) for d in range(num_devices): numerical_grad[d][j] = multi_device_grad[d] else: loss_delta = exe.run( main_program, feed={ 'i': np.full((1), feed_i, np.int32), 'image': feed_img_delta, 'label': feed_label, }, fetch_list=[loss.name], ) numerical_grad[0][j] = ( loss_delta[0] - loss_value[0] ) / delta feed_img_delta[0][j] = feed_img[0][j] np.testing.assert_allclose( img_grad, numerical_grad, rtol=0.05, atol=0.05 ) def add_optimizer_helper(self, cond_func, use_cuda, use_parallel_exe): """ Test that program is runnable when add optimizer """ main_program = Program() startup_program = Program() with program_guard(main_program, startup_program): img = fluid.data(name='image', shape=[-1, 784], dtype='float32') label = fluid.data(name='label', shape=[-1, 1], dtype='int64') i = fluid.data(name="i", shape=[1], dtype='int32') loss = cond_func(i, img, label) optimizer = fluid.optimizer.SGD(learning_rate=0.1) optimizer.minimize(loss) place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() exe = fluid.Executor(place) exe.run(startup_program) if use_parallel_exe: os.environ['CPU_NUM'] = str(2) exe = fluid.ParallelExecutor( use_cuda=use_cuda, main_program=main_program, loss_name=loss.name, ) num_devices = exe.device_count for feed_i in range(0, 10): feed_img = np.random.random(size=[16, 784]).astype(np.float32) feed_label = np.random.randint( low=0, high=10, size=[16, 1], dtype=np.int64 ) if use_parallel_exe: exe.run( feed={ 'i': np.full((num_devices), feed_i, np.int32), 'image': np.repeat(feed_img, num_devices, axis=0), 'label': np.repeat(feed_label, num_devices, axis=0), }, fetch_list=[loss.name], ) else: exe.run( main_program, feed={ 'i': np.full((1), feed_i, np.int32), 'image': feed_img, 'label': feed_label, }, fetch_list=[loss], ) def test_cond_backward(self): paddle.enable_static() def cond_func(i, img, label): predicate = (i % 2) == 0 return layers.cond( predicate, lambda: simple_fc_net_with_inputs(img, label, class_num=10), lambda: batchnorm_fc_with_inputs(img, label, class_num=10), ) for use_parallel_exe in [False, True]: if use_parallel_exe and os.name == "nt": print( "Skip use_parallel_exe=True in Windows because of flaky test when using PE under old Windows machine" ) continue self.backward_value_helper( cond_func, core.is_compiled_with_cuda(), use_parallel_exe ) self.add_optimizer_helper( cond_func, core.is_compiled_with_cuda(), use_parallel_exe ) def test_half_nested_cond_backward(self): paddle.enable_static() def branch(i, img, label): return layers.cond( (i % 2) == 0, lambda: simple_fc_net_with_inputs(img, label, class_num=10), lambda: batchnorm_fc_with_inputs(img, label, class_num=10), ) def cond_func_simple_net_at_true(i, img, label): return layers.cond( i < 5, lambda: branch(i, img, label), lambda: paddle.mean(img) ) def cond_func_simple_net_at_false(i, img, label): return layers.cond( i < 5, lambda: paddle.mean(img), lambda: branch(i, img, label) ) for use_parallel_exe in [False, True]: if use_parallel_exe and os.name == "nt": print( "Skip use_parallel_exe=True in Windows because of flaky test when using PE under old Windows machine" ) continue self.backward_value_helper( cond_func_simple_net_at_true, core.is_compiled_with_cuda(), use_parallel_exe, ) self.add_optimizer_helper( cond_func_simple_net_at_true, core.is_compiled_with_cuda(), use_parallel_exe, ) self.backward_value_helper( cond_func_simple_net_at_false, core.is_compiled_with_cuda(), use_parallel_exe, ) self.add_optimizer_helper( cond_func_simple_net_at_false, core.is_compiled_with_cuda(), use_parallel_exe, ) def test_nested_cond_backward(self): paddle.enable_static() def branch(i, img, label, mod_two): if mod_two: predicate = (i % 2) == 0 else: predicate = (i % 2) != 0 return layers.cond( predicate, lambda: simple_fc_net_with_inputs(img, label, class_num=10), lambda: batchnorm_fc_with_inputs(img, label, class_num=10), ) def cond_func(i, img, label): return layers.cond( i < 5, lambda: branch(i, img, label, True), lambda: branch(i, img, label, False), ) for use_parallel_exe in [False, True]: if use_parallel_exe and os.name == "nt": print( "Skip use_parallel_exe=True in Windows because of flaky test when using PE under old Windows machine" ) continue self.backward_value_helper( cond_func, core.is_compiled_with_cuda(), use_parallel_exe ) self.add_optimizer_helper( cond_func, core.is_compiled_with_cuda(), use_parallel_exe ) class TestCondWithError(unittest.TestCase): def test_input_type_error(self): paddle.enable_static() main_program = framework.Program() startup_program = framework.Program() with framework.program_guard(main_program, startup_program): pred = fluid.data(name='y', shape=[1], dtype='bool') def func(): return pred with self.assertRaises(TypeError): layers.cond(None, func, func) with self.assertRaises(TypeError): layers.cond(pred, func, set()) with self.assertRaises(TypeError): layers.cond(pred, set(), func) with self.assertRaises(TypeError): layers.cond(pred, func, func, set()) class TestCondWithDict(unittest.TestCase): def test_input_with_dict(self): paddle.enable_static() main_program = framework.Program() startup_program = framework.Program() with framework.program_guard(main_program, startup_program): def true_func(): return { '1': paddle.full(shape=[3, 2], dtype='int32', fill_value=1), '2': paddle.full( shape=[2, 3], dtype='bool', fill_value=True ), } def false_func(): return { '1': paddle.full( shape=[3, 4], dtype='float32', fill_value=3 ), '2': paddle.full(shape=[4, 5], dtype='int64', fill_value=2), } x = paddle.full(shape=[1], dtype='float32', fill_value=0.1) y = paddle.full(shape=[1], dtype='float32', fill_value=0.23) pred = paddle.less_than(x=x, y=y, name=None) ret = paddle.static.nn.cond(pred, true_func, false_func) self.assertEqual( ret['1'].shape, (3, -1), f"The shape is not correct, expects (3, -1) but gets {ret['1'].shape}.", ) self.assertEqual( ret['2'].shape, (-1, -1), f"The shape is not correct, expects (-1, -1) but gets {ret['2'].shape}.", ) if __name__ == '__main__': unittest.main()