diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py
index b9e5789581ae53836f410069961a646813f4b017..14a411ae253566b3f6f12a2082900e070977c5e9 100644
--- a/python/paddle/distributed/collective.py
+++ b/python/paddle/distributed/collective.py
@@ -127,8 +127,8 @@ def _get_group_map():
     global _group_map
     if not _group_map:
         genv = _get_global_env()
-        _group_map[0] = Group(genv.rank, genv.world_size,
-                              list(range(genv.world_size)))
+        _group_map[0] = Group(
+            genv.rank, genv.world_size, ranks=list(range(genv.world_size)))
     return _group_map
diff --git a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py
index 663b2293b4515491d70ca442e0d68ec0ca306db8..4ec43aa1e05ed448f50096ae840ff03775be4013 100644
--- a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py
@@ -30,11 +30,11 @@ from paddle.optimizer import Optimizer
 from paddle.fluid.clip import ClipGradByGlobalNorm
 from paddle.distributed.collective import _get_global_group
 
-from ...utils.internal_storage import ParamStorage
+from ...utils.internal_storage import ParamStorage, GradStorage
 from ...meta_parallel.sharding.sharding_utils import Type, device_guard, ShardingClipGrad
 
-# CUDA alignment 256 bytes
-alignment = {"gpu": 256, }
+# CUDA alignment 256 bytes, cpu alignment 4096 bytes
+alignment = {"gpu": 256, "cpu": 4096}
 align = {
     Type.fp16.value: 2,
     Type.fp32.value: 4,
@@ -95,10 +95,11 @@ class ShardingOptimizerStage2(Optimizer):
                 filter(lambda x: x.trainable and x.dtype == Type.fp16.value,
                        self._local_params))) > 0
 
-        self.group = group
-        group = _get_global_group() if group is None else group
-        self.world_size = group.nranks
-        self.rank = group.rank
+        self.group = dist.new_group(_get_global_group()
+                                    .ranks) if group is None else group
+
+        self.world_size = self.group.nranks
+        self.rank = self.group.rank
 
         self.broadcast_fp16 = broadcast_fp16
         self.param_storages = {}  # {dtype: {rank: InternalStorage}}
@@ -108,14 +109,18 @@ class ShardingOptimizerStage2(Optimizer):
                 "While using ClipGradByGlobalNorm in ShardingOptimizer, the grad clip of original optimizer will be changed."
             )
             self._optim._grad_clip = ShardingClipGrad(self._optim._grad_clip,
-                                                      group,
-                                                      paddle.get_device())
+                                                      paddle.get_device(),
+                                                      self.group)
 
         if offload:
             assert self._pfp16, "Only support offload strategy while using \'Adam\', \'AdamW\' and \'Momentum\' optimizer with AMP/Pure FP16"
 
         self.offload = offload  # Using for offload
         self.offload_device = "cpu"
+        self.offload_buffer_size = 0
+        self.offload_param2align = {}
+        self.offload_params = None
+        self.offload_grads = None
 
         self._master_params = {}
@@ -131,7 +136,6 @@ class ShardingOptimizerStage2(Optimizer):
                         value=param.cast(dtype=Type.fp32.value).numpy(),
                         place=core.CPUPlace(),
                         stop_gradient=param.stop_gradient)
-            self._optim._master_weights = self._master_params
         else:
             for param in trainable_params:
                 if param.dtype == Type.fp16.value:
@@ -265,13 +269,73 @@ class ShardingOptimizerStage2(Optimizer):
         for d in dtype_to_pop:
             self.param_storages.pop(d)
 
