未验证 提交 03babe17 编写于 作者: W WangXi 提交者: GitHub

Fleet distributed strategy support pure fp16 (#30754) (#31238)

上级 188bcbb7
......@@ -44,6 +44,8 @@ message AMPConfig {
repeated string custom_white_list = 7;
repeated string custom_black_list = 8;
repeated string custom_black_varnames = 9;
optional bool use_pure_fp16 = 10 [ default = false ];
optional bool use_fp16_guard = 11 [ default = true ];
}
message LocalSGDConfig {
......
......@@ -49,6 +49,9 @@ def assign_configs_value(msg, config):
for key in config:
for f in fields:
if key == f.name:
# LABEL_OPTIONAL = 1
# LABEL_REPEATED = 3
# LABEL_REQUIRED = 2
if f.label == 3:
getattr(msg, f.name).extend(config[f.name])
elif f.label == 1 or f.label == 2:
......@@ -366,7 +369,14 @@ class DistributedStrategy(object):
custom_black_list(list[str]): Users' custom black list which forbidden execution fp16.
Examples:
custom_black_varnames(list[str]): Users' custom black varibles' names.
use_pure_fp16(bool): Whether to use the pure fp16 training. Default False.
use_fp16_guard(bool): Whether to use `fp16_guard` when constructing the program.
Default True. Only takes effect when `use_pure_fp16` is turned on.
Examples 1:
.. code-block:: python
......@@ -376,6 +386,19 @@ class DistributedStrategy(object):
strategy.amp_configs = {
"init_loss_scaling": 32768,
"custom_white_list": ['conv2d']}
Examples 2:
.. code-block:: python
import paddle.distributed.fleet as fleet
strategy = fleet.DistributedStrategy()
strategy.amp = True
# pure fp16
strategy.amp_configs = {
"init_loss_scaling": 32768,
"use_pure_fp16": True
}
"""
return get_msg_dict(self.strategy.amp_configs)
......
......@@ -196,6 +196,7 @@ class Fleet(object):
else:
if isinstance(role_maker, RoleMakerBase):
self._role_maker = role_maker
self._is_collective = role_maker._is_collective
else:
raise ValueError(
"`role_maker` should be subclass of `RoleMakerBase`, but got {}".
......@@ -1022,9 +1023,22 @@ class Fleet(object):
if paddle.is_compiled_with_cuda() and len(paddle.static.cuda_places()) > 0:
run_example_code()
"""
# imitate target optimizer retrieval
return self.user_defined_optimizer.amp_init(place, scope, test_program,
use_fp16_test)
amp_optimizer = None
for optimizer in self.strategy_compiler._get_applied_meta_optimizer():
if hasattr(optimizer, 'amp_init'):
amp_optimizer = optimizer
break
if amp_optimizer is None:
if hasattr(self.user_defined_optimizer, 'amp_init'):
amp_optimizer = self.user_defined_optimizer
assert amp_optimizer is not None, \
"amp_init can only be used when the amp(auto mixed precision) strategy is turned on."
return amp_optimizer.amp_init(place, scope, test_program, use_fp16_test)
def _final_strategy(self):
if "valid_strategy" not in self._context:
......
......@@ -129,6 +129,9 @@ class StrategyCompiler(StrategyCompilerBase):
self._meta_optimizer_candidates = []
self._graph_optimizer_candidates = []
def _get_applied_meta_optimizer(self):
return self._meta_optimizers
def _get_applied_meta_list(self):
return [type(opt).__name__ for opt in self._meta_optimizers]
......
......@@ -50,7 +50,8 @@ class AMPOptimizer(MetaOptimizerBase):
self.inner_opt, amp_lists, config['init_loss_scaling'],
config['incr_every_n_steps'], config['decr_every_n_nan_or_inf'],
config['incr_ratio'], config['decr_ratio'],
config['use_dynamic_loss_scaling'])
config['use_dynamic_loss_scaling'], config['use_pure_fp16'],
config['use_fp16_guard'])
# if worker_num > 1, all cards will communication with each other,
# add is_distributed to optimize amp, overlap communication and
......@@ -112,3 +113,11 @@ class AMPOptimizer(MetaOptimizerBase):
self.wrapped_opt.minimize(loss, startup_program,
parameter_list, no_grad_set)
return optimize_ops, params_grads
def amp_init(self,
place,
scope=None,
test_program=None,
use_fp16_test=False):
return self.wrapped_opt.amp_init(place, scope, test_program,
use_fp16_test)
......@@ -165,7 +165,9 @@ class GraphExecutionOptimizer(MetaOptimizerBase):
main_program._hierarchical_allreduce_inter_nranks = local_build_strategy.hierarchical_allreduce_inter_nranks
# TODO(guru4elephant): should be an independent optimizer
self._setup_nccl_op(startup_program, main_program, local_build_strategy)
if worker_num > 1:
self._setup_nccl_op(startup_program, main_program,
local_build_strategy)
local_build_strategy.num_trainers = self.role_maker._worker_num()
local_build_strategy.trainer_id = self.role_maker._worker_index()
......
......@@ -48,6 +48,7 @@ list(APPEND MIXED_DIST_TEST_OPS test_fleet_base_3)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_recompute_meta_optimizer)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_pipeline_meta_optimizer)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_amp_meta_optimizer)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_amp_init)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_gradient_merge_meta_optimizer)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_sharding_meta_optimizer)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_localsgd_meta_optimizer)
......@@ -506,6 +507,7 @@ if(WITH_DISTRIBUTE)
py_test_modules(test_fleet_gradient_merge_meta_optimizer MODULES test_fleet_gradient_merge_meta_optimizer ENVS ${dist_ENVS})
py_test_modules(test_fleet_sharding_meta_optimizer MODULES test_fleet_sharding_meta_optimizer ENVS ${dist_ENVS})
py_test_modules(test_fleet_amp_meta_optimizer MODULES test_fleet_amp_meta_optimizer ENVS ${dist_ENVS})
py_test_modules(test_fleet_amp_init MODULES test_fleet_amp_init ENVS ${dist_ENVS})
py_test_modules(test_fleet_fp16_allreduce_meta_optimizer MODULES test_fleet_fp16_allreduce_meta_optimizer ENVS ${dist_ENVS})
py_test_modules(test_fleet_private_function MODULES test_fleet_private_function ENVS ${dist_ENVS})
py_test_modules(test_fleet_meta_optimizer_base MODULES test_fleet_meta_optimizer_base ENVS ${dist_ENVS})
......
......@@ -88,6 +88,21 @@ class TestFleetMetaOptimizer(unittest.TestCase):
"custom_white_list": ['softmax'],
"custom_black_list": ['tanh'],
}
elif name == 'pure_fp16':
strategy.amp = True
strategy.amp_configs = {
"init_loss_scaling": 32768,
"decr_every_n_nan_or_inf": 2,
"incr_every_n_steps": 1000,
"incr_ratio": 2.0,
"use_dynamic_loss_scaling": True,
"decr_ratio": 0.5,
"custom_white_list": ['softmax'],
"custom_black_list": ['tanh'],
"use_pure_fp16": True,
"use_fp16_guard": False,
}
elif name == 'dgc':
strategy.dgc = True
strategy.dgc_configs = {
......
......@@ -46,34 +46,88 @@ class TestFleetAMPInit(unittest.TestCase):
def test_fleet_amp_init(self):
if not fluid.core.is_compiled_with_cuda():
return
input_x = paddle.static.data(
name="x", shape=[None, 32], dtype='float32')
input_y = paddle.static.data(name="y", shape=[None, 1], dtype='int64')
cost = mlp(input_x, input_y)
optimizer = paddle.optimizer.Momentum(
learning_rate=0.001,
momentum=0.9,
weight_decay=fluid.regularizer.L2Decay(1e-4),
multi_precision=True)
main_program = paddle.static.Program()
startup_program = paddle.static.Program()
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
optimizer = paddle.static.amp.decorate(optimizer)
optimizer = fleet.distributed_optimizer(optimizer)
optimizer.minimize(cost)
with paddle.static.program_guard(main_program, startup_program):
input_x = paddle.static.data(
name="x", shape=[None, 32], dtype='float32')
input_y = paddle.static.data(
name="y", shape=[None, 1], dtype='int64')
cost = mlp(input_x, input_y)
optimizer = paddle.optimizer.Momentum(
learning_rate=0.001,
momentum=0.9,
weight_decay=fluid.regularizer.L2Decay(1e-4),
multi_precision=True)
optimizer = paddle.static.amp.decorate(optimizer)
optimizer = fleet.distributed_optimizer(optimizer)
optimizer.minimize(cost)
place = paddle.CUDAPlace(0)
exe = paddle.static.Executor(place)
exe.run(paddle.static.default_startup_program())
exe.run(startup_program)
optimizer.amp_init(place)
step = 1
for i in range(step):
cost_val = exe.run(program=paddle.static.default_main_program(),
cost_val = exe.run(program=main_program,
feed=gen_data(),
fetch_list=[cost.name])
def test_fleet_amp_meta_optimizer_init(self):
if not fluid.core.is_compiled_with_cuda():
return
main_program = paddle.static.Program()
startup_program = paddle.static.Program()
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
with paddle.static.program_guard(main_program, startup_program):
input_x = paddle.static.data(
name="x", shape=[None, 32], dtype='float32')
input_y = paddle.static.data(
name="y", shape=[None, 1], dtype='int64')
cost = mlp(input_x, input_y)
optimizer = paddle.optimizer.Momentum(
learning_rate=0.001,
momentum=0.9,
weight_decay=fluid.regularizer.L2Decay(1e-4),
multi_precision=True)
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.amp = True
strategy.amp_configs = {'use_pure_fp16': True}
strategy.gradient_merge = True
strategy.gradient_merge_configs = {"k_steps": 2}
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(cost)
print(fleet._get_applied_meta_list())
place = paddle.CUDAPlace(0)
exe = paddle.static.Executor(place)
exe.run(startup_program)
optimizer.amp_init(place)
step = 3
for i in range(step):
cost_val = exe.run(program=main_program,
feed=gen_data(),
fetch_list=[cost.name])
print(cost_val)
if __name__ == '__main__':
......
......@@ -93,6 +93,21 @@ class TestFleetAMPOptimizer(TestFleetMetaOptimizer):
self.assertIn('cast', ops)
self.assertIn('check_finite_and_unscale', ops)
def test_pure_fp16_optimizer(self):
""" test pure fp16 """
train_prog, startup_prog = fluid.Program(), fluid.Program()
avg_cost, strategy = self.net(train_prog, startup_prog)
self.set_strategy(strategy, 'pure_fp16')
self.optimizer(avg_cost, strategy, train_prog, startup_prog)
params = train_prog.all_parameters()
for param in train_prog.all_parameters():
self.assertEqual(param.dtype, fluid.core.VarDesc.VarType.FP16)
ops = [op.type for op in avg_cost.block.ops]
self.assertIn('cast', ops)
self.assertIn('check_finite_and_unscale', ops)
def test_amp_distributed_optimizer(self):
""" test amp when distributed """
train_prog, startup_prog = fluid.Program(), fluid.Program()
......
......@@ -78,6 +78,7 @@ class TestFleetBaseSingleRunCollective(unittest.TestCase):
}
def test_single_run_collective_minimize(self):
paddle.enable_static()
input_x = paddle.static.data(name="x", shape=[-1, 32], dtype='float32')
input_y = paddle.static.data(name="y", shape=[-1, 1], dtype='int64')
......@@ -114,6 +115,7 @@ class TestFleetBaseSingleRunPS(unittest.TestCase):
}
def test_single_run_ps_minimize(self):
paddle.enable_static()
input_x = paddle.static.data(name="x", shape=[-1, 32], dtype='float32')
input_y = paddle.static.data(name="y", shape=[-1, 1], dtype='int64')
......
......@@ -53,8 +53,25 @@ class TestFleetGradientMergeMetaOptimizer(TestFleetMetaOptimizer):
self.set_strategy(strategy, 'gradient_merge')
self.set_strategy(strategy, 'amp')
self.optimizer(avg_cost, strategy, train_prog, startup_prog)
vars = [x.name for x in train_prog.list_vars()]
self.assertIn('@GradientMerge', ''.join(vars))
self.assertIn('cast', ''.join(vars))
def test_gm_pure_fp16_optimizer(self):
train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program(
)
avg_cost, strategy = self.net(train_prog, startup_prog)
self.set_strategy(strategy, 'gradient_merge')
self.set_strategy(strategy, 'pure_fp16')
self.optimizer(avg_cost, strategy, train_prog, startup_prog)
print(train_prog)
params = train_prog.all_parameters()
for param in train_prog.all_parameters():
self.assertEqual(param.dtype,
paddle.fluid.core.VarDesc.VarType.FP16)
vars = [x.name for x in train_prog.list_vars()]
self.assertIn('@GradientMerge', ''.join(vars))
self.assertIn('cast', ''.join(vars))
......
......@@ -244,7 +244,7 @@ class Adam(Optimizer):
if p.dtype == core.VarDesc.VarType.FP16 and not self._multi_precision:
warnings.warn(
"Accumulating with FP16 in optimizer can lead to poor accuracy or slow convergence."
"Consider using multi_precision=True option of the Momentum optimizer."
"Consider using multi_precision=True option of the Adam optimizer."
)
self._add_moments_pows(p)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册