提交 8dd3d4b6 编写于 作者: W WangXi

fleet meta combine amp dgc recompute, test=develop

上级 516d84b2
......@@ -19,7 +19,7 @@ class AMPOptimizer(MetaOptimizerBase):
def __init__(self, optimizer):
super(AMPOptimizer, self).__init__(optimizer)
self.inner_opt = optimizer
self.amp_opt = None
self.wrapped_opt = None
# we do not allow meta optimizer to be inner optimizer currently
self.meta_optimizers_white_list = [
"LarsOptimizer",
......@@ -37,6 +37,24 @@ class AMPOptimizer(MetaOptimizerBase):
super(AMPOptimizer, self)._set_basic_info(
loss, role_maker, user_defined_optimizer, user_defined_strategy)
def _init_wrapped_opt(self):
if self.wrapped_opt is not None:
return
config = self.user_defined_strategy.amp_configs
custom_white_list = set(config['custom_white_list'])
custom_black_list = set(config['custom_black_list'])
custom_black_varnames = set(config['custom_black_varnames'])
self.amp_lists = mixed_precision.AutoMixedPrecisionLists(
custom_white_list, custom_black_list, custom_black_varnames)
self.wrapped_opt = mixed_precision.decorate(
self.inner_opt, amp_lists, config['init_loss_scaling'],
config['incr_every_n_steps'], config['decr_every_n_nan_or_inf'],
config['incr_ratio'], config['decr_ratio'],
config['use_dynamic_loss_scaling'])
def _can_apply(self):
if not self.role_maker._is_collective:
return False
......@@ -60,26 +78,31 @@ class AMPOptimizer(MetaOptimizerBase):
"use_dynamic_loss_scaling": True
}
def backward(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None,
callbacks=None):
# maybe inner_opt of other meta optimizer
self._init_wrapped_opt()
return self.wrapped_opt.backward(loss, startup_program, parameter_list,
no_grad_set, callbacks)
def apply_gradients(self, params_grads):
return self.wrapped_opt.apply_gradients(params_grads=params_grads)
def apply_optimize(self, loss, startup_program, params_grads):
return self.wrapped_opt.apply_optimize(
loss, startup_program=startup_program, params_grads=params_grads)
def minimize_impl(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
if self.amp_opt is None:
config = self.user_defined_strategy.amp_configs
custom_white_list = set(config['custom_white_list'])
custom_black_list = set(config['custom_black_list'])
custom_black_varnames = set(config['custom_black_varnames'])
amp_lists = mixed_precision.AutoMixedPrecisionLists(
custom_white_list, custom_black_list, custom_black_varnames)
self.amp_opt = mixed_precision.decorate(
self.inner_opt, amp_lists, config['init_loss_scaling'],
config['incr_every_n_steps'], config['decr_every_n_nan_or_inf'],
config['incr_ratio'], config['decr_ratio'],
config['use_dynamic_loss_scaling'])
self._init_wrapped_opt()
optimize_ops, params_grads = \
self.amp_opt.minimize(loss, startup_program,
self.wrapped_opt.minimize(loss, startup_program,
parameter_list, no_grad_set)
return optimize_ops, params_grads
......@@ -85,6 +85,13 @@ class DGCOptimizer(MetaOptimizerBase):
return self.dgc_opt.backward(loss, startup_program, parameter_list,
no_grad_set, callbacks)
def apply_gradients(self, params_grads):
return self.dgc_opt.apply_gradients(params_grads=params_grads)
def apply_optimize(self, loss, startup_program, params_grads):
return self.dgc_opt.apply_optimize(
loss, startup_program=startup_program, params_grads=params_grads)
def minimize_impl(self,
loss,
startup_program=None,
......
......@@ -18,15 +18,15 @@ from .meta_optimizer_base import MetaOptimizerBase
class RecomputeOptimizer(MetaOptimizerBase):
def __init__(self, optimizer):
super(RecomputeOptimizer, self).__init__(optimizer)
#self.inner_opt = RO(optimizer)
self.inner_opt = optimizer
self.wrapped_opt = RO(optimizer)
self.wrapped_opt = None
# we do not allow meta optimizer to be inner optimizer currently
self.meta_optimizers_white_list = [
"LarsOptimizer",
"LambOptimizer",
"GradientMergeOptimizer",
"GraphExecutionOptimizer",
"DGCOptimizer",
]
self.meta_optimizers_black_list = []
......@@ -34,8 +34,15 @@ class RecomputeOptimizer(MetaOptimizerBase):
user_defined_strategy):
super(RecomputeOptimizer, self)._set_basic_info(
loss, role_maker, user_defined_optimizer, user_defined_strategy)
self.wrapped_opt._set_checkpoints(
list(user_defined_strategy.recompute_configs["checkpoints"]))
def _init_wrapped_opt(self):
if self.wrapped_opt is not None:
return
configs = self.user_defined_strategy.recompute_configs
self.wrapped_opt = RO(self.inner_opt)
self.wrapped_opt._set_checkpoints(list(configs["checkpoints"]))
def _can_apply(self):
if not self.role_maker._is_collective:
......@@ -62,14 +69,24 @@ class RecomputeOptimizer(MetaOptimizerBase):
parameter_list=None,
no_grad_set=None,
callbacks=None):
# maybe inner_opt of other meta optimizer
self._init_wrapped_opt()
return self.wrapped_opt.backward(loss, startup_program, parameter_list,
no_grad_set, callbacks)
def apply_gradients(self, params_grads):
return self.wrapped_opt.apply_gradients(params_grads=params_grads)
def apply_optimize(self, loss, startup_program, params_grads):
return self.wrapped_opt.apply_optimize(
loss, startup_program=startup_program, params_grads=params_grads)
def minimize_impl(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
self._init_wrapped_opt()
optimize_ops, params_grads = \
self.wrapped_opt.minimize(loss, startup_program,
parameter_list, no_grad_set)
......
......@@ -16,6 +16,7 @@ from ... import default_main_program
from ... import default_startup_program
from ... import layers
from ... import unique_name
from ... import program_guard
from . import fp16_utils
from .fp16_utils import rewrite_program
from .fp16_utils import update_role_var_grad
......@@ -58,21 +59,40 @@ class OptimizerWithMixedPrecision(object):
self._optimizer = optimizer
self._amp_lists = amp_lists
self._param_grads = None
self._train_program = default_main_program()
self._startup_prog = default_startup_program()
self._train_program = None
self._scaled_loss = None
self._loss_scaling = layers.create_global_var(
name=unique_name.generate("loss_scaling"),
shape=[1],
value=init_loss_scaling,
dtype='float32',
persistable=True)
self._loss_scaling = None
self._init_loss_scaling = init_loss_scaling
self._use_dynamic_loss_scaling = use_dynamic_loss_scaling
if self._use_dynamic_loss_scaling:
self._incr_every_n_steps = incr_every_n_steps
self._decr_every_n_nan_or_inf = decr_every_n_nan_or_inf
self._incr_ratio = incr_ratio
self._decr_ratio = decr_ratio
self._num_good_steps = None
self._num_bad_steps = None
def get_loss_scaling(self):
"""Return the real-time loss scaling factor.
"""
return self._loss_scaling
def get_scaled_loss(self):
"""Return the scaled loss.
It's useful when you feed customed loss into executor.
"""
return self._scaled_loss
def _init_amp_var(self):
self._loss_scaling = layers.create_global_var(
name=unique_name.generate("loss_scaling"),
shape=[1],
value=self._init_loss_scaling,
dtype='float32',
persistable=True)
if self._use_dynamic_loss_scaling:
self._num_good_steps = layers.create_global_var(
name=unique_name.generate("num_good_steps"),
shape=[1],
......@@ -86,28 +106,16 @@ class OptimizerWithMixedPrecision(object):
dtype='int32',
persistable=True)
# Ensure the data type of learning rate vars is float32 (same as the
# Ensure the data type of learning rate vars is float32 (same as the
# master parameter dtype)
if isinstance(optimizer._learning_rate, float):
optimizer._learning_rate_map[default_main_program()] = \
layers.create_global_var(
name=unique_name.generate("learning_rate"),
shape=[1],
value=float(optimizer._learning_rate),
dtype='float32',
persistable=True)
def get_loss_scaling(self):
"""Return the real-time loss scaling factor.
"""
return self._loss_scaling
def get_scaled_loss(self):
"""Return the scaled loss.
It's useful when you feed customed loss into executor.
"""
return self._scaled_loss
if isinstance(self._optimizer._learning_rate, float):
self._optimizer._learning_rate_map[default_main_program()] = \
layers.create_global_var(
name=unique_name.generate("learning_rate"),
shape=[1],
value=float(self._optimizer._learning_rate),
dtype='float32',
persistable=True)
def backward(self,
loss,
......@@ -131,16 +139,21 @@ class OptimizerWithMixedPrecision(object):
A list of (param, grad), which is a tuple of a parameter and its
gradient respectively, and the scaled loss.
"""
rewrite_program(self._train_program, self._amp_lists)
self._scaled_loss = loss * self._loss_scaling
self._params_grads = self._optimizer.backward(
self._scaled_loss, startup_program, parameter_list, no_grad_set,
callbacks)
# Change the op_role_var attr for some ops, so that gradients
# transferred across GPUs can be FP16.
update_role_var_grad(self._train_program, self._params_grads)
return self._params_grads
train_program = loss.block.program
self._train_program = train_program
with program_guard(train_program, startup_program):
self._init_amp_var()
rewrite_program(train_program, self._amp_lists)
self._scaled_loss = loss * self._loss_scaling
params_grads = self._optimizer.backward(
self._scaled_loss, startup_program, parameter_list, no_grad_set,
callbacks)
# Change the op_role_var attr for some ops, so that gradients
# transferred across GPUs can be FP16.
update_role_var_grad(train_program, params_grads)
return params_grads
def apply_gradients(self, params_grads):
"""
......@@ -182,6 +195,12 @@ class OptimizerWithMixedPrecision(object):
return optimize_ops
def apply_optimize(self, loss, startup_program, param_grads):
program = loss.block.program
with program_guard(program, startup_program):
optimize_ops = self.apply_gradients(param_grads)
return optimize_ops
def minimize(self,
loss,
startup_program=None,
......@@ -207,7 +226,8 @@ class OptimizerWithMixedPrecision(object):
parameter_list=parameter_list,
no_grad_set=no_grad_set)
optimize_ops = self.apply_gradients(scaled_params_grads)
optimize_ops = self.apply_optimize(loss, startup_program,
scaled_params_grads)
return optimize_ops, scaled_params_grads
......
......@@ -730,9 +730,6 @@ class Optimizer(object):
outputs={"ParamOut": param_and_grad[0]})
return new_param_grads, (table_param, table_grad), sgd_op
def _append_dgc_ops(self, param_and_grad):
pass
def backward(self,
loss,
startup_program=None,
......@@ -794,9 +791,6 @@ class Optimizer(object):
with program_guard(program, startup_program):
params_grads = append_backward(loss, parameter_list,
act_no_grad_set, callbacks)
# Note: since we can't use all_reduce_op now,
# dgc_op should be the last op of one grad.
self._append_dgc_ops(params_grads)
return params_grads
def apply_gradients(self, params_grads):
......@@ -1561,6 +1555,11 @@ class DGCMomentumOptimizer(Optimizer):
@imperative_base.no_grad
def apply_gradients(self, params_grads):
# Note: since we can't use all_reduce_op now,
# dgc_op should be the last op of one grad.
# Maybe need a grad allreduce pass.
self._append_dgc_ops(params_grads)
params_grads = sorted(params_grads, key=lambda x: x[0].name)
params_grads, table_param_and_grad, table_optimize_op = \
self._process_distribute_lookuptable(params_grads)
......@@ -4776,10 +4775,6 @@ class RecomputeOptimizer(Optimizer):
params_grads = append_backward(
loss, parameter_list, no_grad_set, checkpoints=checkpoint_vars)
# Note: since we can't use all_reduce_op now,
# dgc_op should be the last op of one grad.
if hasattr(self._optimizer, "_append_dgc_ops"):
self._optimizer._append_dgc_ops(params_grads)
return params_grads
def apply_optimize(self, loss, startup_program, params_grads):
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle
from paddle import fluid
import os
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
paddle.enable_static()
class TestFleetCombineOptimizer(unittest.TestCase):
def setUp(self):
os.environ["PADDLE_TRAINER_ID"] = "1"
os.environ[
"PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001,127.0.0.1:36002"
def net(self, main_prog, startup_prog):
with fluid.program_guard(main_prog, startup_prog):
with fluid.unique_name.guard():
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
input_x = paddle.fluid.layers.data(
name="x", shape=[32], dtype='float32')
input_y = paddle.fluid.layers.data(
name="y", shape=[1], dtype='int64')
fc_1 = paddle.fluid.layers.fc(input=input_x,
size=64,
act='tanh')
fc_2 = paddle.fluid.layers.fc(input=fc_1, size=256, act='tanh')
prediction = paddle.fluid.layers.fc(input=[fc_2],
size=2,
act='softmax')
cost = paddle.fluid.layers.cross_entropy(
input=prediction, label=input_y)
avg_cost = paddle.fluid.layers.mean(x=cost)
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.dgc = True
strategy.dgc_configs = {
"rampup_begin_step": 128,
"rampup_step": 100,
"sparsity": [0.996, 0.999]
}
return avg_cost, strategy
def optimizer(self, loss, strategy, train_prog, startup_prog):
with fluid.program_guard(train_prog, startup_prog):
with fluid.unique_name.guard():
optimizer = paddle.fluid.optimizer.Momentum(
learning_rate=0.01, momentum=0.9)
optimizer = fleet.distributed_optimizer(
optimizer, strategy=strategy)
optimizer.minimize(loss)
def set_strategy(self, strategy, name):
if name == 'amp':
strategy.amp = True
elif name == 'dgc':
strategy.dgc = True
elif name == 'recompute':
strategy.recompute = True
strategy.recompute_configs = {
"checkpoints": ["fc_0.tmp_2", "fc_1.tmp_2"]
}
def test_dgc_recompute_optimizer(self):
train_prog = fluid.Program()
startup_prog = fluid.Program()
avg_cost, strategy = self.net(train_prog, startup_prog)
self.set_strategy(strategy, 'dgc')
self.set_strategy(strategy, 'recompute')
self.optimizer(avg_cost, strategy, train_prog, startup_prog)
ops = [op.type for op in avg_cost.block.ops]
outs = [
op.output('Out')[0] for op in avg_cost.block.ops if op.type == 'mul'
]
self.assertIn('dgc', ops)
self.assertIn('dgc_momentum', ops)
self.assertIn('subprog', ''.join(outs))
def test_amp_recompute_optimizer(self):
train_prog = fluid.Program()
startup_prog = fluid.Program()
avg_cost, strategy = self.net(train_prog, startup_prog)
self.set_strategy(strategy, 'amp')
self.set_strategy(strategy, 'recompute')
self.optimizer(avg_cost, strategy, train_prog, startup_prog)
ops = [op.type for op in avg_cost.block.ops]
outs = [
op.output('Out')[0] for op in avg_cost.block.ops if op.type == 'mul'
]
print(train_prog)
self.assertIn('cast', ops)
self.assertIn('check_finite_and_unscale', ops)
self.assertIn('subprog', ''.join(outs))
if __name__ == "__main__":
unittest.main()
......@@ -19,6 +19,8 @@ import os
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
paddle.enable_static()
class TestFleetDGCOptimizer(unittest.TestCase):
def setUp(self):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册