Unverified commit 3acb9956 authored by liym27, committed by GitHub

[cherry-pick] add NotImplementedError for multi optimizers (#22181) (#22229)

* add NotImplementedError for multi optimizers used on multiple places. test=develop

* assert error only if num_devices>1. test=develop

* set test_optimizer_in_control_flow in CMakeLists for using multi-GPU. test=develop
Parent 0d82baf8
@@ -62,6 +62,28 @@ def _prune_feed_ops(program):
             program.global_block()._remove_op(index)
 
 
+def _has_optimize_op(block):
+    for op in block.ops:
+        op_maker = core.op_proto_and_checker_maker
+        optimize = core.op_proto_and_checker_maker.OpRole.Optimize
+        if op_maker.kOpRoleVarAttrName() in op.attr_names and \
+                int(op.all_attrs()[op_maker.kOpRoleAttrName()]) == int(optimize):
+            return True
+    return False
+
+
+def _has_optimizer_in_control_flow(program):
+    if not program:
+        program = framework.default_main_program()
+
+    for op in program.global_block().ops:
+        if op.type == "conditional_block_grad":
+            sub_block = program.block(op._block_attr_id("sub_block"))
+            if _has_optimize_op(sub_block):
+                return True
+
+    return False
+
+
 class CompiledProgram(object):
     """
     The CompiledProgram is used to transform a program or graph for
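The two new helpers work in tandem: `_has_optimizer_in_control_flow` scans the main block for `conditional_block_grad` ops (the backward counterpart of the op that `layers.case`/`layers.cond` emit), and `_has_optimize_op` reports whether the matching sub-block contains any op whose role attribute marks it as an optimize op. Below is a minimal sketch of the situation being detected, assuming the fluid 1.x API of this era; `_has_optimizer_in_control_flow` is a private helper of `paddle.fluid.compiler`, imported here purely for illustration:

```python
import paddle.fluid as fluid
import paddle.fluid.layers as layers
import paddle.fluid.optimizer as optimizer
from paddle.fluid.compiler import _has_optimizer_in_control_flow  # private helper

main_program = fluid.Program()
startup_program = fluid.Program()
with fluid.program_guard(main_program, startup_program):
    x = layers.data(name='X', shape=[10], dtype='float32')
    avg_loss = layers.mean(layers.fc(x, size=5))
    adam = optimizer.Adam(learning_rate=0.001)
    sgd = optimizer.SGD(learning_rate=0.001)
    cond = layers.fill_constant(shape=[1], dtype='bool', value=True)

    def fn_1():
        # minimize() appends optimize ops; inside case() they (and their
        # gradient ops) land in conditional_block sub-blocks
        adam.minimize(avg_loss)

    def fn_2():
        sgd.minimize(avg_loss)

    layers.case([(cond, fn_1)], fn_2)

print(_has_optimizer_in_control_flow(main_program))  # expected: True
```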
@@ -386,6 +408,16 @@ class CompiledProgram(object):
                 self._places = self._get_places(self._place, self._places)
             else:
                 self._places = [self._place]
+
+            # TODO(liym27): If an optimizer is used in control flow,
+            # training on multiple places is not supported for now; it
+            # will be supported later.
+            if len(self._places) > 1 and \
+                    _has_optimizer_in_control_flow(self._program):
+                raise NotImplementedError(
+                    "If optimizer is used in control flow, "
+                    "training on multi-places is not supported now.")
+
             self._executor = self._compile_data_parallel(
                 use_cuda=isinstance(self._place, core.CUDAPlace),
                 scope=self._scope,
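With the helpers in place, the guard in `CompiledProgram` turns what was previously silent mis-training across multiple devices into a fast failure. A hypothetical repro, continuing the sketch above (reusing `main_program`, `startup_program`, and `avg_loss`) and assuming the fluid 1.x `with_data_parallel` API; `CPU_NUM` is the environment variable fluid consults for the number of CPU places when none are passed explicitly:

```python
import os
import numpy as np

os.environ['CPU_NUM'] = '2'  # ask for two CPU places

exe = fluid.Executor(fluid.CPUPlace())
exe.run(startup_program)

compiled = fluid.CompiledProgram(main_program).with_data_parallel(
    loss_name=avg_loss.name)

try:
    # CompiledProgram compiles lazily on the first run; with two places
    # and an optimizer inside control flow, compilation now raises.
    exe.run(compiled,
            feed={'X': np.random.random([64, 10]).astype('float32')},
            fetch_list=[avg_loss.name])
except NotImplementedError as e:
    print(e)  # "If optimizer is used in control flow, ..."
```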
@@ -331,4 +331,5 @@ set_tests_properties(test_parallel_executor_test_while_train test_parallel_execu
                      test_parallel_executor_feed_persistable_var
                      test_parallel_executor_crf_auto_growth test_buffer_shared_memory_reuse_pass_and_fuse_optimization_op_pass
                      test_data_norm_op test_imperative_using_non_zero_gpu
+                     test_optimizer_in_control_flow
                      test_buffer_shared_memory_reuse_pass PROPERTIES LABELS "RUN_TYPE=DIST")
@@ -22,6 +22,8 @@ import paddle.fluid.layers as layers
 import paddle.fluid.optimizer as optimizer
 from paddle.fluid.framework import Program, program_guard
 import paddle.fluid.core as core
+import paddle.fluid.compiler as compiler
+import os
 
 BATCH_SIZE = 1
 INPUT_SIZE = 784
@@ -104,20 +106,20 @@ def static(train_data,
         avg_loss = layers.case([(mod_two, lambda: fn_1(adam, avg_loss_1))],
                                lambda: fn_2(sgd, avg_loss_2))
 
-        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
-        exe = fluid.Executor(place)
-        exe.run(fluid.default_startup_program())
-
-        for epoch in range(EPOCH_NUM):
-            feed_image, feed_label = train_data[epoch]
-            fetch_list = [hidden, prediction, avg_loss]
-            feed = {
-                'image': feed_image,
-                'label': feed_label,
-                'id': np.array([epoch]).astype('int32')
-            }
-            out = exe.run(main_program, feed=feed, fetch_list=fetch_list)
-            out_hidden, out_pred, loss = out
+    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+    exe = fluid.Executor(place)
+    exe.run(startup_program)
+
+    for epoch in range(EPOCH_NUM):
+        feed_image, feed_label = train_data[epoch]
+        fetch_list = [hidden, prediction, avg_loss]
+        feed = {
+            'image': feed_image,
+            'label': feed_label,
+            'id': np.array([epoch]).astype('int32')
+        }
+        out = exe.run(main_program, feed=feed, fetch_list=fetch_list)
+        out_hidden, out_pred, loss = out
 
     return out_hidden, out_pred, loss
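The substantive change in this hunk is `exe.run(startup_program)` replacing `exe.run(fluid.default_startup_program())` (the surrounding lines repeat in the diff, apparently because the block's indentation also changed). The distinction matters because parameters created under `program_guard` register their initializer ops in the explicit startup program, not in the global default one, so running the default startup program would leave them uninitialized. A minimal sketch of that pattern, independent of this test:

```python
import paddle.fluid as fluid

main_program = fluid.Program()
startup_program = fluid.Program()
with fluid.program_guard(main_program, startup_program):
    x = fluid.layers.data(name='x', shape=[10], dtype='float32')
    y = fluid.layers.fc(x, size=5)

exe = fluid.Executor(fluid.CPUPlace())
# Correct: this program holds the initializers for the fc weight/bias.
exe.run(startup_program)
# fluid.default_startup_program() contains no ops here, so running it
# instead would leave the fc parameters uninitialized.
```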
@@ -225,5 +227,58 @@ class TestMultiTask(unittest.TestCase):
                                                              loss_2))
 
 
+class TestMultiOptimizersMultiCardsError(unittest.TestCase):
+    def test_error(self):
+        startup_program = Program()
+        main_program = Program()
+        use_cuda = core.is_compiled_with_cuda()
+        with program_guard(main_program, startup_program):
+
+            def fn_1(opt, avg_loss):
+                opt.minimize(avg_loss)
+
+            def fn_2(opt, avg_loss):
+                opt.minimize(avg_loss)
+
+            x = fluid.layers.data("X", [10], 'float32')
+            hidden = layers.fc(x, 5)
+            avg_loss = layers.mean(hidden)
+
+            adam = optimizer.Adam(learning_rate=LR)
+            sgd = optimizer.SGD(learning_rate=LR)
+
+            cond = layers.fill_constant([1], 'bool', True)
+
+            layers.case([(cond, lambda: fn_1(adam, avg_loss))],
+                        lambda: fn_2(sgd, avg_loss))
+
+        cpu_place = fluid.CPUPlace()
+        cuda_place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+
+        for place in [cpu_place, cuda_place]:
+            exe = fluid.Executor(place)
+            exe.run(startup_program)
+
+            np.random.seed(SEED)
+
+            # Two CPU places, so ParallelExecutor targets multiple
+            # devices even without GPUs.
+            os.environ['CPU_NUM'] = str(2)
+            pe_exe = fluid.ParallelExecutor(
+                use_cuda=use_cuda,
+                main_program=main_program,
+                loss_name=avg_loss.name)
+            num_devices = pe_exe.device_count
+
+            def not_implemented_error():
+                pe_exe.run(feed={
+                    'X': np.random.random(size=[64, 10]).astype('float32'),
+                },
+                           fetch_list=[avg_loss.name])
+
+            if num_devices > 1:
+                self.assertRaises(NotImplementedError, not_implemented_error)
+            else:
+                not_implemented_error()
+
+
 if __name__ == '__main__':
     unittest.main()
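A note on the final branch: `device_count` reflects how many places `ParallelExecutor` actually targets, so with `CPU_NUM=2` (or more than one GPU) the run must raise `NotImplementedError`, while on a single device the very same program is expected to train without error. That is presumably also why the commit adds the test to the `RUN_TYPE=DIST` group in CMakeLists, so it gets scheduled where multiple GPUs are available.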