+        if self.offload:
+            self._optim._master_weights = self._master_params
+            cpu_master_params = [p for p in self._master_params.values()]
+            for param in cpu_master_params:
+                size = np.prod(param.shape) * align[Type.fp32.value]
+                remaining = size % alignment[self.offload_device]
+                ali = 0 if remaining == 0 else alignment[
+                    self.offload_device] - remaining
+                align_ = ali // align[Type.fp32.value]
+                self.offload_buffer_size += np.prod(param.shape) + align_
+                self.offload_param2align[param.name] = align_
+
+            if cpu_master_params:
+                with device_guard(self.rank, self.offload_device):
+                    self.offload_params = ParamStorage(
+                        size=self.offload_buffer_size,
+                        dtype=Type.fp32.value,
+                        device=self.offload_device)
+                    self.offload_params.add_rank_params(
+                        cpu_master_params, self.offload_param2align, False)
+                    self.offload_params.buffer.stop_gradient = False
+
+                    self.offload_grads = GradStorage(
+                        size=self.offload_buffer_size,
+                        dtype=Type.fp32.value,
+                        device=self.offload_device,
+                        destination=self.rank,
+                        parm2align=self.offload_param2align,
+                        convert_cpu=True)
+                    for p in cpu_master_params:
+                        self.offload_grads.add_grad(
+                            p, self.offload_param2align[p.name])
+
+                    self._optim._master_weights[
+                        self.offload_params.buffer.
+                        name] = self.offload_params.buffer
+
+    def _offload_acc_grad(self, param_name, grad_fp32_cpu):
+        """accumulate grads with offload strategy"""
+        with device_guard(self.rank, self.offload_device):
+            if param_name in self._master_params.keys():
+                if self._master_params[param_name].grad is None:
+                    self._master_params[param_name]._copy_gradient_from(
+                        grad_fp32_cpu)
+                else:
+                    self._master_params[param_name].grad.add_(grad_fp32_cpu)
+
+            self.offload_params.buffer._copy_gradient_from(
+                self.offload_grads.buffer)
+
+    def _offload_scale_grad(self, scale_size):
+        """scale grads with offload strategy"""
+        with device_guard(self.rank, self.offload_device):
+            self.offload_grads.buffer.scale_(scale=scale_size)
+
+    def _offload_clear_grad(self):
+        """clear grads with offload strategy"""
+        with device_guard(self.rank, self.offload_device):
+            self.offload_grads.buffer.zero_()
+
     def step(self):
         """
         A wrapper for Optimizer's step function to finish the update operation of the optimizer.
""" if self.offload: - params_list = list(self._master_params.values()) + params_list = [self.offload_params.buffer] else: # Synchronize optimizer parameters for the current rank params_list = [] @@ -296,14 +360,11 @@ class ShardingOptimizerStage2(Optimizer): with device_guard(device=self.offload_device): self._optim.step() - dev_id = 0 if paddle.get_device() == "cpu" else int( - paddle.get_device().split(":")[1]) - + dev_id = int(paddle.get_device().split(":")[1]) for param in self._local_params: if param.name in self._master_params.keys(): param.set_value(self._master_params[param.name].cuda(dev_id) .cast(dtype=param.dtype)) - self._master_params[param.name].clear_gradient(False) else: self._optim.step() diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py index 1a381385a893515a46968d588bca8acfc2f349ed..d884c416fa92c270c9c5c0961a7fb1214f93000e 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py @@ -51,7 +51,7 @@ class ShardingStage2(nn.Layer): # Feature Notes:: # 1. Unified memory for param and param.grad to InternalStorage. # 2. Divide param.grad according to rank to centrally apply for and release GPU memory. - # 3. Dynamically adjust training parameters and models。 + # 3. Dynamically adjust training parameters and models. # 4. Support offload function. # 5. Support the establishment of independent communication groups. @@ -85,11 +85,11 @@ class ShardingStage2(nn.Layer): self._accumulate_grads = accumulate_grads # Communication related attributes - self._group = group - group = _get_global_group() if group is None else group - self._world_size_scaling = 1.0 / group.nranks - assert group.nranks > 1, "Training must be distributed, ranks must be greater than 1" - self._rank = group.rank + self._group = dist.new_group(_get_global_group() + .ranks) if group is None else group + self._world_size_scaling = 1.0 / self._group.nranks + assert self._group.nranks > 1, "Training must be distributed, ranks must be greater than 1" + self._rank = self._group.rank self._global_root_rank = 0 # picking rank 0 as the reference self._default_device = device @@ -173,37 +173,40 @@ class ShardingStage2(nn.Layer): """ # Release grad storages for dtype in self._grad_storages.keys(): - if self._rank in self._grad_storages[dtype].keys(): - if not self._offload: - self._grad_storages[dtype][self._rank].buffer.zero_() + if not self._offload and self._rank in self._grad_storages[ + dtype].keys(): + self._grad_storages[dtype][self._rank].buffer.zero_() - # Release params + # Release grads of params for param in self._trainable_params: if param.name in self._param_grads and param.grad is not None: param.clear_gradient() + # Release grads of master params with offload strategy + if self._offload: + self._sharding_optimizers[0]._offload_clear_grad() + def _grad_scale(self): """ Before the gradient accumulation, scale the gradient. 
""" + # Scale grad storages + for dtype in self._grad_storages.keys(): + if not self._offload and self._rank in self._grad_storages[ + dtype].keys(): + self._grad_storages[dtype][self._rank].buffer.scale_( + scale=self._world_size_scaling) + + # Scale grads of params + for param in self._trainable_params: + if param.name in self._param_grads and param.grad is not None: + param.grad.scale_(scale=self._world_size_scaling) + param._reset_grad_inplace_version(True) + + # Scale grads of master params with offload strategy if self._offload: - for param in self._trainable_params: - if param.name in self._sharding_optimizers[ - 0]._master_params.keys(): - self._sharding_optimizers[0]._master_params[ - param.name].grad.scale_(scale=self._world_size_scaling) - else: - # Scale grad storages - for dtype in self._grad_storages.keys(): - if self._rank in self._grad_storages[dtype].keys(): - self._grad_storages[dtype][self._rank].buffer.scale_( - scale=self._world_size_scaling) - - # Scale params - for param in self._trainable_params: - if param.name in self._param_grads and param.grad is not None: - param.grad.scale_(scale=self._world_size_scaling) - param._reset_grad_inplace_version(True) + self._sharding_optimizers[0]._offload_scale_grad( + self._world_size_scaling) def _init_internal_storage(self, needs_fresh): """ @@ -319,9 +322,9 @@ class ShardingStage2(nn.Layer): if dst_rank != self._rank: param.clear_gradient(False) elif self._offload: - self._sharding_optimizers[0]._master_params[ - param.name]._copy_gradient_from(param.grad.cpu( - ).cast(dtype=Type.fp32.value)) + self._sharding_optimizers[0]._offload_acc_grad( + param.name, + param.grad.cast(dtype=Type.fp32.value).cpu()) param.clear_gradient(False) # Synchronize the reduce parameter gradient @@ -375,11 +378,14 @@ class ShardingStage2(nn.Layer): ) elif self._offload: grad_storage.to(device=self._offload_device) - for param in grad_storage._params: - self._sharding_optimizers[0]._master_params[ - param.name]._copy_gradient_from( - param.grad.cast( - dtype=Type.fp32.value)) + for p in grad_storage._params: + self._sharding_optimizers[ + 0]._offload_acc_grad( + p.name, + p.grad.cast(dtype=Type.fp32.value)) + p.clear_gradient(False) + p._gradient_set_empty(False) + grad_storage._device = self._default_device grad_storage.buffer.value().get_tensor()._clear( ) diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_utils.py b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_utils.py index b080035a116cfd561acd9e6924843ea95a721c31..272aada576be8a5182187fb6fe2e80bc6ac757bb 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_utils.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_utils.py @@ -28,6 +28,7 @@ from paddle.fluid import layers from paddle.fluid.dygraph import to_variable from paddle.fluid.framework import dygraph_only from paddle.fluid.dygraph import base as imperative_base +from paddle.distributed.collective import _get_global_group class Taskflow: @@ -49,10 +50,10 @@ class Type(Enum): class ShardingClipGrad: - def __init__(self, clip, group, device): + def __init__(self, clip, device, group): self._clip = clip - self._group = group self._device = device + self._group = group @imperative_base.no_grad def _dygraph_clip(self, params_grads): @@ -144,7 +145,7 @@ def device_guard(dev_id=0, device="cpu"): @dygraph_only -def ShardingScaler(scaler, sharding_group): +def ShardingScaler(scaler): def unscale_method(self, optimizer): if not self._enable: return @@ -152,10 
+153,10 @@ def ShardingScaler(scaler, sharding_group): param_grads_fp16 = [] param_grads_fp32 = [] - if getattr(optimizer, '_param_groups', None) and isinstance( - optimizer._param_groups[0], dict): + if getattr(optimizer._optim, '_param_groups', None) and isinstance( + optimizer._optim._param_groups[0], dict): - for group in optimizer._param_groups: + for group in optimizer._optim._param_groups: for param in group['params']: if param._grad_ivar() is not None: param_grads.append(param._grad_ivar()) @@ -166,31 +167,37 @@ def ShardingScaler(scaler, sharding_group): param_grads_fp32.append(param._grad_ivar()) else: param_grads = [ - param._grad_ivar() for param in optimizer._parameter_list + param._grad_ivar() for param in optimizer._optim._parameter_list if param._grad_ivar() is not None ] param_grads_fp16 = [ - param._grad_ivar() for param in optimizer._parameter_list + param._grad_ivar() for param in optimizer._optim._parameter_list if (param._grad_ivar() is not None ) and (param._grad_ivar().dtype == core.VarDesc.VarType.FP16 ) ] param_grads_fp32 = [ - param._grad_ivar() for param in optimizer._parameter_list + param._grad_ivar() for param in optimizer._optim._parameter_list if (param._grad_ivar() is not None ) and (param._grad_ivar().dtype == core.VarDesc.VarType.FP32 ) ] temp_found_inf_fp16 = to_variable(np.array([0]).astype(np.bool)) temp_found_inf_fp32 = to_variable(np.array([0]).astype(np.bool)) - if len(param_grads_fp16): - _C_ops.check_finite_and_unscale(param_grads_fp16, self._scale, - param_grads_fp16, - temp_found_inf_fp16) - if len(param_grads_fp32): - _C_ops.check_finite_and_unscale(param_grads_fp32, self._scale, - param_grads_fp32, - temp_found_inf_fp32) + + device = "cpu" if optimizer.offload else "gpu" + dev_id = 0 if device == "cpu" else int(paddle.get_device().split(":")[ + 1]) + + with device_guard(dev_id, device): + if len(param_grads_fp16): + _C_ops.check_finite_and_unscale(param_grads_fp16, self._scale, + param_grads_fp16, + temp_found_inf_fp16) + if len(param_grads_fp32): + _C_ops.check_finite_and_unscale(param_grads_fp32, self._scale, + param_grads_fp32, + temp_found_inf_fp32) self._found_inf = 1 if temp_found_inf_fp16 or temp_found_inf_fp32 else 0 is_found_inf = paddle.to_tensor([self._found_inf], dtype="int32") @@ -198,7 +205,7 @@ def ShardingScaler(scaler, sharding_group): paddle.distributed.all_reduce( is_found_inf, op=paddle.distributed.ReduceOp.MAX, - group=sharding_group) + group=optimizer.group) self._found_inf = is_found_inf.numpy()[0] scaler._unscale = MethodType(unscale_method, scaler) diff --git a/python/paddle/distributed/fleet/utils/internal_storage.py b/python/paddle/distributed/fleet/utils/internal_storage.py index f44b57ede468b27379e194087c02d61c2a731aef..469da22366748d7eca6520d9c8cf931d41d5c79c 100644 --- a/python/paddle/distributed/fleet/utils/internal_storage.py +++ b/python/paddle/distributed/fleet/utils/internal_storage.py @@ -69,9 +69,11 @@ class InternalStorage: param._gradient_set_empty(False) self.buffer.value().get_tensor()._clear() self.buffer = tmp_buffer + self._device = device if dtype is not None: self.buffer = self.buffer.cast(dtype=dtype) + self._dtype = dtype class ParamStorage(InternalStorage): @@ -94,7 +96,7 @@ class ParamStorage(InternalStorage): self._array_params() @fluid.dygraph.no_grad - def add_rank_params(self, trainable_params, param2align): + def add_rank_params(self, trainable_params, param2align, convert_gpu=True): """ Add new parameters to the InternalStorage. Params becomes a view of this InternalStorage buffer. 
""" @@ -108,12 +110,15 @@ class ParamStorage(InternalStorage): cpu_param_shape = list() for param in trainable_params: - p_shape = self._add_param_as_view(param, param2align[param.name]) + p_shape = self._add_param_as_view(param, param2align[param.name], + convert_gpu) cpu_param_shape.append(p_shape) - # buffer convert from cpu to cuda - dev_id = int(paddle.get_device().split(":")[1]) - self.buffer = self.buffer.cuda(dev_id) + if convert_gpu: + # buffer convert from cpu to cuda + dev_id = int(paddle.get_device().split(":")[1]) + self.buffer = self.buffer.cuda(dev_id) + self._fill = 0 for idx, param in enumerate(trainable_params): @@ -123,7 +128,7 @@ class ParamStorage(InternalStorage): self._param_ids.append(id(param)) @fluid.dygraph.no_grad - def _add_param_as_view(self, param, align): + def _add_param_as_view(self, param, align, convert_gpu=True): assert ( param.dtype == self.buffer.dtype @@ -147,9 +152,12 @@ class ParamStorage(InternalStorage): with device_guard(dev_id, "cpu"): tmp_var = core.VarBase(tensor=self.buffer._slice(self._fill, var_end)) - param_cpu = param.cpu() - param.value().get_tensor()._clear() - tmp_var.set_value(param_cpu) + if convert_gpu: + param_cpu = param.cpu() + param.value().get_tensor()._clear() + tmp_var.set_value(param_cpu) + else: + tmp_var.set_value(param) self._fill = offset return p_shape @@ -186,10 +194,16 @@ class GradStorage(InternalStorage): This is a basic class to simplify the handling of gradient InternalStorages """ - def __init__(self, size, dtype, device, destination, parm2align): + def __init__(self, + size, + dtype, + device, + destination, + parm2align, + convert_cpu=False): if isinstance(size, np.int64): size = size.tolist() - super().__init__(size, dtype, device) + super().__init__(size, dtype, device, convert_cpu) self._max_size = size self._release = False diff --git a/python/paddle/fluid/tests/unittests/dygraph_sharding_stage2_offload.py b/python/paddle/fluid/tests/unittests/dygraph_sharding_stage2_offload.py index 37537019c0aece53b3070a350eee47cff6ce07b2..f7e426377382bb089d9a4c4f968759f38c40e647 100644 --- a/python/paddle/fluid/tests/unittests/dygraph_sharding_stage2_offload.py +++ b/python/paddle/fluid/tests/unittests/dygraph_sharding_stage2_offload.py @@ -40,24 +40,16 @@ paddle.seed(seed) def train_mlp(model, offload=False): - group = paddle.distributed.new_group([0, 1]) optimizer = optimizer_setting(model=model, use_pure_fp16=True) model = paddle.amp.decorate(models=model, level='O2', save_dtype='float32') scaler = paddle.amp.GradScaler(init_loss_scaling=32768) - scaler = ShardingScaler(scaler, group) + scaler = ShardingScaler(scaler) optimizer = ShardingOptimizerStage2( - params=model.parameters(), - optim=optimizer, - group=group, - offload=offload) + params=model.parameters(), optim=optimizer, offload=offload) model = ShardingStage2( - model, - optimizer, - group=group, - buffer_max_size=2**21, - accumulate_grads=True) + model, optimizer, buffer_max_size=2**21, accumulate_grads=True) train_reader = paddle.batch( reader_decorator(linear_size), batch_size=batch_size, drop_last=True)