diff --git a/python/paddle/fluid/compiler.py b/python/paddle/fluid/compiler.py index bff15df5f9fcc0c289bf9dcfab2daf1ce029fac2..fd773fabd08fa79e79c552aececf99215cbf84a1 100644 --- a/python/paddle/fluid/compiler.py +++ b/python/paddle/fluid/compiler.py @@ -531,16 +531,6 @@ class CompiledProgram: else: self._places = [self._place] - # Todo(liym27):If optimizer is used in control flow, - # training on multi-places is not supported now, will - # be supported later. - if len(self._places) > 1 and _has_optimizer_in_control_flow( - self._program - ): - raise NotImplementedError( - "If optimizer is used in control flow, " - "training on multi-places is not supported now." - ) if isinstance(self._place, core.CUDAPlace): use_device = DeviceType.CUDA elif isinstance(self._place, core.XPUPlace): diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 269172ea994e22ff6a6d34da7d97bfd22f113311..b5540f4f9280b6a607ba2733d04b16648552c9b0 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -189,21 +189,14 @@ class ParallelExecutor: else framework.default_main_program() ) - self._compiled_program = compiler.CompiledProgram(main_program) + self._compiled_program = compiler.CompiledProgram( + main_program, build_strategy=build_strategy + ) if share_vars_from: assert isinstance( share_vars_from, ParallelExecutor ), "The share_vars_from should be ParallelExecutor." - self._compiled_program.with_data_parallel( - loss_name=loss_name, - build_strategy=build_strategy, - exec_strategy=exec_strategy, - share_vars_from=share_vars_from._compiled_program - if share_vars_from - else None, - ) - self._place = core.CUDAPlace(0) if use_cuda else core.CPUPlace() self._exe = executor.Executor(self._place) diff --git a/python/paddle/fluid/tests/unittests/test_cond.py b/python/paddle/fluid/tests/unittests/test_cond.py index 33913085307c7b232876719cb20f479dc2a19b97..d6a2b54e90d83d011784b9ce7719fae0e30edd51 100644 --- a/python/paddle/fluid/tests/unittests/test_cond.py +++ b/python/paddle/fluid/tests/unittests/test_cond.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os import unittest import numpy as np @@ -605,7 +604,7 @@ class TestCondNestedControlFlow(unittest.TestCase): class TestCondBackward(unittest.TestCase): - def backward_value_helper(self, cond_func, use_cuda, use_parallel_exe): + def backward_value_helper(self, cond_func, use_cuda): """ Helper function that compares calculated backward value is close to dy/dx """ @@ -626,14 +625,6 @@ class TestCondBackward(unittest.TestCase): exe.run(startup_program) num_devices = 1 - if use_parallel_exe: - os.environ['CPU_NUM'] = str(2) - exe = fluid.ParallelExecutor( - use_cuda=use_cuda, - main_program=main_program, - loss_name=loss.name, - ) - num_devices = exe.device_count delta = 0.005 for feed_i in range(0, 10): @@ -641,65 +632,37 @@ class TestCondBackward(unittest.TestCase): feed_label = np.random.randint( low=0, high=10, size=[1, 1], dtype=np.int64 ) - if use_parallel_exe: - img_grad, loss_value = exe.run( - feed={ - 'i': np.full((num_devices), feed_i, np.int32), - 'image': np.repeat(feed_img, num_devices, axis=0), - 'label': np.repeat(feed_label, num_devices, axis=0), - }, - fetch_list=[img.grad_name, loss.name], - ) - else: - img_grad, loss_value = exe.run( + + img_grad, loss_value = exe.run( + main_program, + feed={ + 'i': np.full((1), feed_i, np.int32), + 'image': feed_img, + 'label': feed_label, + }, + fetch_list=[img.grad_name, loss.name], + ) + + numerical_grad = np.zeros(shape=[num_devices, 9], dtype=np.float32) + feed_img_delta = np.copy(feed_img) + for j in range(9): + feed_img_delta[0][j] = feed_img[0][j] + delta + loss_delta = exe.run( main_program, feed={ 'i': np.full((1), feed_i, np.int32), - 'image': feed_img, + 'image': feed_img_delta, 'label': feed_label, }, - fetch_list=[img.grad_name, loss.name], + fetch_list=[loss.name], ) - - numerical_grad = np.zeros(shape=[num_devices, 9], dtype=np.float32) - feed_img_delta = np.copy(feed_img) - for j in range(9): - feed_img_delta[0][j] = feed_img[0][j] + delta - if use_parallel_exe: - loss_delta = exe.run( - feed={ - 'i': np.full((num_devices), feed_i, np.int32), - 'image': np.repeat( - feed_img_delta, num_devices, axis=0 - ), - 'label': np.repeat(feed_label, num_devices, axis=0), - }, - fetch_list=[loss.name], - ) - multi_device_grad = ( - (loss_delta[0] - loss_value[0]) / delta / num_devices - ) - for d in range(num_devices): - numerical_grad[d][j] = multi_device_grad[d] - else: - loss_delta = exe.run( - main_program, - feed={ - 'i': np.full((1), feed_i, np.int32), - 'image': feed_img_delta, - 'label': feed_label, - }, - fetch_list=[loss.name], - ) - numerical_grad[0][j] = ( - loss_delta[0] - loss_value[0] - ) / delta + numerical_grad[0][j] = (loss_delta[0] - loss_value[0]) / delta feed_img_delta[0][j] = feed_img[0][j] np.testing.assert_allclose( img_grad, numerical_grad, rtol=0.05, atol=0.05 ) - def add_optimizer_helper(self, cond_func, use_cuda, use_parallel_exe): + def add_optimizer_helper(self, cond_func, use_cuda): """ Test that program is runnable when add optimizer """ @@ -716,39 +679,21 @@ class TestCondBackward(unittest.TestCase): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() exe = fluid.Executor(place) exe.run(startup_program) - if use_parallel_exe: - os.environ['CPU_NUM'] = str(2) - exe = fluid.ParallelExecutor( - use_cuda=use_cuda, - main_program=main_program, - loss_name=loss.name, - ) - num_devices = exe.device_count for feed_i in range(0, 10): feed_img = np.random.random(size=[16, 784]).astype(np.float32) feed_label = np.random.randint( low=0, high=10, size=[16, 1], dtype=np.int64 ) - if use_parallel_exe: - exe.run( - feed={ - 'i': np.full((num_devices), feed_i, np.int32), - 'image': np.repeat(feed_img, num_devices, axis=0), - 'label': np.repeat(feed_label, num_devices, axis=0), - }, - fetch_list=[loss.name], - ) - else: - exe.run( - main_program, - feed={ - 'i': np.full((1), feed_i, np.int32), - 'image': feed_img, - 'label': feed_label, - }, - fetch_list=[loss], - ) + exe.run( + main_program, + feed={ + 'i': np.full((1), feed_i, np.int32), + 'image': feed_img, + 'label': feed_label, + }, + fetch_list=[loss], + ) def test_cond_backward(self): @@ -762,19 +707,8 @@ class TestCondBackward(unittest.TestCase): lambda: batchnorm_fc_with_inputs(img, label, class_num=10), ) - for use_parallel_exe in [False, True]: - if use_parallel_exe and os.name == "nt": - print( - "Skip use_parallel_exe=True in Windows because of flaky test when using PE under old Windows machine" - ) - continue - - self.backward_value_helper( - cond_func, core.is_compiled_with_cuda(), use_parallel_exe - ) - self.add_optimizer_helper( - cond_func, core.is_compiled_with_cuda(), use_parallel_exe - ) + self.backward_value_helper(cond_func, core.is_compiled_with_cuda()) + self.add_optimizer_helper(cond_func, core.is_compiled_with_cuda()) def test_half_nested_cond_backward(self): paddle.enable_static() @@ -796,33 +730,22 @@ class TestCondBackward(unittest.TestCase): i < 5, lambda: paddle.mean(img), lambda: branch(i, img, label) ) - for use_parallel_exe in [False, True]: - if use_parallel_exe and os.name == "nt": - print( - "Skip use_parallel_exe=True in Windows because of flaky test when using PE under old Windows machine" - ) - continue - - self.backward_value_helper( - cond_func_simple_net_at_true, - core.is_compiled_with_cuda(), - use_parallel_exe, - ) - self.add_optimizer_helper( - cond_func_simple_net_at_true, - core.is_compiled_with_cuda(), - use_parallel_exe, - ) - self.backward_value_helper( - cond_func_simple_net_at_false, - core.is_compiled_with_cuda(), - use_parallel_exe, - ) - self.add_optimizer_helper( - cond_func_simple_net_at_false, - core.is_compiled_with_cuda(), - use_parallel_exe, - ) + self.backward_value_helper( + cond_func_simple_net_at_true, + core.is_compiled_with_cuda(), + ) + self.add_optimizer_helper( + cond_func_simple_net_at_true, + core.is_compiled_with_cuda(), + ) + self.backward_value_helper( + cond_func_simple_net_at_false, + core.is_compiled_with_cuda(), + ) + self.add_optimizer_helper( + cond_func_simple_net_at_false, + core.is_compiled_with_cuda(), + ) def test_nested_cond_backward(self): paddle.enable_static() @@ -845,18 +768,8 @@ class TestCondBackward(unittest.TestCase): lambda: branch(i, img, label, False), ) - for use_parallel_exe in [False, True]: - if use_parallel_exe and os.name == "nt": - print( - "Skip use_parallel_exe=True in Windows because of flaky test when using PE under old Windows machine" - ) - continue - self.backward_value_helper( - cond_func, core.is_compiled_with_cuda(), use_parallel_exe - ) - self.add_optimizer_helper( - cond_func, core.is_compiled_with_cuda(), use_parallel_exe - ) + self.backward_value_helper(cond_func, core.is_compiled_with_cuda()) + self.add_optimizer_helper(cond_func, core.is_compiled_with_cuda()) class TestCondWithError(unittest.TestCase): diff --git a/python/paddle/fluid/tests/unittests/test_optimizer_in_control_flow.py b/python/paddle/fluid/tests/unittests/test_optimizer_in_control_flow.py index ab9b99d8cb2499b10edbb8a76e4be5f0ffe0daea..58bb6192f9e3367005630e7d339fffa11efa0d14 100644 --- a/python/paddle/fluid/tests/unittests/test_optimizer_in_control_flow.py +++ b/python/paddle/fluid/tests/unittests/test_optimizer_in_control_flow.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os import unittest import numpy as np @@ -251,67 +250,5 @@ class TestMultiTask(unittest.TestCase): np.testing.assert_allclose(loss_1, loss_2, rtol=1e-05) -class TestMultiOptimizersMultiCardsError(unittest.TestCase): - def test_error(self): - startup_program = Program() - main_program = Program() - use_cuda = core.is_compiled_with_cuda() - with program_guard(main_program, startup_program): - - def fn_1(opt, avg_loss): - opt.minimize(avg_loss) - - def fn_2(opt, avg_loss): - opt.minimize(avg_loss) - - x = paddle.static.data("X", [-1, 10], 'float32') - hidden = paddle.static.nn.fc(x, 5) - avg_loss = paddle.mean(hidden) - - adam = optimizer.Adam(learning_rate=LR) - sgd = optimizer.SGD(learning_rate=LR) - - cond = layers.fill_constant([1], 'bool', True) - - paddle.static.nn.case( - [(cond, lambda: fn_1(adam, avg_loss))], - lambda: fn_2(sgd, avg_loss), - ) - - cpu_place = fluid.CPUPlace() - cuda_place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - - for place in [cpu_place, cuda_place]: - - exe = fluid.Executor(place) - exe.run(startup_program) - - np.random.seed(SEED) - - # NOTE(liym27): - # This test needs to run in multi cards to test NotImplementedError. - # Here, move this test from RUN_TYPE=DIST in tests/unittests/CMakeList.txt, - # to use multi cards ** only on CPU ** not GPU to reduce CI time. - os.environ['CPU_NUM'] = str(2) - - pe_exe = fluid.ParallelExecutor( - use_cuda=use_cuda, - main_program=main_program, - loss_name=avg_loss.name, - ) - num_devices = pe_exe.device_count - - def not_implemented_error(): - pe_exe.run( - feed={ - 'X': np.random.random(size=[64, 10]).astype('float32'), - }, - fetch_list=[avg_loss.name], - ) - - if num_devices > 1: - self.assertRaises(NotImplementedError, not_implemented_error) - - if __name__ == '__main__': unittest.main()