未验证 提交 f74ebd8a 编写于 作者: H Haohongxiang 提交者: GitHub

optimize performance of offload in dygraph sharding stage2 (#38064)

* update

* fix bugs

* modify code style

* fix bugs of _get_global_group
上级 61ef56a1
......@@ -127,8 +127,8 @@ def _get_group_map():
global _group_map
if not _group_map:
genv = _get_global_env()
_group_map[0] = Group(genv.rank, genv.world_size,
list(range(genv.world_size)))
_group_map[0] = Group(
genv.rank, genv.world_size, ranks=list(range(genv.world_size)))
return _group_map
......
......@@ -30,11 +30,11 @@ from paddle.optimizer import Optimizer
from paddle.fluid.clip import ClipGradByGlobalNorm
from paddle.distributed.collective import _get_global_group
from ...utils.internal_storage import ParamStorage
from ...utils.internal_storage import ParamStorage, GradStorage
from ...meta_parallel.sharding.sharding_utils import Type, device_guard, ShardingClipGrad
# CUDA alignment 256 bytes
alignment = {"gpu": 256, }
# CUDA alignment 256 bytes, cpu alignment 4096 bytes
alignment = {"gpu": 256, "cpu": 4096}
align = {
Type.fp16.value: 2,
Type.fp32.value: 4,
......@@ -95,10 +95,11 @@ class ShardingOptimizerStage2(Optimizer):
filter(lambda x: x.trainable and x.dtype == Type.fp16.value,
self._local_params))) > 0
self.group = group
group = _get_global_group() if group is None else group
self.world_size = group.nranks
self.rank = group.rank
self.group = dist.new_group(_get_global_group()
.ranks) if group is None else group
self.world_size = self.group.nranks
self.rank = self.group.rank
self.broadcast_fp16 = broadcast_fp16
self.param_storages = {} # {dtype: {rank: InternalStorage}}
......@@ -108,14 +109,18 @@ class ShardingOptimizerStage2(Optimizer):
"While using ClipGradByGlobalNorm in ShardingOptimizer, the grad clip of original optimizer will be changed."
)
self._optim._grad_clip = ShardingClipGrad(self._optim._grad_clip,
group,
paddle.get_device())
paddle.get_device(),
self.group)
if offload:
assert self._pfp16, "Only support offload strategy while using \'Adam\', \'AdamW\' and \'Momentum\' optimizer with AMP/Pure FP16"
self.offload = offload # Using for offload
self.offload_device = "cpu"
self.offload_buffer_size = 0
self.offload_param2align = {}
self.offload_params = None
self.offload_grads = None
self._master_params = {}
......@@ -131,7 +136,6 @@ class ShardingOptimizerStage2(Optimizer):
value=param.cast(dtype=Type.fp32.value).numpy(),
place=core.CPUPlace(),
stop_gradient=param.stop_gradient)
self._optim._master_weights = self._master_params
else:
for param in trainable_params:
if param.dtype == Type.fp16.value:
......@@ -265,13 +269,73 @@ class ShardingOptimizerStage2(Optimizer):
for d in dtype_to_pop:
self.param_storages.pop(d)
if self.offload:
self._optim._master_weights = self._master_params
cpu_master_params = [p for p in self._master_params.values()]
for param in cpu_master_params:
size = np.prod(param.shape) * align[Type.fp32.value]
remaining = size % alignment[self.offload_device]
ali = 0 if remaining == 0 else alignment[
self.offload_device] - remaining
align_ = ali // align[Type.fp32.value]
self.offload_buffer_size += np.prod(param.shape) + align_
self.offload_param2align[param.name] = align_
if cpu_master_params:
with device_guard(self.rank, self.offload_device):
self.offload_params = ParamStorage(
size=self.offload_buffer_size,
dtype=Type.fp32.value,
device=self.offload_device)
self.offload_params.add_rank_params(
cpu_master_params, self.offload_param2align, False)
self.offload_params.buffer.stop_gradient = False
self.offload_grads = GradStorage(
size=self.offload_buffer_size,
dtype=Type.fp32.value,
device=self.offload_device,
destination=self.rank,
parm2align=self.offload_param2align,
convert_cpu=True)
for p in cpu_master_params:
self.offload_grads.add_grad(
p, self.offload_param2align[p.name])
self._optim._master_weights[
self.offload_params.buffer.
name] = self.offload_params.buffer
def _offload_acc_grad(self, param_name, grad_fp32_cpu):
"""accumulate grads with offload strategy"""
with device_guard(self.rank, self.offload_device):
if param_name in self._master_params.keys():
if self._master_params[param_name].grad is None:
self._master_params[param_name]._copy_gradient_from(
grad_fp32_cpu)
else:
self._master_params[param_name].grad.add_(grad_fp32_cpu)
self.offload_params.buffer._copy_gradient_from(
self.offload_grads.buffer)
def _offload_scale_grad(self, scale_size):
"""scale grads with offload strategy"""
with device_guard(self.rank, self.offload_device):
self.offload_grads.buffer.scale_(scale=scale_size)
def _offload_clear_grad(self):
"""clear grads with offload strategy"""
with device_guard(self.rank, self.offload_device):
self.offload_grads.buffer.zero_()
def step(self):
"""
A wrapper for Optimizer's step function to finish the update operation of the optimizer.
"""
if self.offload:
params_list = list(self._master_params.values())
params_list = [self.offload_params.buffer]
else:
# Synchronize optimizer parameters for the current rank
params_list = []
......@@ -296,14 +360,11 @@ class ShardingOptimizerStage2(Optimizer):
with device_guard(device=self.offload_device):
self._optim.step()
dev_id = 0 if paddle.get_device() == "cpu" else int(
paddle.get_device().split(":")[1])
dev_id = int(paddle.get_device().split(":")[1])
for param in self._local_params:
if param.name in self._master_params.keys():
param.set_value(self._master_params[param.name].cuda(dev_id)
.cast(dtype=param.dtype))
self._master_params[param.name].clear_gradient(False)
else:
self._optim.step()
......
......@@ -51,7 +51,7 @@ class ShardingStage2(nn.Layer):
# Feature Notes::
# 1. Unified memory for param and param.grad to InternalStorage.
# 2. Divide param.grad according to rank to centrally apply for and release GPU memory.
# 3. Dynamically adjust training parameters and models
# 3. Dynamically adjust training parameters and models.
# 4. Support offload function.
# 5. Support the establishment of independent communication groups.
......@@ -85,11 +85,11 @@ class ShardingStage2(nn.Layer):
self._accumulate_grads = accumulate_grads
# Communication related attributes
self._group = group
group = _get_global_group() if group is None else group
self._world_size_scaling = 1.0 / group.nranks
assert group.nranks > 1, "Training must be distributed, ranks must be greater than 1"
self._rank = group.rank
self._group = dist.new_group(_get_global_group()
.ranks) if group is None else group
self._world_size_scaling = 1.0 / self._group.nranks
assert self._group.nranks > 1, "Training must be distributed, ranks must be greater than 1"
self._rank = self._group.rank
self._global_root_rank = 0 # picking rank 0 as the reference
self._default_device = device
......@@ -173,37 +173,40 @@ class ShardingStage2(nn.Layer):
"""
# Release grad storages
for dtype in self._grad_storages.keys():
if self._rank in self._grad_storages[dtype].keys():
if not self._offload:
self._grad_storages[dtype][self._rank].buffer.zero_()
if not self._offload and self._rank in self._grad_storages[
dtype].keys():
self._grad_storages[dtype][self._rank].buffer.zero_()
# Release params
# Release grads of params
for param in self._trainable_params:
if param.name in self._param_grads and param.grad is not None:
param.clear_gradient()
# Release grads of master params with offload strategy
if self._offload:
self._sharding_optimizers[0]._offload_clear_grad()
def _grad_scale(self):
"""
Before the gradient accumulation, scale the gradient.
"""
# Scale grad storages
for dtype in self._grad_storages.keys():
if not self._offload and self._rank in self._grad_storages[
dtype].keys():
self._grad_storages[dtype][self._rank].buffer.scale_(
scale=self._world_size_scaling)
# Scale grads of params
for param in self._trainable_params:
if param.name in self._param_grads and param.grad is not None:
param.grad.scale_(scale=self._world_size_scaling)
param._reset_grad_inplace_version(True)
# Scale grads of master params with offload strategy
if self._offload:
for param in self._trainable_params:
if param.name in self._sharding_optimizers[
0]._master_params.keys():
self._sharding_optimizers[0]._master_params[
param.name].grad.scale_(scale=self._world_size_scaling)
else:
# Scale grad storages
for dtype in self._grad_storages.keys():
if self._rank in self._grad_storages[dtype].keys():
self._grad_storages[dtype][self._rank].buffer.scale_(
scale=self._world_size_scaling)
# Scale params
for param in self._trainable_params:
if param.name in self._param_grads and param.grad is not None:
param.grad.scale_(scale=self._world_size_scaling)
param._reset_grad_inplace_version(True)
self._sharding_optimizers[0]._offload_scale_grad(
self._world_size_scaling)
def _init_internal_storage(self, needs_fresh):
"""
......@@ -319,9 +322,9 @@ class ShardingStage2(nn.Layer):
if dst_rank != self._rank:
param.clear_gradient(False)
elif self._offload:
self._sharding_optimizers[0]._master_params[
param.name]._copy_gradient_from(param.grad.cpu(
).cast(dtype=Type.fp32.value))
self._sharding_optimizers[0]._offload_acc_grad(
param.name,
param.grad.cast(dtype=Type.fp32.value).cpu())
param.clear_gradient(False)
# Synchronize the reduce parameter gradient
......@@ -375,11 +378,14 @@ class ShardingStage2(nn.Layer):
)
elif self._offload:
grad_storage.to(device=self._offload_device)
for param in grad_storage._params:
self._sharding_optimizers[0]._master_params[
param.name]._copy_gradient_from(
param.grad.cast(
dtype=Type.fp32.value))
for p in grad_storage._params:
self._sharding_optimizers[
0]._offload_acc_grad(
p.name,
p.grad.cast(dtype=Type.fp32.value))
p.clear_gradient(False)
p._gradient_set_empty(False)
grad_storage._device = self._default_device
grad_storage.buffer.value().get_tensor()._clear(
)
......
......@@ -28,6 +28,7 @@ from paddle.fluid import layers
from paddle.fluid.dygraph import to_variable
from paddle.fluid.framework import dygraph_only
from paddle.fluid.dygraph import base as imperative_base
from paddle.distributed.collective import _get_global_group
class Taskflow:
......@@ -49,10 +50,10 @@ class Type(Enum):
class ShardingClipGrad:
def __init__(self, clip, group, device):
def __init__(self, clip, device, group):
self._clip = clip
self._group = group
self._device = device
self._group = group
@imperative_base.no_grad
def _dygraph_clip(self, params_grads):
......@@ -144,7 +145,7 @@ def device_guard(dev_id=0, device="cpu"):
@dygraph_only
def ShardingScaler(scaler, sharding_group):
def ShardingScaler(scaler):
def unscale_method(self, optimizer):
if not self._enable:
return
......@@ -152,10 +153,10 @@ def ShardingScaler(scaler, sharding_group):
param_grads_fp16 = []
param_grads_fp32 = []
if getattr(optimizer, '_param_groups', None) and isinstance(
optimizer._param_groups[0], dict):
if getattr(optimizer._optim, '_param_groups', None) and isinstance(
optimizer._optim._param_groups[0], dict):
for group in optimizer._param_groups:
for group in optimizer._optim._param_groups:
for param in group['params']:
if param._grad_ivar() is not None:
param_grads.append(param._grad_ivar())
......@@ -166,31 +167,37 @@ def ShardingScaler(scaler, sharding_group):
param_grads_fp32.append(param._grad_ivar())
else:
param_grads = [
param._grad_ivar() for param in optimizer._parameter_list
param._grad_ivar() for param in optimizer._optim._parameter_list
if param._grad_ivar() is not None
]
param_grads_fp16 = [
param._grad_ivar() for param in optimizer._parameter_list
param._grad_ivar() for param in optimizer._optim._parameter_list
if (param._grad_ivar() is not None
) and (param._grad_ivar().dtype == core.VarDesc.VarType.FP16
)
]
param_grads_fp32 = [
param._grad_ivar() for param in optimizer._parameter_list
param._grad_ivar() for param in optimizer._optim._parameter_list
if (param._grad_ivar() is not None
) and (param._grad_ivar().dtype == core.VarDesc.VarType.FP32
)
]
temp_found_inf_fp16 = to_variable(np.array([0]).astype(np.bool))
temp_found_inf_fp32 = to_variable(np.array([0]).astype(np.bool))
if len(param_grads_fp16):
_C_ops.check_finite_and_unscale(param_grads_fp16, self._scale,
param_grads_fp16,
temp_found_inf_fp16)
if len(param_grads_fp32):
_C_ops.check_finite_and_unscale(param_grads_fp32, self._scale,
param_grads_fp32,
temp_found_inf_fp32)
device = "cpu" if optimizer.offload else "gpu"
dev_id = 0 if device == "cpu" else int(paddle.get_device().split(":")[
1])
with device_guard(dev_id, device):
if len(param_grads_fp16):
_C_ops.check_finite_and_unscale(param_grads_fp16, self._scale,
param_grads_fp16,
temp_found_inf_fp16)
if len(param_grads_fp32):
_C_ops.check_finite_and_unscale(param_grads_fp32, self._scale,
param_grads_fp32,
temp_found_inf_fp32)
self._found_inf = 1 if temp_found_inf_fp16 or temp_found_inf_fp32 else 0
is_found_inf = paddle.to_tensor([self._found_inf], dtype="int32")
......@@ -198,7 +205,7 @@ def ShardingScaler(scaler, sharding_group):
paddle.distributed.all_reduce(
is_found_inf,
op=paddle.distributed.ReduceOp.MAX,
group=sharding_group)
group=optimizer.group)
self._found_inf = is_found_inf.numpy()[0]
scaler._unscale = MethodType(unscale_method, scaler)
......
......@@ -69,9 +69,11 @@ class InternalStorage:
param._gradient_set_empty(False)
self.buffer.value().get_tensor()._clear()
self.buffer = tmp_buffer
self._device = device
if dtype is not None:
self.buffer = self.buffer.cast(dtype=dtype)
self._dtype = dtype
class ParamStorage(InternalStorage):
......@@ -94,7 +96,7 @@ class ParamStorage(InternalStorage):
self._array_params()
@fluid.dygraph.no_grad
def add_rank_params(self, trainable_params, param2align):
def add_rank_params(self, trainable_params, param2align, convert_gpu=True):
"""
Add new parameters to the InternalStorage. Params becomes a view of this InternalStorage buffer.
"""
......@@ -108,12 +110,15 @@ class ParamStorage(InternalStorage):
cpu_param_shape = list()
for param in trainable_params:
p_shape = self._add_param_as_view(param, param2align[param.name])
p_shape = self._add_param_as_view(param, param2align[param.name],
convert_gpu)
cpu_param_shape.append(p_shape)
# buffer convert from cpu to cuda
dev_id = int(paddle.get_device().split(":")[1])
self.buffer = self.buffer.cuda(dev_id)
if convert_gpu:
# buffer convert from cpu to cuda
dev_id = int(paddle.get_device().split(":")[1])
self.buffer = self.buffer.cuda(dev_id)
self._fill = 0
for idx, param in enumerate(trainable_params):
......@@ -123,7 +128,7 @@ class ParamStorage(InternalStorage):
self._param_ids.append(id(param))
@fluid.dygraph.no_grad
def _add_param_as_view(self, param, align):
def _add_param_as_view(self, param, align, convert_gpu=True):
assert (
param.dtype == self.buffer.dtype
......@@ -147,9 +152,12 @@ class ParamStorage(InternalStorage):
with device_guard(dev_id, "cpu"):
tmp_var = core.VarBase(tensor=self.buffer._slice(self._fill,
var_end))
param_cpu = param.cpu()
param.value().get_tensor()._clear()
tmp_var.set_value(param_cpu)
if convert_gpu:
param_cpu = param.cpu()
param.value().get_tensor()._clear()
tmp_var.set_value(param_cpu)
else:
tmp_var.set_value(param)
self._fill = offset
return p_shape
......@@ -186,10 +194,16 @@ class GradStorage(InternalStorage):
This is a basic class to simplify the handling of gradient InternalStorages
"""
def __init__(self, size, dtype, device, destination, parm2align):
def __init__(self,
size,
dtype,
device,
destination,
parm2align,
convert_cpu=False):
if isinstance(size, np.int64):
size = size.tolist()
super().__init__(size, dtype, device)
super().__init__(size, dtype, device, convert_cpu)
self._max_size = size
self._release = False
......
......@@ -40,24 +40,16 @@ paddle.seed(seed)
def train_mlp(model, offload=False):
group = paddle.distributed.new_group([0, 1])
optimizer = optimizer_setting(model=model, use_pure_fp16=True)
model = paddle.amp.decorate(models=model, level='O2', save_dtype='float32')
scaler = paddle.amp.GradScaler(init_loss_scaling=32768)
scaler = ShardingScaler(scaler, group)
scaler = ShardingScaler(scaler)
optimizer = ShardingOptimizerStage2(
params=model.parameters(),
optim=optimizer,
group=group,
offload=offload)
params=model.parameters(), optim=optimizer, offload=offload)
model = ShardingStage2(
model,
optimizer,
group=group,
buffer_max_size=2**21,
accumulate_grads=True)
model, optimizer, buffer_max_size=2**21, accumulate_grads=True)
train_reader = paddle.batch(
reader_decorator(linear_size), batch_size=batch_size, drop_last=True)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册