From 8dd3d4b669ffe5a720e242b4ff2856195daf3bee Mon Sep 17 00:00:00 2001
From: WangXi
Date: Sun, 27 Sep 2020 15:31:13 +0000
Subject: [PATCH] fleet meta combine amp dgc recompute, test=develop

---
 .../fleet/meta_optimizers/amp_optimizer.py    |  55 +++++---
 .../fleet/meta_optimizers/dgc_optimizer.py    |   7 +
 .../meta_optimizers/recompute_optimizer.py    |  25 +++-
 .../contrib/mixed_precision/decorator.py      | 100 ++++++++------
 python/paddle/fluid/optimizer.py              |  15 +--
 .../test_fleet_combine_meta_optimizer.py      | 122 ++++++++++++++++++
 .../test_fleet_dgc_meta_optimizer.py          |   2 +
 7 files changed, 256 insertions(+), 70 deletions(-)
 create mode 100755 python/paddle/fluid/tests/unittests/test_fleet_combine_meta_optimizer.py

diff --git a/python/paddle/distributed/fleet/meta_optimizers/amp_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/amp_optimizer.py
index ad96e142669..04955fb80c9 100644
--- a/python/paddle/distributed/fleet/meta_optimizers/amp_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/amp_optimizer.py
@@ -19,7 +19,7 @@ class AMPOptimizer(MetaOptimizerBase):
     def __init__(self, optimizer):
         super(AMPOptimizer, self).__init__(optimizer)
         self.inner_opt = optimizer
-        self.amp_opt = None
+        self.wrapped_opt = None
         # we do not allow meta optimizer to be inner optimizer currently
         self.meta_optimizers_white_list = [
             "LarsOptimizer",
@@ -37,6 +37,24 @@ class AMPOptimizer(MetaOptimizerBase):
         super(AMPOptimizer, self)._set_basic_info(
             loss, role_maker, user_defined_optimizer, user_defined_strategy)
 
+    def _init_wrapped_opt(self):
+        if self.wrapped_opt is not None:
+            return
+
+        config = self.user_defined_strategy.amp_configs
+
+        custom_white_list = set(config['custom_white_list'])
+        custom_black_list = set(config['custom_black_list'])
+        custom_black_varnames = set(config['custom_black_varnames'])
+        self.amp_lists = mixed_precision.AutoMixedPrecisionLists(
+            custom_white_list, custom_black_list, custom_black_varnames)
+
+        self.wrapped_opt = mixed_precision.decorate(
+            self.inner_opt, self.amp_lists, config['init_loss_scaling'],
+            config['incr_every_n_steps'], config['decr_every_n_nan_or_inf'],
+            config['incr_ratio'], config['decr_ratio'],
+            config['use_dynamic_loss_scaling'])
+
     def _can_apply(self):
         if not self.role_maker._is_collective:
             return False
@@ -60,26 +78,31 @@ class AMPOptimizer(MetaOptimizerBase):
             "use_dynamic_loss_scaling": True
         }
 
+    def backward(self,
+                 loss,
+                 startup_program=None,
+                 parameter_list=None,
+                 no_grad_set=None,
+                 callbacks=None):
+        # may be the inner_opt of another meta optimizer
+        self._init_wrapped_opt()
+        return self.wrapped_opt.backward(loss, startup_program, parameter_list,
+                                         no_grad_set, callbacks)
+
+    def apply_gradients(self, params_grads):
+        return self.wrapped_opt.apply_gradients(params_grads=params_grads)
+
+    def apply_optimize(self, loss, startup_program, params_grads):
+        return self.wrapped_opt.apply_optimize(
+            loss, startup_program=startup_program, params_grads=params_grads)
+
     def minimize_impl(self,
                       loss,
                       startup_program=None,
                       parameter_list=None,
                       no_grad_set=None):
-        if self.amp_opt is None:
-            config = self.user_defined_strategy.amp_configs
-            custom_white_list = set(config['custom_white_list'])
-            custom_black_list = set(config['custom_black_list'])
-            custom_black_varnames = set(config['custom_black_varnames'])
-            amp_lists = mixed_precision.AutoMixedPrecisionLists(
-                custom_white_list, custom_black_list, custom_black_varnames)
-
-            self.amp_opt = mixed_precision.decorate(
-                self.inner_opt, amp_lists, config['init_loss_scaling'],
-                config['incr_every_n_steps'], config['decr_every_n_nan_or_inf'],
-                config['incr_ratio'], config['decr_ratio'],
-                config['use_dynamic_loss_scaling'])
-
+        self._init_wrapped_opt()
         optimize_ops, params_grads = \
-            self.amp_opt.minimize(loss, startup_program,
+            self.wrapped_opt.minimize(loss, startup_program,
                                       parameter_list, no_grad_set)
         return optimize_ops, params_grads
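
Note: the hunks above all implement one pattern: the AMP wrapper is no longer
built eagerly inside minimize_impl(), but lazily and idempotently, so that an
outer meta optimizer can drive backward() and apply_optimize() separately.
A minimal sketch of the pattern (illustrative names, not fleet code):

    class LazilyWrappedOptimizer:
        def __init__(self, inner_opt, make_wrapper):
            # make_wrapper is a factory such as mixed_precision.decorate
            self.inner_opt = inner_opt
            self._make_wrapper = make_wrapper
            self.wrapped_opt = None

        def _init_wrapped_opt(self):
            # idempotent, so every public entry point can call it first
            if self.wrapped_opt is None:
                self.wrapped_opt = self._make_wrapper(self.inner_opt)

        def backward(self, loss, **kwargs):
            # entered directly when this object is the inner_opt of
            # another meta optimizer, hence the init call here too
            self._init_wrapped_opt()
            return self.wrapped_opt.backward(loss, **kwargs)

        def minimize(self, loss, **kwargs):
            self._init_wrapped_opt()
            return self.wrapped_opt.minimize(loss, **kwargs)

The deferral also matters because user_defined_strategy is only bound in
_set_basic_info(), so the wrapper cannot be constructed in __init__.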
diff --git a/python/paddle/distributed/fleet/meta_optimizers/dgc_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/dgc_optimizer.py
index 6806a479d30..9990021c850 100644
--- a/python/paddle/distributed/fleet/meta_optimizers/dgc_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/dgc_optimizer.py
@@ -85,6 +85,13 @@ class DGCOptimizer(MetaOptimizerBase):
         return self.dgc_opt.backward(loss, startup_program, parameter_list,
                                      no_grad_set, callbacks)
 
+    def apply_gradients(self, params_grads):
+        return self.dgc_opt.apply_gradients(params_grads=params_grads)
+
+    def apply_optimize(self, loss, startup_program, params_grads):
+        return self.dgc_opt.apply_optimize(
+            loss, startup_program=startup_program, params_grads=params_grads)
+
     def minimize_impl(self,
                       loss,
                       startup_program=None,
diff --git a/python/paddle/distributed/fleet/meta_optimizers/recompute_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/recompute_optimizer.py
index 59ca7e63309..42fe9f6ced7 100644
--- a/python/paddle/distributed/fleet/meta_optimizers/recompute_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/recompute_optimizer.py
@@ -18,15 +18,15 @@ from .meta_optimizer_base import MetaOptimizerBase
 class RecomputeOptimizer(MetaOptimizerBase):
     def __init__(self, optimizer):
         super(RecomputeOptimizer, self).__init__(optimizer)
-        #self.inner_opt = RO(optimizer)
         self.inner_opt = optimizer
-        self.wrapped_opt = RO(optimizer)
+        self.wrapped_opt = None
         # we do not allow meta optimizer to be inner optimizer currently
         self.meta_optimizers_white_list = [
             "LarsOptimizer",
             "LambOptimizer",
             "GradientMergeOptimizer",
             "GraphExecutionOptimizer",
+            "DGCOptimizer",
         ]
         self.meta_optimizers_black_list = []
 
@@ -34,8 +34,15 @@ class RecomputeOptimizer(MetaOptimizerBase):
     def _set_basic_info(self, loss, role_maker, user_defined_optimizer,
                         user_defined_strategy):
         super(RecomputeOptimizer, self)._set_basic_info(
             loss, role_maker, user_defined_optimizer, user_defined_strategy)
-        self.wrapped_opt._set_checkpoints(
-            list(user_defined_strategy.recompute_configs["checkpoints"]))
+
+    def _init_wrapped_opt(self):
+        if self.wrapped_opt is not None:
+            return
+
+        configs = self.user_defined_strategy.recompute_configs
+
+        self.wrapped_opt = RO(self.inner_opt)
+        self.wrapped_opt._set_checkpoints(list(configs["checkpoints"]))
 
     def _can_apply(self):
         if not self.role_maker._is_collective:
@@ -62,14 +69,24 @@ class RecomputeOptimizer(MetaOptimizerBase):
                  parameter_list=None,
                  no_grad_set=None,
                  callbacks=None):
+        # may be the inner_opt of another meta optimizer
+        self._init_wrapped_opt()
         return self.wrapped_opt.backward(loss, startup_program, parameter_list,
                                          no_grad_set, callbacks)
 
+    def apply_gradients(self, params_grads):
+        return self.wrapped_opt.apply_gradients(params_grads=params_grads)
+
+    def apply_optimize(self, loss, startup_program, params_grads):
+        return self.wrapped_opt.apply_optimize(
+            loss, startup_program=startup_program, params_grads=params_grads)
+
     def minimize_impl(self,
                       loss,
                       startup_program=None,
                       parameter_list=None,
                       no_grad_set=None):
+        self._init_wrapped_opt()
         optimize_ops, params_grads = \
             self.wrapped_opt.minimize(loss, startup_program,
                                       parameter_list, no_grad_set)
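
Note: with "DGCOptimizer" added to the white list, RecomputeOptimizer may now
wrap DGC (and AMP) as its inner optimizer. A hedged sketch of the composed
control flow, using stand-in classes rather than the fleet implementations:

    class InnerOpt:
        # stands in for the DGC or AMP meta optimizer
        def backward(self, loss):
            print("inner backward: builds the (param, grad) list")
            return [("w", "w@GRAD")]

        def apply_optimize(self, loss, startup_program, params_grads):
            print("inner apply_optimize: appends dgc/unscale ops last")

    class RecomputeWrapper:
        # stands in for fluid's RecomputeOptimizer (RO above)
        def __init__(self, inner):
            self._optimizer = inner

        def minimize(self, loss, startup_program=None):
            # recompute only rewrites the backward pass; the parameter
            # update is delegated back to the inner optimizer
            params_grads = self._optimizer.backward(loss)
            self._optimizer.apply_optimize(loss, startup_program, params_grads)
            return params_grads

    RecomputeWrapper(InnerOpt()).minimize("avg_cost")

This is why the new apply_gradients()/apply_optimize() forwarders on
AMPOptimizer and DGCOptimizer are needed: without them, the wrapper would
fall through to the plain inner optimizer update and silently skip the
dgc- and amp-specific ops.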
diff --git a/python/paddle/fluid/contrib/mixed_precision/decorator.py b/python/paddle/fluid/contrib/mixed_precision/decorator.py
index c9112ac849c..5f40a0cec95 100644
--- a/python/paddle/fluid/contrib/mixed_precision/decorator.py
+++ b/python/paddle/fluid/contrib/mixed_precision/decorator.py
@@ -16,6 +16,7 @@ from ... import default_main_program
 from ... import default_startup_program
 from ... import layers
 from ... import unique_name
+from ... import program_guard
 from . import fp16_utils
 from .fp16_utils import rewrite_program
 from .fp16_utils import update_role_var_grad
@@ -58,21 +59,40 @@ class OptimizerWithMixedPrecision(object):
         self._optimizer = optimizer
         self._amp_lists = amp_lists
         self._param_grads = None
-        self._train_program = default_main_program()
-        self._startup_prog = default_startup_program()
+        self._train_program = None
+
         self._scaled_loss = None
-        self._loss_scaling = layers.create_global_var(
-            name=unique_name.generate("loss_scaling"),
-            shape=[1],
-            value=init_loss_scaling,
-            dtype='float32',
-            persistable=True)
+        self._loss_scaling = None
+        self._init_loss_scaling = init_loss_scaling
         self._use_dynamic_loss_scaling = use_dynamic_loss_scaling
         if self._use_dynamic_loss_scaling:
             self._incr_every_n_steps = incr_every_n_steps
             self._decr_every_n_nan_or_inf = decr_every_n_nan_or_inf
             self._incr_ratio = incr_ratio
             self._decr_ratio = decr_ratio
+            self._num_good_steps = None
+            self._num_bad_steps = None
+
+    def get_loss_scaling(self):
+        """Return the real-time loss scaling factor.
+        """
+        return self._loss_scaling
+
+    def get_scaled_loss(self):
+        """Return the scaled loss.
+        It's useful when you feed a customized loss into the executor.
+        """
+        return self._scaled_loss
+
+    def _init_amp_var(self):
+        self._loss_scaling = layers.create_global_var(
+            name=unique_name.generate("loss_scaling"),
+            shape=[1],
+            value=self._init_loss_scaling,
+            dtype='float32',
+            persistable=True)
+
+        if self._use_dynamic_loss_scaling:
             self._num_good_steps = layers.create_global_var(
                 name=unique_name.generate("num_good_steps"),
                 shape=[1],
@@ -86,28 +106,16 @@ class OptimizerWithMixedPrecision(object):
                 dtype='int32',
                 persistable=True)
 
-        # Ensure the data type of learning rate vars is float32 (same as the
+        # Ensure the data type of learning rate vars is float32 (same as the
         # master parameter dtype)
-        if isinstance(optimizer._learning_rate, float):
-            optimizer._learning_rate_map[default_main_program()] = \
-                layers.create_global_var(
-                name=unique_name.generate("learning_rate"),
-                shape=[1],
-                value=float(optimizer._learning_rate),
-                dtype='float32',
-                persistable=True)
-
-    def get_loss_scaling(self):
-        """Return the real-time loss scaling factor.
-        """
-        return self._loss_scaling
-
-    def get_scaled_loss(self):
-        """Return the scaled loss.
-        It's useful when you feed customed loss into executor.
-        """
-
-        return self._scaled_loss
+        if isinstance(self._optimizer._learning_rate, float):
+            self._optimizer._learning_rate_map[default_main_program()] = \
+                layers.create_global_var(
+                name=unique_name.generate("learning_rate"),
+                shape=[1],
+                value=float(self._optimizer._learning_rate),
+                dtype='float32',
+                persistable=True)
 
     def backward(self,
                  loss,
@@ -131,16 +139,21 @@ class OptimizerWithMixedPrecision(object):
             A list of (param, grad), which is a tuple of a parameter and its
             gradient respectively, and the scaled loss.
         """
-        rewrite_program(self._train_program, self._amp_lists)
-        self._scaled_loss = loss * self._loss_scaling
-        self._params_grads = self._optimizer.backward(
-            self._scaled_loss, startup_program, parameter_list, no_grad_set,
-            callbacks)
-        # Change the op_role_var attr for some ops, so that gradients
-        # transferred across GPUs can be FP16.
-        update_role_var_grad(self._train_program, self._params_grads)
-
-        return self._params_grads
+        train_program = loss.block.program
+        self._train_program = train_program
+
+        with program_guard(train_program, startup_program):
+            self._init_amp_var()
+
+            rewrite_program(train_program, self._amp_lists)
+            self._scaled_loss = loss * self._loss_scaling
+            params_grads = self._optimizer.backward(
+                self._scaled_loss, startup_program, parameter_list, no_grad_set,
+                callbacks)
+            # Change the op_role_var attr for some ops, so that gradients
+            # transferred across GPUs can be FP16.
+            update_role_var_grad(train_program, params_grads)
+        return params_grads
 
     def apply_gradients(self, params_grads):
         """
@@ -182,6 +195,12 @@ class OptimizerWithMixedPrecision(object):
 
         return optimize_ops
 
+    def apply_optimize(self, loss, startup_program, param_grads):
+        program = loss.block.program
+        with program_guard(program, startup_program):
+            optimize_ops = self.apply_gradients(param_grads)
+        return optimize_ops
+
     def minimize(self,
                  loss,
                  startup_program=None,
@@ -207,7 +226,8 @@ class OptimizerWithMixedPrecision(object):
                                  parameter_list=parameter_list,
                                  no_grad_set=no_grad_set)
 
-        optimize_ops = self.apply_gradients(scaled_params_grads)
+        optimize_ops = self.apply_optimize(loss, startup_program,
+                                           scaled_params_grads)
 
         return optimize_ops, scaled_params_grads
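
Note: the decorator change follows the same scheme. All AMP state (the loss
scaling var and the good/bad step counters) is now created inside backward(),
under a program_guard on the program that owns the loss, instead of in
__init__ against default_main_program(). A runnable sketch of why the guard
matters, using the same static-graph APIs as the diff (the variable name is
illustrative):

    import paddle
    import paddle.fluid as fluid

    paddle.enable_static()

    train_prog, startup_prog = fluid.Program(), fluid.Program()
    with fluid.program_guard(train_prog, startup_prog):
        # created in train_prog, exactly as _init_amp_var() does above
        loss_scaling = fluid.layers.create_global_var(
            name="loss_scaling_demo",
            shape=[1],
            value=32768.0,
            dtype='float32',
            persistable=True)

    assert train_prog.global_block().has_var("loss_scaling_demo")
    assert not fluid.default_main_program().global_block().has_var(
        "loss_scaling_demo")

Without the guard, a fleet job that builds its network in a user-created
Program (as the new unit test does) would put the AMP state into
default_main_program(), where the scaled-loss graph could never see it.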
""" - rewrite_program(self._train_program, self._amp_lists) - self._scaled_loss = loss * self._loss_scaling - self._params_grads = self._optimizer.backward( - self._scaled_loss, startup_program, parameter_list, no_grad_set, - callbacks) - # Change the op_role_var attr for some ops, so that gradients - # transferred across GPUs can be FP16. - update_role_var_grad(self._train_program, self._params_grads) - - return self._params_grads + train_program = loss.block.program + self._train_program = train_program + + with program_guard(train_program, startup_program): + self._init_amp_var() + + rewrite_program(train_program, self._amp_lists) + self._scaled_loss = loss * self._loss_scaling + params_grads = self._optimizer.backward( + self._scaled_loss, startup_program, parameter_list, no_grad_set, + callbacks) + # Change the op_role_var attr for some ops, so that gradients + # transferred across GPUs can be FP16. + update_role_var_grad(train_program, params_grads) + return params_grads def apply_gradients(self, params_grads): """ @@ -182,6 +195,12 @@ class OptimizerWithMixedPrecision(object): return optimize_ops + def apply_optimize(self, loss, startup_program, param_grads): + program = loss.block.program + with program_guard(program, startup_program): + optimize_ops = self.apply_gradients(param_grads) + return optimize_ops + def minimize(self, loss, startup_program=None, @@ -207,7 +226,8 @@ class OptimizerWithMixedPrecision(object): parameter_list=parameter_list, no_grad_set=no_grad_set) - optimize_ops = self.apply_gradients(scaled_params_grads) + optimize_ops = self.apply_optimize(loss, startup_program, + scaled_params_grads) return optimize_ops, scaled_params_grads diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 0dd1694c86c..d72bc505fad 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -730,9 +730,6 @@ class Optimizer(object): outputs={"ParamOut": param_and_grad[0]}) return new_param_grads, (table_param, table_grad), sgd_op - def _append_dgc_ops(self, param_and_grad): - pass - def backward(self, loss, startup_program=None, @@ -794,9 +791,6 @@ class Optimizer(object): with program_guard(program, startup_program): params_grads = append_backward(loss, parameter_list, act_no_grad_set, callbacks) - # Note: since we can't use all_reduce_op now, - # dgc_op should be the last op of one grad. - self._append_dgc_ops(params_grads) return params_grads def apply_gradients(self, params_grads): @@ -1561,6 +1555,11 @@ class DGCMomentumOptimizer(Optimizer): @imperative_base.no_grad def apply_gradients(self, params_grads): + # Note: since we can't use all_reduce_op now, + # dgc_op should be the last op of one grad. + # Maybe need a grad allreduce pass. + self._append_dgc_ops(params_grads) + params_grads = sorted(params_grads, key=lambda x: x[0].name) params_grads, table_param_and_grad, table_optimize_op = \ self._process_distribute_lookuptable(params_grads) @@ -4776,10 +4775,6 @@ class RecomputeOptimizer(Optimizer): params_grads = append_backward( loss, parameter_list, no_grad_set, checkpoints=checkpoint_vars) - # Note: since we can't use all_reduce_op now, - # dgc_op should be the last op of one grad. 
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_combine_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_combine_meta_optimizer.py
new file mode 100755
index 00000000000..2ed0d7bd685
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_fleet_combine_meta_optimizer.py
@@ -0,0 +1,122 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import paddle
+from paddle import fluid
+import os
+import paddle.distributed.fleet as fleet
+import paddle.distributed.fleet.base.role_maker as role_maker
+
+paddle.enable_static()
+
+
+class TestFleetCombineOptimizer(unittest.TestCase):
+    def setUp(self):
+        os.environ["PADDLE_TRAINER_ID"] = "1"
+        os.environ[
+            "PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001,127.0.0.1:36002"
+
+    def net(self, main_prog, startup_prog):
+        with fluid.program_guard(main_prog, startup_prog):
+            with fluid.unique_name.guard():
+                role = role_maker.PaddleCloudRoleMaker(is_collective=True)
+                fleet.init(role)
+                input_x = paddle.fluid.layers.data(
+                    name="x", shape=[32], dtype='float32')
+                input_y = paddle.fluid.layers.data(
+                    name="y", shape=[1], dtype='int64')
+
+                fc_1 = paddle.fluid.layers.fc(input=input_x,
+                                              size=64,
+                                              act='tanh')
+                fc_2 = paddle.fluid.layers.fc(input=fc_1, size=256, act='tanh')
+                prediction = paddle.fluid.layers.fc(input=[fc_2],
+                                                    size=2,
+                                                    act='softmax')
+                cost = paddle.fluid.layers.cross_entropy(
+                    input=prediction, label=input_y)
+                avg_cost = paddle.fluid.layers.mean(x=cost)
+
+                strategy = paddle.distributed.fleet.DistributedStrategy()
+                strategy.dgc = True
+                strategy.dgc_configs = {
+                    "rampup_begin_step": 128,
+                    "rampup_step": 100,
+                    "sparsity": [0.996, 0.999]
+                }
+        return avg_cost, strategy
+
+    def optimizer(self, loss, strategy, train_prog, startup_prog):
+        with fluid.program_guard(train_prog, startup_prog):
+            with fluid.unique_name.guard():
+                optimizer = paddle.fluid.optimizer.Momentum(
+                    learning_rate=0.01, momentum=0.9)
+                optimizer = fleet.distributed_optimizer(
+                    optimizer, strategy=strategy)
+                optimizer.minimize(loss)
+
+    def set_strategy(self, strategy, name):
+        if name == 'amp':
+            strategy.amp = True
+        elif name == 'dgc':
+            strategy.dgc = True
+        elif name == 'recompute':
+            strategy.recompute = True
+            strategy.recompute_configs = {
+                "checkpoints": ["fc_0.tmp_2", "fc_1.tmp_2"]
+            }
+
+    def test_dgc_recompute_optimizer(self):
+        train_prog = fluid.Program()
+        startup_prog = fluid.Program()
+        avg_cost, strategy = self.net(train_prog, startup_prog)
+
+        self.set_strategy(strategy, 'dgc')
+        self.set_strategy(strategy, 'recompute')
+
+        self.optimizer(avg_cost, strategy, train_prog, startup_prog)
+
+        ops = [op.type for op in avg_cost.block.ops]
+        outs = [
+            op.output('Out')[0] for op in avg_cost.block.ops if op.type == 'mul'
+        ]
+        self.assertIn('dgc', ops)
+        self.assertIn('dgc_momentum', ops)
+
+        self.assertIn('subprog', ''.join(outs))
+
+    def test_amp_recompute_optimizer(self):
+        train_prog = fluid.Program()
+        startup_prog = fluid.Program()
+        avg_cost, strategy = self.net(train_prog, startup_prog)
+
+        self.set_strategy(strategy, 'amp')
+        self.set_strategy(strategy, 'recompute')
+
+        self.optimizer(avg_cost, strategy, train_prog, startup_prog)
+
+        ops = [op.type for op in avg_cost.block.ops]
+        outs = [
+            op.output('Out')[0] for op in avg_cost.block.ops if op.type == 'mul'
+        ]
+        print(train_prog)
+        self.assertIn('cast', ops)
+        self.assertIn('check_finite_and_unscale', ops)
+
+        self.assertIn('subprog', ''.join(outs))
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_dgc_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_dgc_meta_optimizer.py
index 55d4ff7726a..d3615c9605a 100755
--- a/python/paddle/fluid/tests/unittests/test_fleet_dgc_meta_optimizer.py
+++ b/python/paddle/fluid/tests/unittests/test_fleet_dgc_meta_optimizer.py
@@ -19,6 +19,8 @@ import os
 import paddle.distributed.fleet as fleet
 import paddle.distributed.fleet.base.role_maker as role_maker
 
+paddle.enable_static()
+
 
 class TestFleetDGCOptimizer(unittest.TestCase):
     def setUp(self):
--
GitLab
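
Note: the user-facing effect of this patch is that several strategy flags can
be combined on a single fleet job. A minimal usage sketch in the style of the
new unit test (network construction and trainer endpoints elided; the
checkpoint names assume the two-layer fc network used in the test):

    import paddle
    import paddle.distributed.fleet as fleet

    paddle.enable_static()  # these meta optimizers are static-graph only
    fleet.init(is_collective=True)

    strategy = paddle.distributed.fleet.DistributedStrategy()
    strategy.amp = True        # mixed precision via AMPOptimizer
    strategy.recompute = True  # gradient checkpointing via RecomputeOptimizer
    strategy.recompute_configs = {
        "checkpoints": ["fc_0.tmp_2", "fc_1.tmp_2"]
    }

    optimizer = paddle.fluid.optimizer.Momentum(
        learning_rate=0.01, momentum=0.9)
    optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
    # optimizer.minimize(avg_cost) would then emit the cast and
    # check_finite_and_unscale ops and the 'subprog' recompute outputs
    # that the tests above assert on.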