diff --git a/python/paddle/distributed/fleet/base/fleet_base.py b/python/paddle/distributed/fleet/base/fleet_base.py
index 53c617daf005e3d2e0a5400f3cc6614bcb978a42..687295b1f2c11c79bb23ce1452f0c4b16cd94fcc 100755
--- a/python/paddle/distributed/fleet/base/fleet_base.py
+++ b/python/paddle/distributed/fleet/base/fleet_base.py
@@ -17,6 +17,7 @@ import copy
 import warnings
 import paddle
 import os
+from types import MethodType
 import numpy as np
 from paddle.fluid.framework import dygraph_only, _global_flags
 from paddle.fluid import compiler
@@ -33,7 +34,7 @@ from .topology import ParallelMode
 from ..meta_parallel import TensorParallel, model_parallel_random_seed
 from ..meta_parallel import PipelineParallel, ShardingParallel
 from ..meta_optimizers import HybridParallelOptimizer
-from ..meta_optimizers import HybridParallelGradScaler
+from paddle import _C_ops
 
 __all__ = []
 
@@ -1540,4 +1541,36 @@ class Fleet(object):
 
     @dygraph_only
     def distributed_scaler(self, scaler):
-        return HybridParallelGradScaler(scaler, self._hcg)
+        def unscale_method(self, optimizer):
+            if not self._enable:
+                return
+            if getattr(optimizer, '_param_groups', None) and isinstance(
+                    optimizer._param_groups[0], dict):
+                param_grads = []
+                for group in optimizer._param_groups:
+                    for param in group['params']:
+                        if param._grad_ivar() is not None:
+                            param_grads.append(param._grad_ivar())
+            else:
+                param_grads = [
+                    param._grad_ivar() for param in optimizer._parameter_list
+                    if param._grad_ivar() is not None
+                ]
+            _C_ops.check_finite_and_unscale(param_grads, self._scale,
+                                            param_grads, self._found_inf)
+
+            self._found_inf = paddle.cast(self._found_inf, dtype="int32")
+
+            # TODO(shenliang03) Since the dp allreduce in the optimizer happens
+            # after the gradscaler, check_finite needs to synchronize global
+            # information. In the future, we should use check_group to speed this up.
+            paddle.distributed.all_reduce(
+                self._found_inf, op=paddle.distributed.ReduceOp.MAX, group=None)
+            self._found_inf = paddle.cast(self._found_inf, dtype="bool")
+
+        # Only tensor_parallel and pipeline_parallel need to modify the scaler
+        if self._hcg.get_parallel_mode() in (ParallelMode.TENSOR_PARALLEL,
+                                             ParallelMode.PIPELINE_PARALLEL):
+            scaler._unscale = MethodType(unscale_method, scaler)
+
+        return scaler
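
For reference, here is a minimal sketch of how the scaler returned by distributed_scaler is intended to be driven in a tensor-parallel AMP loop, mirroring the updated unit test further below; `model`, `optimizer`, and `data_loader` are placeholders, and the fleet.init / distributed_model / distributed_optimizer setup is assumed to have been done already:

    # Sketch only: assumes fleet.init(is_collective=True, strategy=...) has run
    # and `model`/`optimizer` were wrapped by fleet.distributed_model/_optimizer.
    import paddle
    import paddle.distributed.fleet as fleet

    scaler = paddle.amp.GradScaler(init_loss_scaling=2**16)
    scaler = fleet.distributed_scaler(scaler)  # patches scaler._unscale under mp/pp

    for img, label in data_loader():  # placeholder data source
        with paddle.amp.auto_cast():
            loss = paddle.nn.functional.cross_entropy(model(img), label)
        scaled = scaler.scale(loss)   # scale the loss
        scaled.backward()             # backward on the scaled loss
        scaler.step(optimizer)        # unscale (all-reducing found_inf) and update
        optimizer.clear_grad()
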
diff --git a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_optimizer.py
index 72af527896152c446ab97753cf9304febb2d2870..581fbc5153ad49387ac1caf62f319c5fc1e350cd 100755
--- a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_optimizer.py
@@ -29,6 +29,19 @@ from paddle.fluid import layers
 
 __all__ = []
 
+
+def _obtain_optimizer_parameters_list(optimizer):
+    if getattr(optimizer, '_param_groups', None) and isinstance(
+            optimizer._param_groups[0], dict):
+        parameters_list = []
+        for group in optimizer._param_groups:
+            for param in group['params']:
+                parameters_list.append(param)
+    else:
+        parameters_list = [param for param in optimizer._parameter_list]
+
+    return parameters_list
+
 
 class HybridParallelClipGrad:
     def __init__(self, clip, hcg):
         self._clip = clip
@@ -98,6 +111,10 @@ class HybridParallelOptimizer:
 
         self._need_dp = (self._hcg.get_data_parallel_world_size() > 1)
 
+        # NOTE(shenliang03): In pure DataParallel mode, gradient synchronization
+        # is handled by the reducer, so there is no need to call fused allreduce in the optimizer.
+        self._dp_enable = not self._use_dp_mode and self._need_dp
+
         self._sharding_enable = (
             self._hcg.get_sharding_parallel_world_size() > 1)
 
@@ -105,20 +122,20 @@ class HybridParallelOptimizer:
                       ClipGradByGlobalNorm) and not self._use_dp_mode:
             logger.warning("using ClipGradByGlobalNorm in TensorParallel, the origin " \
                 "optmizer'grad clip will be changed.")
+
             self._inner_opt._grad_clip = HybridParallelClipGrad(
                 self._inner_opt._grad_clip, hcg)
 
     @imperative_base.no_grad
     @framework.dygraph_only
     def step(self):
-
+        parameters_list = _obtain_optimizer_parameters_list(self._inner_opt)
         if self._sharding_enable:
-            sharding_reduce_gradients(
-                list(self._inner_opt._parameter_list), self._hcg)
+            sharding_reduce_gradients(list(parameters_list), self._hcg)
+
+        if self._dp_enable:
+            fused_allreduce_gradients(list(parameters_list), self._hcg)
 
-        if not self._use_dp_mode and self._need_dp:
-            fused_allreduce_gradients(
-                list(self._inner_opt._parameter_list), self._hcg)
         self._inner_opt.step()
 
     @imperative_base.no_grad
@@ -128,16 +145,18 @@ class HybridParallelOptimizer:
                  parameters=None,
                  no_grad_set=None):
 
+        # minimize does not support parameters in the form of param_group,
+        # so there is no need to use _obtain_optimizer_parameters_list
         parameter_list = parameters if parameters \
             else self._inner_opt._parameter_list
 
-        # Here shardinng should use global parameter list
+        # Here sharding should use the global parameter list
         if self._sharding_enable:
-            sharding_reduce_gradients(
-                list(self._inner_opt._parameter_list), self._hcg)
+            sharding_reduce_gradients(list(parameter_list), self._hcg)
 
-        if not self._use_dp_mode and self._need_dp:
+        if self._dp_enable:
             fused_allreduce_gradients(list(parameter_list), self._hcg)
+
         return self._inner_opt.minimize(loss, startup_program, parameter_list,
                                         no_grad_set)
 
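
To illustrate the two optimizer layouts the new _obtain_optimizer_parameters_list helper normalizes, here is a hedged standalone sketch; the Linear layer is just a placeholder and flatten_params is a hypothetical restatement of the helper's logic, not part of this patch:

    import paddle

    model = paddle.nn.Linear(10, 10)  # placeholder model

    # Flat layout: the optimizer keeps Parameters directly in _parameter_list.
    opt_flat = paddle.optimizer.SGD(learning_rate=0.1,
                                    parameters=model.parameters())

    # Param-group layout: the optimizer keeps dicts with per-group options in
    # _param_groups, as exercised by the updated hybrid_parallel_mp_amp test.
    opt_groups = paddle.optimizer.SGD(learning_rate=0.1,
                                      parameters=[{
                                          'params': model.parameters(),
                                          'weight_decay': 0.001,
                                          'learning_rate': 0.1
                                      }])


    def flatten_params(optimizer):
        # Hypothetical mirror of _obtain_optimizer_parameters_list: flatten the
        # param groups when present, otherwise use the plain parameter list.
        groups = getattr(optimizer, '_param_groups', None)
        if groups and isinstance(groups[0], dict):
            return [p for g in groups for p in g['params']]
        return list(optimizer._parameter_list)


    # Both layouts yield the same flat set of parameters (weight and bias).
    assert len(flatten_params(opt_flat)) == len(flatten_params(opt_groups))
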
diff --git a/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py b/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
index ddd4b6c6bb685737f66bcd8f6aea4e330475f90b..8fad0686dd42e601f90ce57ca263265b8382e251 100755
--- a/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
+++ b/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
@@ -80,9 +80,7 @@ class PipelineParallel(MetaParallelBase):
     def train_batch(self, data, optimizer, lr_scheduler=None, scaler=None):
         assert isinstance(optimizer, HybridParallelOptimizer), (
             'optimizer should be HybridParallelOptimizer subclass.')
-        if scaler is not None:
-            assert isinstance(scaler, HybridParallelGradScaler), (
-                'scaler should be HybridParallelGradScaler subclass or None.')
+
         assert fluid.framework._dygraph_tracer()._has_grad, (
             'Please enable the generation of gradients.')
 
@@ -212,7 +210,12 @@ class PipelineParallel(MetaParallelBase):
         if not last_iter:
             input_tensor = p2p.recv_forward()
 
-        return self.total_loss if self._compute_loss else output_buffers
+        if self._compute_loss:
+            self.train_loss = self._broadcast_final_loss()
+        else:
+            self.train_loss = output_buffers
+
+        return self.train_loss
 
     def _forward_step(self, input_tensor):
         if self.stage_id == 0:
@@ -325,7 +328,7 @@ class PipelineParallel(MetaParallelBase):
 
     def _optimizer_step(self):
         if self.scaler:
-            self.scaler.minimize(self.optimizer, self.train_loss)
+            self.scaler.step(self.optimizer)
         else:
             self.optimizer.step()
 
diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_amp.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_amp.py
index 248c271eec6a1bd139e0727b7a824aaa2f4269bf..083ad319305f347848364f443bfeddfac9c56db6 100644
--- a/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_amp.py
+++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_amp.py
@@ -29,7 +29,11 @@ class TestMPClipGrad(TestDistMPTraning):
             learning_rate=0.001, gamma=0.999, verbose=True)
         optimizer = paddle.optimizer.SGD(scheduler,
                                          grad_clip=grad_clip,
-                                         parameters=model.parameters())
+                                         parameters=[{
+                                             'params': model.parameters(),
+                                             'weight_decay': 0.001,
+                                             'learning_rate': 0.1
+                                         }])
         return optimizer
 
     def train_batch(self, batch, model, optimizer, is_mp):
@@ -43,7 +47,7 @@ class TestMPClipGrad(TestDistMPTraning):
 
         scaled = scaler.scale(loss)  # scale the loss
         scaled.backward()  # do backward
-        scaler.minimize(optimizer, scaled)  # update parameters
+        scaler.step(optimizer)  # update parameters
         optimizer.clear_grad()
         return scaled
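
Finally, a hedged sketch of how the pipeline path exercises the same scaler: train_batch receives the scaler and _optimizer_step now calls scaler.step(optimizer). The hybrid_configs values, build_pipeline_model, and data_loader below are assumptions for illustration, not part of this patch:

    import paddle
    import paddle.distributed.fleet as fleet

    strategy = fleet.DistributedStrategy()
    strategy.hybrid_configs = {"dp_degree": 1, "mp_degree": 1, "pp_degree": 2}
    fleet.init(is_collective=True, strategy=strategy)

    model = build_pipeline_model()  # placeholder: returns a PipelineLayer model
    optimizer = paddle.optimizer.SGD(learning_rate=0.001,
                                     parameters=model.parameters())

    model = fleet.distributed_model(model)        # wrapped as PipelineParallel
    optimizer = fleet.distributed_optimizer(optimizer)
    scaler = fleet.distributed_scaler(
        paddle.amp.GradScaler(init_loss_scaling=2**16))

    for batch in data_loader():  # placeholder data source
        # train_batch runs the pipeline schedule, broadcasts the final loss and
        # calls scaler.step(optimizer) in _optimizer_step when a scaler is passed.
        loss = model.train_batch(batch, optimizer, scaler=scaler)
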