diff --git a/python/paddle/fluid/compiler.py b/python/paddle/fluid/compiler.py
index 6f57f086e13b05b85d320903ee61d15afb16a29f..7400f45e0592687b6169cebec3a8ef31a6c7a8b8 100644
--- a/python/paddle/fluid/compiler.py
+++ b/python/paddle/fluid/compiler.py
@@ -62,6 +62,28 @@ def _prune_feed_ops(program):
         program.global_block()._remove_op(index)
 
 
+def _has_optimize_op(block):
+    for op in block.ops:
+        op_maker = core.op_proto_and_checker_maker
+        optimize = core.op_proto_and_checker_maker.OpRole.Optimize
+        if op_maker.kOpRoleVarAttrName() in op.attr_names and \
+                int(op.all_attrs()[op_maker.kOpRoleAttrName()]) == int(optimize):
+            return True
+    return False
+
+
+def _has_optimizer_in_control_flow(program):
+    if not program:
+        program = framework.default_main_program()
+    for op in program.global_block().ops:
+        if op.type == "conditional_block_grad":
+            sub_block = program.block(op._block_attr_id("sub_block"))
+            if _has_optimize_op(sub_block):
+                return True
+
+    return False
+
+
 class CompiledProgram(object):
     """
     The CompiledProgram is used to transform a program or graph for
@@ -386,6 +408,16 @@ class CompiledProgram(object):
                 self._places = self._get_places(self._place, self._places)
             else:
                 self._places = [self._place]
+
+            # Todo(liym27): If optimizer is used in control flow,
+            # training on multi-places is not supported now, will
+            # be supported later.
+            if len(self._places) > 1 and \
+                    _has_optimizer_in_control_flow(self._program):
+                raise NotImplementedError(
+                    "If optimizer is used in control flow, "
+                    "training on multi-places is not supported now.")
+
             self._executor = self._compile_data_parallel(
                 use_cuda=isinstance(self._place, core.CUDAPlace),
                 scope=self._scope,
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index 7e7beb494b83e7b76ab18ed078cd9e3c1fe9c107..aad7649777f13ea0a2767758fb0ffbb5ca26ac63 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -333,4 +333,5 @@ set_tests_properties(test_parallel_executor_test_while_train test_parallel_execu
         test_parallel_executor_feed_persistable_var
         test_parallel_executor_crf_auto_growth test_buffer_shared_memory_reuse_pass_and_fuse_optimization_op_pass
         test_data_norm_op test_imperative_using_non_zero_gpu test_fuse_bn_act_pass
+        test_optimizer_in_control_flow
         test_buffer_shared_memory_reuse_pass PROPERTIES LABELS "RUN_TYPE=DIST")
diff --git a/python/paddle/fluid/tests/unittests/test_optimizer_in_control_flow.py b/python/paddle/fluid/tests/unittests/test_optimizer_in_control_flow.py
index 63579ee80acc2e9eaebed6d58f60fbdc207ab960..4b2914c223a08c52444e085f0ef9e41518694593 100644
--- a/python/paddle/fluid/tests/unittests/test_optimizer_in_control_flow.py
+++ b/python/paddle/fluid/tests/unittests/test_optimizer_in_control_flow.py
@@ -22,6 +22,8 @@ import paddle.fluid.layers as layers
 import paddle.fluid.optimizer as optimizer
 from paddle.fluid.framework import Program, program_guard
 import paddle.fluid.core as core
+import paddle.fluid.compiler as compiler
+import os
 
 BATCH_SIZE = 1
 INPUT_SIZE = 784
@@ -104,20 +106,20 @@ def static(train_data,
         avg_loss = layers.case([(mod_two, lambda: fn_1(adam, avg_loss_1))],
                                lambda: fn_2(sgd, avg_loss_2))
 
-        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
-        exe = fluid.Executor(place)
-        exe.run(fluid.default_startup_program())
-
-        for epoch in range(EPOCH_NUM):
-            feed_image, feed_label = train_data[epoch]
-            fetch_list = [hidden, prediction, avg_loss]
-            feed = {
-                'image': feed_image,
-                'label': feed_label,
-                'id': np.array([epoch]).astype('int32')
-            }
-            out = exe.run(main_program, feed=feed, fetch_list=fetch_list)
-            out_hidden, out_pred, loss = out
+    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+    exe = fluid.Executor(place)
+    exe.run(startup_program)
+
+    for epoch in range(EPOCH_NUM):
+        feed_image, feed_label = train_data[epoch]
+        fetch_list = [hidden, prediction, avg_loss]
+        feed = {
+            'image': feed_image,
+            'label': feed_label,
+            'id': np.array([epoch]).astype('int32')
+        }
+        out = exe.run(main_program, feed=feed, fetch_list=fetch_list)
+        out_hidden, out_pred, loss = out
 
     return out_hidden, out_pred, loss
 
@@ -225,5 +227,58 @@ class TestMultiTask(unittest.TestCase):
                                                          loss_2))
 
 
+class TestMultiOptimizersMultiCardsError(unittest.TestCase):
+    def test_error(self):
+        startup_program = Program()
+        main_program = Program()
+        use_cuda = core.is_compiled_with_cuda()
+        with program_guard(main_program, startup_program):
+
+            def fn_1(opt, avg_loss):
+                opt.minimize(avg_loss)
+
+            def fn_2(opt, avg_loss):
+                opt.minimize(avg_loss)
+
+            x = fluid.layers.data("X", [10], 'float32')
+            hidden = layers.fc(x, 5)
+            avg_loss = layers.mean(hidden)
+
+            adam = optimizer.Adam(learning_rate=LR)
+            sgd = optimizer.SGD(learning_rate=LR)
+
+            cond = layers.fill_constant([1], 'bool', True)
+
+            layers.case([(cond, lambda: fn_1(adam, avg_loss))],
+                        lambda: fn_2(sgd, avg_loss))
+
+        cpu_place = fluid.CPUPlace()
+        cuda_place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+
+        for place in [cpu_place, cuda_place]:
+
+            exe = fluid.Executor(place)
+            exe.run(startup_program)
+
+            np.random.seed(SEED)
+            os.environ['CPU_NUM'] = str(2)
+            pe_exe = fluid.ParallelExecutor(
+                use_cuda=use_cuda,
+                main_program=main_program,
+                loss_name=avg_loss.name)
+            num_devices = pe_exe.device_count
+
+            def not_implemented_error():
+                pe_exe.run(feed={
+                    'X': np.random.random(size=[64, 10]).astype('float32'),
+                },
+                           fetch_list=[avg_loss.name])
+
+            if num_devices > 1:
+                self.assertRaises(NotImplementedError, not_implemented_error)
+            else:
+                not_implemented_error()
+
+
 if __name__ == '__main__':
     unittest.main()
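
Usage note (not part of the patch): the sketch below shows how the new guard in CompiledProgram._compile surfaces to users. It is a minimal reconstruction, assuming the fluid 1.6-era API used above, and it goes through the standard compiler.CompiledProgram(...).with_data_parallel(...) path rather than the ParallelExecutor wrapper exercised by the test; the variable names are illustrative only.

    # Minimal sketch: an optimizer invoked inside layers.case, combined with
    # more than one place, should trigger the new NotImplementedError.
    import os
    import numpy as np
    import paddle.fluid as fluid
    import paddle.fluid.layers as layers
    import paddle.fluid.optimizer as optimizer
    import paddle.fluid.compiler as compiler

    os.environ['CPU_NUM'] = '2'  # two CPU places -> multi-place training

    x = layers.data(name='X', shape=[10], dtype='float32')
    avg_loss = layers.mean(layers.fc(x, 5))
    cond = layers.fill_constant([1], 'bool', True)
    # The unsupported pattern: opt.minimize called inside control flow, which
    # puts the optimize ops into a conditional_block sub-block.
    layers.case([(cond, lambda: optimizer.Adam(0.1).minimize(avg_loss))],
                lambda: optimizer.SGD(0.1).minimize(avg_loss))

    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(fluid.default_startup_program())

    compiled = compiler.CompiledProgram(
        fluid.default_main_program()).with_data_parallel(
            loss_name=avg_loss.name)
    try:
        # _compile() runs on the first call; with two places,
        # _has_optimizer_in_control_flow finds optimize-role ops in the
        # sub-block of a conditional_block_grad op and raises.
        exe.run(compiled,
                feed={'X': np.random.random([64, 10]).astype('float32')},
                fetch_list=[avg_loss.name])
    except NotImplementedError as e:
        print(e)  # "If optimizer is used in control flow, ..."

On a single place (CPU_NUM=1, or one GPU) the same program compiles and runs normally, which is why the new unit test only asserts the exception when pe_exe.device_count > 1.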