Unverified commit 2bb44317, authored by ShenLiang, committed by GitHub

[HybridParallel]Fix scaler bug in pipeline_parallel/model_parallel (#35556)

* support grad group

* fix single card condition
Parent: 48ec02f1
@@ -17,6 +17,7 @@ import copy
 import warnings
 import paddle
 import os
+from types import MethodType
 import numpy as np
 from paddle.fluid.framework import dygraph_only, _global_flags
 from paddle.fluid import compiler
@@ -33,7 +34,7 @@ from .topology import ParallelMode
 from ..meta_parallel import TensorParallel, model_parallel_random_seed
 from ..meta_parallel import PipelineParallel, ShardingParallel
 from ..meta_optimizers import HybridParallelOptimizer
-from ..meta_optimizers import HybridParallelGradScaler
+from paddle import _C_ops
 
 __all__ = []
@@ -1540,4 +1541,36 @@ class Fleet(object):
 
     @dygraph_only
     def distributed_scaler(self, scaler):
-        return HybridParallelGradScaler(scaler, self._hcg)
+        def unscale_method(self, optimizer):
+            if not self._enable:
+                return
+            if getattr(optimizer, '_param_groups', None) and isinstance(
+                    optimizer._param_groups[0], dict):
+                param_grads = []
+                for group in optimizer._param_groups:
+                    for param in group['params']:
+                        if param._grad_ivar() is not None:
+                            param_grads.append(param._grad_ivar())
+            else:
+                param_grads = [
+                    param._grad_ivar() for param in optimizer._parameter_list
+                    if param._grad_ivar() is not None
+                ]
+            _C_ops.check_finite_and_unscale(param_grads, self._scale,
+                                            param_grads, self._found_inf)
+
+            self._found_inf = paddle.cast(self._found_inf, dtype="int32")
+
+            # TODO(shenliang03): Since dp allreduce in the optimizer is
+            # after the gradscaler, check_finite needs to synchronize global
+            # information. In the future, we should use check_group to speed this up.
+            paddle.distributed.all_reduce(
+                self._found_inf, op=paddle.distributed.ReduceOp.MAX, group=None)
+            self._found_inf = paddle.cast(self._found_inf, dtype="bool")
+
+        # Only tensor_parallel and pipeline_parallel need to modify the scaler
+        if self._hcg.get_parallel_mode() in (ParallelMode.TENSOR_PARALLEL,
+                                             ParallelMode.PIPELINE_PARALLEL):
+            scaler._unscale = MethodType(unscale_method, scaler)
+
+        return scaler
@@ -29,6 +29,19 @@ from paddle.fluid import layers
 
 __all__ = []
 
+
+def _obtain_optimizer_parameters_list(optimizer):
+    if getattr(optimizer, '_param_groups', None) and isinstance(
+            optimizer._param_groups[0], dict):
+        parameters_list = []
+        for group in optimizer._param_groups:
+            for param in group['params']:
+                parameters_list.append(param)
+    else:
+        parameters_list = [param for param in optimizer._parameter_list]
+
+    return parameters_list
+
+
 class HybridParallelClipGrad:
     def __init__(self, clip, hcg):
         self._clip = clip
@@ -98,6 +111,10 @@ class HybridParallelOptimizer:
         self._need_dp = (self._hcg.get_data_parallel_world_size() > 1)
 
+        # NOTE(shenliang03): In pure DataParallel mode, gradient synchronization
+        # is handled by the reducer, so there is no need to call fuse_allreduce in the optimizer.
+        self._dp_enable = not self._use_dp_mode and self._need_dp
+
         self._sharding_enable = (
             self._hcg.get_sharding_parallel_world_size() > 1)
@@ -105,20 +122,20 @@ class HybridParallelOptimizer:
                 ClipGradByGlobalNorm) and not self._use_dp_mode:
             logger.warning("using ClipGradByGlobalNorm in TensorParallel, the original " \
                 "optimizer's grad clip will be changed.")
             self._inner_opt._grad_clip = HybridParallelClipGrad(
                 self._inner_opt._grad_clip, hcg)
 
     @imperative_base.no_grad
     @framework.dygraph_only
     def step(self):
+        parameters_list = _obtain_optimizer_parameters_list(self._inner_opt)
         if self._sharding_enable:
-            sharding_reduce_gradients(
-                list(self._inner_opt._parameter_list), self._hcg)
+            sharding_reduce_gradients(list(parameters_list), self._hcg)
 
-        if not self._use_dp_mode and self._need_dp:
-            fused_allreduce_gradients(
-                list(self._inner_opt._parameter_list), self._hcg)
+        if self._dp_enable:
+            fused_allreduce_gradients(list(parameters_list), self._hcg)
 
         self._inner_opt.step()
 
     @imperative_base.no_grad
@@ -128,16 +145,18 @@ class HybridParallelOptimizer:
                  parameters=None,
                  no_grad_set=None):
+
+        # minimize does not support parameters in the form of param_group,
+        # so there is no need to use _obtain_optimizer_parameters_list here
         parameter_list = parameters if parameters \
             else self._inner_opt._parameter_list
 
-        # Here shardinng should use global parameter list
+        # Here sharding should use global parameter list
         if self._sharding_enable:
-            sharding_reduce_gradients(
-                list(self._inner_opt._parameter_list), self._hcg)
+            sharding_reduce_gradients(list(parameter_list), self._hcg)
 
-        if not self._use_dp_mode and self._need_dp:
+        if self._dp_enable:
             fused_allreduce_gradients(list(parameter_list), self._hcg)
 
         return self._inner_opt.minimize(loss, startup_program, parameter_list,
                                         no_grad_set)
......
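The helper `_obtain_optimizer_parameters_list` added above exists because an optimizer constructed with parameter groups stores a list of dicts in `_param_groups`, while a plainly constructed optimizer exposes a flat `_parameter_list`. A standalone sketch with toy stand-ins (not real Paddle optimizers) showing the two shapes the helper flattens:

# Toy stand-ins for the two optimizer layouts; the attribute names mirror the
# private fields read by the helper in this commit.
class FlatOpt:
    _parameter_list = ["w0", "w1"]

class GroupedOpt:
    _param_groups = [
        {"params": ["w0", "w1"], "learning_rate": 0.1},
        {"params": ["w2"], "weight_decay": 0.001},
    ]

def obtain_parameters_list(optimizer):
    # same branching logic as the helper added in this commit
    if getattr(optimizer, '_param_groups', None) and isinstance(
            optimizer._param_groups[0], dict):
        return [p for group in optimizer._param_groups for p in group['params']]
    return list(optimizer._parameter_list)

print(obtain_parameters_list(FlatOpt()))     # ['w0', 'w1']
print(obtain_parameters_list(GroupedOpt()))  # ['w0', 'w1', 'w2']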
@@ -80,9 +80,7 @@ class PipelineParallel(MetaParallelBase):
     def train_batch(self, data, optimizer, lr_scheduler=None, scaler=None):
         assert isinstance(optimizer, HybridParallelOptimizer), (
             'optimizer should be HybridParallelOptimizer subclass.')
-        if scaler is not None:
-            assert isinstance(scaler, HybridParallelGradScaler), (
-                'scaler should be HybridParallelGradScaler subclass or None.')
+
         assert fluid.framework._dygraph_tracer()._has_grad, (
             'Please enable the generation of gradients.')
@@ -212,7 +210,12 @@ class PipelineParallel(MetaParallelBase):
         if not last_iter:
             input_tensor = p2p.recv_forward()
 
-        return self.total_loss if self._compute_loss else output_buffers
+        if self._compute_loss:
+            self.train_loss = self._broadcast_final_loss()
+        else:
+            self.train_loss = output_buffers
+
+        return self.train_loss
 
     def _forward_step(self, input_tensor):
         if self.stage_id == 0:
@@ -325,7 +328,7 @@ class PipelineParallel(MetaParallelBase):
     def _optimizer_step(self):
         if self.scaler:
-            self.scaler.minimize(self.optimizer, self.train_loss)
+            self.scaler.step(self.optimizer)
         else:
             self.optimizer.step()
......
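In the pipeline changes above, `train_batch` now returns `self._broadcast_final_loss()` instead of the local `total_loss`, since only the last pipeline stage actually computes the loss. A rough sketch of that idea (a hypothetical helper for illustration, not the actual `_broadcast_final_loss` implementation):

import paddle
import paddle.distributed as dist

def broadcast_final_loss(local_loss, last_stage_rank, is_last_stage):
    # Non-last stages pass a placeholder tensor of the agreed shape/dtype;
    # after the broadcast, every stage holds the loss computed on the last stage.
    buf = local_loss if is_last_stage else paddle.zeros([1], dtype="float32")
    dist.broadcast(buf, src=last_stage_rank)
    return buf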
@@ -29,7 +29,11 @@ class TestMPClipGrad(TestDistMPTraning):
             learning_rate=0.001, gamma=0.999, verbose=True)
         optimizer = paddle.optimizer.SGD(scheduler,
                                          grad_clip=grad_clip,
-                                         parameters=model.parameters())
+                                         parameters=[{
+                                             'params': model.parameters(),
+                                             'weight_decay': 0.001,
+                                             'learning_rate': 0.1
+                                         }])
         return optimizer
 
     def train_batch(self, batch, model, optimizer, is_mp):
@@ -43,7 +47,7 @@ class TestMPClipGrad(TestDistMPTraning):
         scaled = scaler.scale(loss)  # scale the loss
         scaled.backward()  # do backward
 
-        scaler.minimize(optimizer, scaled)  # update parameters
+        scaler.step(optimizer)  # update parameters
         optimizer.clear_grad()
         return scaled
......
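Taken together with the test change above, a typical AMP training step under hybrid parallel now looks roughly like the following (illustrative sketch; `model`, `inputs`, `optimizer`, and `scaler` are assumed to have already been set up via `fleet.distributed_model`, `fleet.distributed_optimizer`, and `fleet.distributed_scaler`):

with paddle.amp.auto_cast():
    loss = model(inputs).mean()  # placeholder forward pass
scaled = scaler.scale(loss)      # scale the loss
scaled.backward()                # backward on the scaled loss
scaler.step(optimizer)           # the patched _unscale is expected to run here
optimizer.clear_grad()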