Unverified commit f4cc5def, authored by B Baibaifan, committed by GitHub

sharding_for_eager_tensor (#41415)

Parent c4d5a77f
...
@@ -473,7 +473,7 @@ static PyObject* tensor__share_buffer_to(TensorObject* self, PyObject* args,
   }
   auto dst_tensor =
       static_cast<paddle::framework::Tensor*>(dst_ptr->impl().get());
-  dst_tensor->ShareDataWith(*src_tensor);
+  dst_tensor->ShareBufferWith(*src_tensor);
   dst_tensor->ShareDataTypeWith(*src_tensor);
   Py_INCREF(Py_None);
   return Py_None;
...
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Taken and modified from fairscale:
# https://github.com/facebookresearch/fairscale/blob/main/fairscale/optim/oss.py
# Commit: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e
import copy
import logging
import numpy as np
from collections import OrderedDict
import paddle
import paddle.fluid as fluid
from paddle.fluid import core
from paddle.optimizer import Optimizer
from paddle.fluid.clip import ClipGradByGlobalNorm
from paddle.distributed.collective import _get_global_group, new_group, broadcast, wait
from .group_sharded_storage import ParamStorage, GradStorage
from .group_sharded_utils import Type, device_guard, GroupShardedClipGrad
# CUDA alignment 256 bytes, cpu alignment 4096 bytes
alignment = {"gpu": 256, "cpu": 4096}
align = {
Type.fp16.value: 2,
Type.fp32.value: 4,
}
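# A worked example of the padding arithmetic used below (illustrative numbers):
# an fp16 param with 1000 elements occupies 1000 * 2 = 2000 bytes; on GPU the
# buffer is padded to the next multiple of 256 bytes (2048), i.e. 48 extra bytes,
# which corresponds to 48 // 2 = 24 extra fp16 elements recorded as that param's align.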
class GroupShardedOptimizerStage2(Optimizer):
"""
A wrapper for Sharding Stage2 Optimizer in Dygraph.
.. warning: GroupShardedOptimizerStage2 encapsulates the optimization strategy and integrates it into the optimizer.
.. ZeRO: https://arxiv.org/pdf/1910.02054.pdf.
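Example (a minimal usage sketch; the model and optimizer below are illustrative
placeholders and the script must be launched with more than one rank):
    import paddle
    from paddle.distributed import init_parallel_env
    init_parallel_env()
    model = paddle.nn.Linear(1024, 1024)
    optim = paddle.optimizer.AdamW(parameters=model.parameters(), learning_rate=0.001)
    optim = GroupShardedOptimizerStage2(params=model.parameters(), optim=optim)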
"""
# TODO (Baibaifan)
# Feature Notes:
# 1. Unified memory for parameters and parameters.grad to InternalStorage.
# 2. Support the segmentation of optimizer parameters and partial updating of parameters.
# 3. Dynamically adjust training parameters and models.
# 4. Support offload function.
# 5. Support the establishment of independent communication groups.
# 6. Broadcast_fp16 is not supported now.
def __init__(self,
params,
optim,
group=None,
offload=False,
device="gpu",
pertrain_sync_models=True,
**kw):
super().__init__(learning_rate=optim._learning_rate, parameters=params)
assert core.is_compiled_with_cuda(), "Only GPU is supported now"
# Segmentation information
self._dtype_rank_params = OrderedDict(
) # {dtype:[param1,param2]} device, rank, params
self._param2rank = {}
self.__segment_params = []
self._rank_buffer_size = {} # {dtype: {rank: numel+alignment}}
self._param2align = {} # {param.name: align}
# Default information
self._optim = optim
assert hasattr(self._optim, "_master_weights"
), "Must use optimizer with _master_weights attribute"
# Support parameter group and parameter list
self._local_params = []
if isinstance(params[0], dict):
for param_group in params:
self._local_params.extend(list(param_group["params"]))
else:
self._local_params.extend(list(params))
self._default_device = device
self._pfp16 = len(
list(
filter(lambda x: x.trainable and x.dtype == Type.fp16.value,
self._local_params))) > 0
self._group = new_group(_get_global_group()
.ranks) if group is None else group
self.world_size = self._group.nranks
self._rank = self._group.rank
self._global_root_rank = self._group.ranks[0]
# Synchronous all ranks models
if pertrain_sync_models:
self._sync_params_and_buffers()
self.param_storages = {} # {dtype: {rank: InternalStorage}}
if isinstance(self._optim._grad_clip, ClipGradByGlobalNorm):
logging.warning(
"While using ClipGradByGlobalNorm in GroupShardedOptimizerStage2, the grad clip of original optimizer will be changed."
)
self._optim._grad_clip = GroupShardedClipGrad(
self._optim._grad_clip, paddle.get_device(), self._group)
if self._optim._parameter_list and isinstance(
self._optim._parameter_list[0], dict):
for item in self._optim._param_groups:
if "grad_clip" in item.keys():
item["grad_clip"] = self._optim._grad_clip
if offload:
assert self._pfp16, "Only support offload strategy while using \'Adam\', \'AdamW\' and \'Momentum\' optimizer with AMP/Pure FP16"
self.offload = offload # Using for offload
self.offload_device = "cpu"
self.offload_buffer_size = 0
self.offload_param2align = {}
self.offload_params = None
self.offload_grads = None
self.dev_id = int(paddle.get_device().split(":")[1])
self._master_params = {}
# Update optimizer parameters and adjust parameter storage and usage according to rank.
self._update_opt_status()
@paddle.autograd.no_grad()
def _sync_params_and_buffers(self):
"""
Sync all model states for all ranks
"""
for p in self._local_params:
broadcast(
p,
src=self._global_root_rank,
group=self._group,
use_calc_stream=True)
def _generate_master_params(self, trainable_params):
if self.offload:
for param in trainable_params:
if param.name not in self._master_params.keys():
self._master_params[param.name] = core.eager.Tensor(
name=param.name,
value=param.cast(dtype=Type.fp32.value).numpy(),
place=core.CPUPlace(),
stop_gradient=param.stop_gradient)
else:
for param in trainable_params:
if param.dtype == Type.fp16.value:
master_tensor = paddle.cast(param, Type.fp32.value)
master_tensor.name = param.name
self._optim._master_weights[param.name] = master_tensor
def _update_opt_status(self):
"""Update optimizer status and parameter storage information, and special functions to be developed.
"""
# func 1
self._integration_params()
# Segment helpers
def _segment_params(self):
"""
Divide all optimizer parameters evenly across ranks.
"""
if len(self.__segment_params) == 0:
self.__segment_params, param_lists = [
[] for _ in range(self.world_size)
], [[] for _ in range(self.world_size)]
sizes = [0] * self.world_size
for param in self._local_params:
# Add this param to rank with smallest size.
rank = sizes.index(min(sizes))
param_lists[rank].append(param)
# Accumulate the real numel of trainable params
sizes[rank] += param._numel() if param.trainable else 0
for rank, params in enumerate(param_lists):
self.__segment_params[rank].extend(params)
return self.__segment_params
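# For example (illustrative numbers): with world_size=2 and trainable params of numel
# [30, 20, 10], the greedy loop above assigns the 30-element param to rank 0, the
# 20-element param to rank 1, and the 10-element param to rank 1 again (running sizes
# 30 vs 30), so the ranks end up owning roughly equal parameter volumes.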
@property
def local_params(self):
return self._local_params
@property
def param2rank(self):
"""Map the params to the rank which owns them"""
if len(self._param2rank) == 0:
for rank, params in enumerate(self._segment_params()):
for param in params:
self._param2rank[param.name] = rank
return self._param2rank
@property
def dtype_rank_params(self):
"""
Divide the parameters into groups according to rank and dtype.
"""
if len(self._dtype_rank_params) == 0:
# Assign the parameters of each rank according to the type
for param in self._local_params:
if param.dtype not in self._dtype_rank_params.keys():
self._dtype_rank_params[
param.dtype] = [[] for _ in range(self.world_size)]
self._dtype_rank_params[param.dtype][self.param2rank[
param.name]].append(param)
# Sort per rank params by size
for dtype in self._dtype_rank_params.keys():
for rank_params in self._dtype_rank_params[dtype]:
rank_params.sort(key=lambda x: x._numel())
return self._dtype_rank_params
@property
def rank_buffer_size(self):
"""
Count the memory size of the parameters corresponding to rank under the corresponding dtype.
"""
# CUDA alignment 256 bytes
if len(self._rank_buffer_size) == 0:
for dtype in self.dtype_rank_params.keys():
if dtype not in self._rank_buffer_size.keys():
self._rank_buffer_size[dtype] = {}
for dst_rank, per_rank_params in enumerate(
self.dtype_rank_params[dtype]):
if dst_rank not in self._rank_buffer_size[dtype].keys():
self._rank_buffer_size[dtype][dst_rank] = 0
for param in per_rank_params:
if not param.trainable:
continue
size = param._numel() * align[dtype]
remaining = size % alignment[self._default_device]
ali = 0 if remaining == 0 else alignment[
self._default_device] - remaining
align_ = ali // align[dtype]
self._rank_buffer_size[dtype][dst_rank] += param._numel(
) + align_
self._param2align[param.name] = align_
return self._rank_buffer_size
def _integration_params(self):
"""
Integrate the parameters into contiguous memory according to rank, and support the update of training parameters.
"""
for dtype, per_rank_params in self.dtype_rank_params.items():
if dtype not in self.param_storages.keys():
self.param_storages[dtype] = {}
for dst_rank, params in enumerate(per_rank_params):
if len(params) > 0:
# Merge all the trainable params in a single InternalStorage
trainable_params = list(
filter(lambda x: x.trainable, params))
if self._pfp16 and dst_rank == self._rank:
self._generate_master_params(trainable_params)
if trainable_params:
param_storage = ParamStorage(
size=self.rank_buffer_size[dtype][dst_rank],
dtype=dtype,
device=self._default_device)
param_storage.add_rank_params(trainable_params,
self._param2align)
self.param_storages[dtype][dst_rank] = param_storage
# Clear the InternalStorage keys which are not in use anymore
dtype_in_use = list(self.dtype_rank_params.keys())
dtype_to_pop = list(
filter(lambda x: x not in dtype_in_use, self.param_storages.keys()))
for d in dtype_to_pop:
self.param_storages.pop(d)
if self.offload:
self._optim._master_weights = self._master_params
cpu_master_params = [p for p in self._master_params.values()]
for param in cpu_master_params:
size = param._numel() * align[Type.fp32.value]
remaining = size % alignment[self.offload_device]
ali = 0 if remaining == 0 else alignment[
self.offload_device] - remaining
align_ = ali // align[Type.fp32.value]
self.offload_buffer_size += param._numel() + align_
self.offload_param2align[param.name] = align_
if cpu_master_params:
with device_guard(self._rank, self.offload_device):
self.offload_params = ParamStorage(
size=self.offload_buffer_size,
dtype=Type.fp32.value,
device=self.offload_device)
self.offload_params.buffer.name = "offload_buffer"
self.offload_params.add_rank_params(
cpu_master_params, self.offload_param2align, False)
self.offload_params.buffer.stop_gradient = False
self.offload_grads = GradStorage(
size=self.offload_buffer_size,
dtype=Type.fp32.value,
device=self.offload_device,
destination=self._rank,
parm2align=self.offload_param2align,
convert_cpu=True)
for p in cpu_master_params:
self.offload_grads.add_grad(
p, self.offload_param2align[p.name])
self._optim._master_weights[
self.offload_params.buffer.
name] = self.offload_params.buffer
def _offload_acc_grad(self, param_name, grad_fp32_cpu):
"""accumulate grads with offload strategy"""
with device_guard(self._rank, self.offload_device):
if param_name in self._master_params.keys():
if self._master_params[param_name].grad is None:
self._master_params[param_name]._copy_gradient_from(
grad_fp32_cpu)
else:
self._master_params[param_name].grad.add_(grad_fp32_cpu)
self.offload_params.buffer._copy_gradient_from(
self.offload_grads.buffer)
def _offload_scale_grad(self, scale_size):
"""scale grads with offload strategy"""
with device_guard(self._rank, self.offload_device):
self.offload_grads.buffer.scale_(scale=scale_size)
def _offload_clear_grad(self):
"""clear grads with offload strategy"""
with device_guard(self._rank, self.offload_device):
self.offload_grads.buffer.zero_()
def step(self):
"""
A wrapper for Optimizer's step function to finish the update operation of the optimizer.
"""
if self.offload:
params_list = [self.offload_params.buffer]
#TODO(Baibaifan): Offload will support param_groups later
if not isinstance(self._optim._param_groups[0], dict):
self._optim._parameter_list = params_list
self._optim._param_groups = params_list
# Run the optimizer step of the current rank
if self.offload:
with device_guard(device=self.offload_device):
self._optim.step()
for param in self._local_params:
if param.name in self._master_params.keys():
param.set_value(self._master_params[param.name].cuda(
self.dev_id).cast(dtype=param.dtype))
else:
self._optim.step()
# Synchronize all the updated shards in between the ranks
self._broadcast_params()
def minimize(self):
raise RuntimeError(
"optimizer.minimize() is not supported now, please use optimizer.step()")
def set_state_dict(self, state_dict):
self._optim.set_state_dict(state_dict)
def state_dict(self):
return self._optim.state_dict()
def _clear_cache(self):
self.__segment_params.clear()
self._dtype_rank_params.clear()
self._param2rank.clear()
@paddle.autograd.no_grad()
def _broadcast_params(self):
"""Broadcast the parameters of the current rank to each rank"""
# Exchange all the shards with the other ranks
for dtype_per_rank in self.param_storages.values():
for dst_rank, internal_storage in dtype_per_rank.items():
broadcast(
tensor=internal_storage.buffer,
src=self._group.ranks[dst_rank],
group=self._group,
use_calc_stream=True)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Taken and modified from fairscale:
# https://github.com/facebookresearch/fairscale/blob/main/fairscale/nn/data_parallel/sharded_ddp.py
# Commit: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e
import logging
import time
import functools
import numpy as np
from functools import reduce
from collections import deque
from types import MethodType
import paddle
from paddle import nn
from paddle.distributed import collective
from paddle.distributed.utils import get_logger
from .group_sharded_storage import GradStorage
from .group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2
from .group_sharded_utils import Taskflow, Type, device_guard
logger_ = get_logger(logging.INFO)
def _trainable(param):
return param.trainable
class GroupShardedStage2(nn.Layer):
"""
A wrapper for Sharding Stage2 Layer in Dygraph.
.. warning: GroupShardedStage2 encapsulates the layer strategy and integrates it into the nn.Layer.
.. ZeRO: https://arxiv.org/pdf/1910.02054.pdf.
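Example (a minimal usage sketch; model and optimizer names are illustrative placeholders,
and the script must be launched with more than one rank, e.g. via paddle.distributed.launch):
    import paddle
    from paddle.distributed import init_parallel_env
    init_parallel_env()
    model = paddle.nn.Linear(1024, 1024)
    optim = paddle.optimizer.AdamW(parameters=model.parameters(), learning_rate=0.001)
    optim = GroupShardedOptimizerStage2(params=model.parameters(), optim=optim)
    model = GroupShardedStage2(model, optim, buffer_max_size=2**21)
    loss = paddle.mean(model(paddle.rand([4, 1024])))
    loss.backward()
    optim.step()
    optim.clear_grad()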
"""
# TODO (Baibaifan)
# Feature Notes:
# 1. Unified memory for param and param.grad to InternalStorage.
# 2. Divide param.grad according to rank to centrally apply for and release GPU memory.
# 3. Dynamically adjust training parameters and models.
# 4. Support offload function.
# 5. Support the establishment of independent communication groups.
def __init__(
self,
layer,
sharding_optimizer,
group=None,
sync_buffers=False,
buffer_max_size=2**23, #8MB
auto_refresh_trainable=True,
device="gpu"):
super().__init__()
# training options
self._layer = layer
self._sharding_optimizers = [sharding_optimizer] if not isinstance(
sharding_optimizer, list) else sharding_optimizer
assert all(
list(
map(lambda opt: isinstance(opt, GroupShardedOptimizerStage2),
self._sharding_optimizers))
), "Please use GroupShardedOptimizerStage2 optimizer"
self._sync_buffers = sync_buffers
self._auto_refresh_trainable = auto_refresh_trainable
# Communication related attributes
self._group = collective.new_group(collective._get_global_group()
.ranks) if group is None else group
self._world_size_scaling = 1.0 / self._group.nranks
assert self._group.nranks > 1, "Training must be distributed, ranks must be greater than 1"
self._rank = self._group.rank
self._global_root_rank = self._group.ranks[
0] # picking ranks index 0 as the reference
self._default_device = device
# Global statistical parameters
self._all_params = []
for optim in self._sharding_optimizers:
self._all_params.extend(list(optim.local_params))
self._trainable_params = []
self._grad_reduced = []
self._trainable_param2rank = {}
self._trainable_param2align = {}
self._trainable_mask = list(map(_trainable, self._all_params))
self._param_grads = []
# Set grad storage size & Display param sizes and model sizes
model_size = sum([p._numel() for p in self._layer.parameters()])
assert buffer_max_size >= 0, "buffer_max_size must be >= 0."
self._buffer_max_size = self._rank_buffer_size(buffer_max_size,
model_size)
self._use_grad_storage = buffer_max_size > 0
self._grad_storages = {} # {dtype: {rank: GradStorage}}
self._has_grad_storage = []
self._grad_storage_list = []
# Offload
# TODO(haohongxiang): The offload strategy is not supported for multiple optimizers yet
self._offload_optims = list(
filter(lambda optim: optim.offload, self._sharding_optimizers))
if len(self._offload_optims) > 0:
assert len(
self._sharding_optimizers
) == 1, "Only support offload strategy for single optimizer"
self._offload = len(self._offload_optims) > 0
self._offload_device = "cpu"
# Set backward pass hooks
self._bw_hooks = []
# TODO (Baibaifan): Set task flow to support asynchronous communication
# self._tasks_flow = deque()
# Define optimizer step and clear_grad
self._redefine_opt_step()
self._redefine_opt_clear()
def forward(self, *inputs, **kwargs):
"""
A wrapper for Sharding Stage2 layer.
- Fresh trainable params or rebuild grad storage
- Sync layer's buffer params
- Clear all flags states
- Forward for origin layers
"""
# Whether trainable parameters need to be reset
needs_fresh = len(self._bw_hooks) == 0 and self.training
if self._auto_refresh_trainable:
needs_fresh |= self._detect_train_change()
# Front hook
self._init_internal_storage(needs_fresh)
# Sync layer's buffers state
if self._sync_buffers:
self.__sync_buffers()
# Normal FW on the base model
fw = self._layer(*inputs, **kwargs)
return fw
def set_state_dict(self, state_dict, use_structured_name=True):
self._layer.set_state_dict(
state_dict, use_structured_name=use_structured_name)
def state_dict(self,
destination=None,
include_sublayers=True,
structured_name_prefix=""):
return self._layer.state_dict(
destination=destination,
include_sublayers=include_sublayers,
structured_name_prefix=structured_name_prefix)
def _clear_gradients(self):
"""
Set the gradients of the current rank's trainable parameters to zero.
"""
# Release grad storages
for dtype in self._grad_storages.keys():
if not self._offload and self._rank in self._grad_storages[
dtype].keys():
self._grad_storages[dtype][self._rank].buffer.zero_()
# Release grads of params
for param in self._trainable_params:
if param.name in self._param_grads and param.grad is not None:
param._zero_grads()
# Release grads of master params with offload strategy
if self._offload:
self._sharding_optimizers[0]._offload_clear_grad()
def _grad_scale(self):
"""
Before the gradient accumulation, scale the gradient.
"""
# Scale grad storages
for dtype in self._grad_storages.keys():
if not self._offload and self._rank in self._grad_storages[
dtype].keys():
self._grad_storages[dtype][self._rank].buffer.scale_(
scale=self._world_size_scaling)
# Scale grads of params
for param in self._trainable_params:
if param.name in self._param_grads and param.grad is not None:
param.grad.scale_(scale=self._world_size_scaling)
# param._reset_grad_inplace_version(True)
# Scale grads of master params with offload strategy
if self._offload:
self._sharding_optimizers[0]._offload_scale_grad(
self._world_size_scaling)
def _init_internal_storage(self, needs_fresh):
"""
Decide whether to refresh trainable params or rebuild grad storages.
"""
if needs_fresh:
self._fresh_trainable()
else:
self._build_grad_storages()
# Clear all flags state
self._clear_counters()
def to(self, device=None, dtype=None, blocking=True):
"""
Synchronously or asynchronously convert the data type of the layer; changing the device is not supported now.
"""
assert isinstance(device, str), "Device must be type str"
assert device == self._default_device, "New devices are not supported, because the optimizer state is not synchronized"
self._layer.to(device=device, dtype=dtype, blocking=blocking)
# Re-build the buckets, hooks, etc..
self._fresh_trainable()
def _fresh_trainable(self):
""" Whether to update training parameters. """
# Make sure that this is not done while gradients are waiting to be reduced (if no_sync context for instance)
if reduce(lambda x, y: x or y, self._grad_reduced, False):
logging.warning("Grads waiting to be reduced.")
self._trainable_params = list(
filter(lambda x: x.trainable, self._all_params))
self._trainable_params.sort(key=lambda x: x._numel())
self._trainable_param2rank = {}
for optim in self._sharding_optimizers:
# Needs to be wrapped for the Sharding Stage2 Optimizer
if len(optim.param_storages.keys()) == 0:
optim._update_opt_status()
# Get the parameters split by the optimizer according to rank
for per_rank_params in optim.dtype_rank_params.values(
): # all the params from all ranks
for params in per_rank_params:
for param in filter(lambda x: x.trainable, params):
self._trainable_param2rank[
param.name] = optim.param2rank[param.name]
self._trainable_param2align[
param.name] = optim._param2align[param.name]
# Create grad_storage
self._setup_use_grad_storage()
# setup backward hooks
self._setup_backward_hooks()
@paddle.autograd.no_grad()
def __sync_buffers(self):
"""
Sync all the param buffers from all ranks (exp: batch norm statistics).
"""
for buffer in self._layer.buffers(include_sublayers=True):
collective.broadcast(
buffer,
self._global_root_rank,
self._group,
use_calc_stream=True)
def __getattr__(self, name):
"""Forward missing attributes to wrapped layer."""
try:
return super().__getattr__(name)
except AttributeError:
return getattr(self._layer, name)
@paddle.autograd.no_grad()
def _clear_counters(self):
"""Reset all the grad reduce and call counters."""
if self.training:
self._grad_reduced = [True for _ in self._trainable_params]
if self._use_grad_storage:
for grad_storage in self._grad_storage_list:
grad_storage.reset_checked_in()
def _get_reduce_fn(self, index, param, dst_rank):
"""
There are two ways to reduce the gradient.
- 1. Params that do not use grad storage (or that exceed buffer_max_size) are reduced separately.
- 2. Params placed in a grad_storage bucket are reduced together to gather the full gradient from different ranks.
"""
if not self._use_grad_storage or not self._has_grad_storage[index]:
# Direct reduction
@paddle.autograd.no_grad()
def reduce(*_):
# Skip gradient reduction, do not change status information
if self._grad_reduced[index]:
assert param.grad is not None, "Parameter gradient cannot be None"
# Change reduce information
self._grad_reduced[index] = False
# Clear the gradient that does not belong to the current rank through the callback function
def cleanup():
if dst_rank != self._rank:
param.clear_gradient(False)
elif self._offload:
tmp_grad = param.grad.cast(
dtype=Type.fp32.value).cpu()
self._sharding_optimizers[0]._offload_acc_grad(
param.name, tmp_grad)
del tmp_grad
param.clear_gradient(False)
# Synchronously reduce the parameter gradient
collective.reduce(
tensor=param.grad,
dst=self._group.ranks[dst_rank],
group=self._group)
# TODO (Baibaifan) Asynchronous the reduce parameter gradient
# Clear the task flow and trigger callback to clear the redundant gradient
# self._clear_task_flow()
cleanup()
else:
# Buffer reduction
@paddle.autograd.no_grad()
def reduce(*_):
# Skip gradient reduction, do not change status information
if self._grad_reduced[index]:
assert param.grad is not None, "Parameter gradient cannot be None"
# Change reduce information
self._grad_reduced[index] = False
grad_storage = self._grad_storages[param.dtype][dst_rank]
grad_storage.params_checked_in += 1
if grad_storage.all_checked_in:
assert grad_storage.buffer is not None
# Clearing up the grad_storage buffer
def cleanup():
if dst_rank != self._rank:
for p in grad_storage._params:
p.clear_gradient(False)
grad_storage.buffer._clear_data()
elif self._offload:
grad_storage.to(device=self._offload_device)
for p in grad_storage._params:
with device_guard():
tmp_grad = p.grad.cast(
dtype=Type.fp32.value)
self._sharding_optimizers[
0]._offload_acc_grad(p.name, tmp_grad)
p.clear_gradient(False)
grad_storage._device = self._default_device
grad_storage.buffer._clear_data()
# Reduce the bucket
grad_storage.sent = True
# Synchronously reduce the parameter gradient
collective.reduce(
tensor=grad_storage.buffer,
dst=self._group.ranks[grad_storage.destination],
group=self._group)
# TODO (Baibaifan) Asynchronous the reduce parameter gradient
cleanup()
# Clear the task flow and trigger callback to clear the redundant gradient
# self._clear_task_flow()
return reduce
def _setup_backward_hooks(self):
"""
Set backward hooks to synchronize the gradients of all ranks by reducing them within the group.
"""
# Remove previous backward hooks
while len(self._bw_hooks) > 0:
self._bw_hooks.pop().remove()
# Go through the parameters, attach the hook
if not self.training:
return
for index, param in enumerate(self._trainable_params):
dst_rank = self._trainable_param2rank[param.name]
reduce_function = self._get_reduce_fn(index, param, dst_rank)
self._bw_hooks.append(
param._register_backward_hook(reduce_function))
def _setup_use_grad_storage(self):
"""
Integrate the parameter gradients into contiguous memory according to rank, and support the update of training parameters.
"""
# Sort by each parameter's numel and allocate its gradient into the contiguous memory owned by the corresponding rank
self._grad_storages = {}
self._has_grad_storage = [False for _ in self._trainable_params]
for index, param in enumerate(self._trainable_params):
dst_rank = self._trainable_param2rank[param.name]
if param.dtype not in self._grad_storages.keys():
self._grad_storages[param.dtype] = {}
if dst_rank not in self._grad_storages[param.dtype].keys():
self._grad_storages[param.dtype][dst_rank] = GradStorage(
self._buffer_max_size[param.dtype],
dtype=param.dtype,
device=self._default_device,
destination=dst_rank,
parm2align=self._trainable_param2align)
# Criteria to decide whether this parameter is to be put in GradStorage
if self._grad_storages[param.dtype][dst_rank].can_add_grad_view(
param, self._trainable_param2align[param.name]):
self._grad_storages[param.dtype][dst_rank].add_grad(
param, self._trainable_param2align[param.name])
self._has_grad_storage[index] = True
else:
self._param_grads.append(param.name)
print(
"Can not add param: {}, param's shape: {}, param align: {}, grad_storages fill: {}, ".
format(param.name, param.shape, self._trainable_param2align[
param.name], self._grad_storages[param.dtype][dst_rank]
._fill))
for dtype in self._grad_storages.keys():
self._grad_storage_list.extend(
list(self._grad_storages[dtype].values()))
# def _clear_task_flow(self):
# """Try to consume the previous tasks."""
# while len(self._tasks_flow) > 0:
# task = self._tasks_flow.popleft()
# task.wait()
# if task.callback is not None:
# task.callback()
def _detect_train_change(self):
# Current trainable parameters
trainable_mask = list(map(_trainable, self._all_params))
# Whether parameters trainability changed
trainability_changed = trainable_mask != self._trainable_mask
if trainability_changed:
logging.warning(
"Trainable params changed, because of eval/train mode or parameter freezing/unfreeze."
)
self._trainable_mask = trainable_mask
return trainability_changed
def _build_grad_storages(self):
"""
Rebuild grad storages.
"""
# Rebuild fp16/fp32 grad storages
for dtype in self._grad_storages.keys():
for dst_rank, grad_storage in self._grad_storages[dtype].items():
if self._offload or dst_rank != self._rank:
grad_storage.manumal_relase()
grad_storage.rebuild()
def _rank_buffer_size(self, buffer_max_size, model_size):
"""
Generate the minimum buffer size for each rank & Display param sizes and model sizes.
"""
# Initialize buffer size
rank_buffer_size = {}
for shard_opt in self._sharding_optimizers:
if shard_opt.rank_buffer_size:
for dtype in shard_opt.rank_buffer_size.keys():
sizes = max(shard_opt.rank_buffer_size[dtype].values())
rank_buffer_size[dtype] = min(sizes, buffer_max_size)
if Type.fp16.value in rank_buffer_size.keys():
# FP16 GradStorage and model size
logger_.info(
"====== FP16 GradStorage size: {:.2f}M parameters, Model size {:.2f}M parameters ======".
format(rank_buffer_size[Type.fp16.value] / 2**19, model_size / 2
**19))
if Type.fp32.value in rank_buffer_size.keys():
# FP32 GradStorage and model size
logger_.info(
"====== FP32 GradStorage size: {:.2f}M parameters, Model size {:.2f}M parameters ======".
format(rank_buffer_size[Type.fp32.value] / 2**18, model_size / 2
**18))
return rank_buffer_size
def _redefine_opt_step(self):
grad_func = self._grad_scale
for opt in self._sharding_optimizers:
opt_step = opt.step
def _opt_step(self):
grad_func()
opt_step()
opt.step = MethodType(_opt_step, opt)
def _redefine_opt_clear(self):
clear_func = self._clear_gradients
def _opt_clear(self):
clear_func()
for opt in self._sharding_optimizers:
opt.clear_grad = MethodType(_opt_clear, opt)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import logging
import numpy as np
from types import MethodType
from collections import OrderedDict
import paddle
from paddle import nn
from paddle.autograd import EagerPyLayer
import paddle.fluid.core as core
import paddle.fluid.framework as framework
from paddle.fluid.framework import EagerParamBase
from paddle.fluid.clip import ClipGradByGlobalNorm
from paddle.distributed import collective
from .group_sharded_storage import GradStorage
from .group_sharded_utils import Type, GroupShardedClipGrad, device_guard
def _all_gather(tensor, buffer_size, group):
"""
The main difference with paddle.distributed.all_gather:
no need to pass in a tensor_list; the returned tensor is already concatenated
"""
assert group is not None
if framework.in_dygraph_mode():
out = paddle.zeros([buffer_size], dtype=tensor.dtype)
task = group.process_group.all_gather(tensor, out)
return out, task
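# Note: unlike paddle.distributed.all_gather, callers pass the padded per-param
# buffer_size (a multiple of the group size in this module) and receive a single flat
# `out` tensor plus the async `task`; `task.wait()` must be called before reading `out`.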
# CUDA alignment 256 bytes
alignment = {"gpu": 256, }
align = {
Type.fp16.value: 2,
Type.fp32.value: 4,
}
global CHECK_LAYER
CHECK_LAYER = dict() # Help to check layer's id -> layer's name
class GroupShardedStage3(nn.Layer):
"""
A wrapper for Sharding Stage3 Layer in Dygraph.
.. warning: GroupShardedStage3 encapsulates the layer strategy and integrates it into the nn.Layer.
.. ZeRO: https://arxiv.org/pdf/1910.02054.pdf.
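Example (a minimal usage sketch; names are illustrative placeholders, and the script
must be launched with more than one rank, e.g. via paddle.distributed.launch):
    import paddle
    from paddle.distributed import init_parallel_env
    init_parallel_env()
    model = paddle.nn.Linear(1024, 1024)
    optim = paddle.optimizer.AdamW(parameters=model.parameters(), learning_rate=0.001)
    model = GroupShardedStage3(model, optimizer=optim, segment_size=2**15)
    loss = paddle.mean(model(paddle.rand([4, 1024])))
    loss.backward()
    optim.step()
    optim.clear_grad()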
"""
# TODO (Baibaifan)
# Feature Notes:
# 1. The model supports the segmentation of parameters by global ranks in layers.
# 2. Support communication flow and computing flow.
# 3. Support offload function.
# 4. Support the establishment of independent communication groups.
def __init__(self,
layer,
optimizer,
group=None,
sync_buffers=False,
device="gpu",
segment_size=2**20,
pertrain_sync_models=True,
offload=False,
sync_comm=False):
super().__init__()
# Default configs
assert core.is_compiled_with_cuda(), "Only support CUDA."
self._layer = layer
self._default_device = device
self.__sync_buffers = sync_buffers
self._offload = offload
self._sync_comm = sync_comm
# segmentation size
assert segment_size >= 0, "segment_size must be >= 0."
self._segment_size = segment_size
global DEV
DEV = "cpu" if paddle.get_device() == "cpu" else paddle.get_device(
).split(":")[0]
global DEV_ID
DEV_ID = 0 if paddle.get_device() == "cpu" else int(paddle.get_device()
.split(":")[1])
global param2dtype
param2dtype = dict()
# Communication group establishment
self._group = collective.new_group(collective._get_global_group()
.ranks) if group is None else group
self._world_size_scaling = 1.0 / self._group.nranks
assert self._group.nranks > 1, "Training must be distributed, ranks must be greater than 1."
self._rank = self._group.rank
self._global_root_rank = self._group.ranks[
0] # picking ranks index 0 as the reference
# Parameter segmentation for global ranks
# After flatten -> self._param2buffer_size, self._param2buffer, self._trainable_params
self._param2buffer_size = dict() # {param.name: size}
self._param2buffer = dict(
) # {param.name: [(start0, end0),(start1, end1), ...]}
self._trainable_params = dict() # {id(layer): [trainable_params]}
self._unslice_params = set() # param's numel <= segment_size
self._unslice_params2align = dict() # {param.name: param's align}
self._grad_storages = dict() # {param.dtype: GradStorage}
assert not isinstance(
optimizer, list), "Multiple optimizers are not supported now."
self._optim = _OptimizerWrapper(optimizer, self._offload, self._group,
self._update_params_slice)
self._ori_parameter_list = self._optim._parameter_list
self._ori_param_groups = self._optim._param_groups
# Replace optimizer's _grad_clip
if isinstance(self._optim._grad_clip, ClipGradByGlobalNorm):
logging.warning(
"While using ClipGradByGlobalNorm in GroupShardedStage3, the grad clip of original optimizer will be changed."
)
self._optim._grad_clip = GroupShardedClipGrad(
self._optim._grad_clip, paddle.get_device(), self._group)
if self._optim._parameter_list and isinstance(
self._optim._parameter_list[0], dict):
for item in self._optim._param_groups:
if "grad_clip" in item.keys():
item["grad_clip"] = self._optim._grad_clip
# Synchronous all ranks models
if pertrain_sync_models:
self._sync_params_and_buffers()
self._segment_rank_params(self._layer)
# Add unslice params to master_weight in fp16
self._handle_unslice_params()
# In the first step, record the execution order of the layer
self._order_tracer = OrderedDict()
self._order_tracer["order"] = 0
self._order_tracer["layer"] = list()
# Register task flow
self._task_flow = TaskFlow()
# Register forward hooks
self._register_forward_hooks(self._layer)
# Register backward parameter hooks
self._register_backward_hooks()
# Redefine optimizer step and clear function
self._redefine_opt_step()
self._redefine_opt_clear()
@paddle.autograd.no_grad()
def _sync_params_and_buffers(self):
"""
Sync all model states for all ranks
"""
for p in self._layer.parameters():
collective.broadcast(
p,
src=self._global_root_rank,
group=self._group,
use_calc_stream=True)
def _clear_gradients(self):
assert len(self._trainable_params.keys()) > 0
current_layer_params = self._layer.parameters(include_sublayers=True)
# 1.Handle param's slice
trainable_params = list(
filter(lambda p: p.trainable and p not in self._unslice_params,
current_layer_params))
for param in trainable_params:
assert hasattr(
param, "fw_storage"
), "Find {} don't have fw_storage attribute.".format(param.name)
param.fw_storage.clear_gradient(False)
param.bw_storage._clear()
param.bw_storage = None
# 2.Handle unslice param
if not self._offload:
for grad_storage in self._grad_storages.values():
grad_storage.buffer.zero_()
else:
for param in list(self._unslice_params):
param.clear_gradient(False)
tmp_var = param.cuda(DEV_ID)
param._clear_data()
if tmp_var.dtype == Type.fp32.value and param2dtype[
param.name] == Type.fp16.value:
tmp_var = paddle.cast(tmp_var, Type.fp16.value)
tmp_var._share_buffer_to(param)
del tmp_var
for grad_storage in self._grad_storages.values():
grad_storage.manumal_relase()
grad_storage.rebuild()
# Update param memory slice
def _update_params_slice(self):
update_list = self._update_params()
if not isinstance(self._optim._param_groups[0], dict):
slice_params = [param.fw_storage for param in update_list]
self._optim._parameter_list = slice_params + list(
self._unslice_params)
self._optim._param_groups = slice_params + list(
self._unslice_params)
else:
for param_group in self._optim._param_groups:
p_group = []
for p in param_group['params']:
if hasattr(p, "fw_storage"):
p_group.append(p.fw_storage)
else:
p_group.append(p)
param_group['params'] = p_group
def forward(self, *inputs, **kwargs):
"""
A wrapper for Sharding Stage3 layer.
"""
# 1.Sync layer's buffers state
if self.__sync_buffers:
self._sync_buffers()
# 2.Normal FW on the base model
fw = self._layer(*inputs, **kwargs)
return fw
def set_state_dict(self, state_dict, use_structured_name=True):
self._layer.set_state_dict(
state_dict, use_structured_name=use_structured_name)
def state_dict(self,
destination=None,
include_sublayers=True,
structured_name_prefix=""):
return self._layer.state_dict(
destination=destination,
include_sublayers=include_sublayers,
structured_name_prefix=structured_name_prefix)
def _handle_unslice_params(self):
buffer_size = dict()
buffer_size[Type.fp32.value] = 0
buffer_size[Type.fp16.value] = 0
for param in self._unslice_params:
# Update optimizer master weights
if param.dtype == Type.fp16.value and not self._offload:
master_tensor = paddle.cast(param, Type.fp32.value)
master_tensor.name = param.name
self._optim._master_weights[param.name] = master_tensor
param2dtype[param.name] = param.dtype
p_align = self._param2align(param)
self._unslice_params2align[param.name] = p_align
buffer_size[param.dtype] += param._numel() + p_align
# Create unslice_params'grad
for param in sorted(list(self._unslice_params), key=lambda p: p.name):
if param.dtype not in self._grad_storages.keys():
self._grad_storages[param.dtype] = GradStorage(
buffer_size[param.dtype],
dtype=param.dtype,
device=self._default_device,
destination=self._rank,
parm2align=self._unslice_params2align)
self._grad_storages[param.dtype].add_grad(
param, self._unslice_params2align[param.name])
def _segment_rank_params(self, layer, name="last_layer"):
"""
Flatten parameters according to layer.
"""
current_layer_params = _current_layer_params(layer)
if current_layer_params:
CHECK_LAYER[id(layer)] = name
self._flatten_layer_params(layer, current_layer_params)
for name, sub_layer in layer.named_children():
self._segment_rank_params(sub_layer, name)
def _flatten_layer_params(self, layer, current_layer_params):
"""
Parameter segmentation and memory integration.
"""
def _add_manage_info(trainable_param):
return _PartitionParam(trainable_param)
current_params = list()
for p in current_layer_params:
if p.trainable and p._numel() > self._segment_size:
current_params.append(_add_manage_info(p))
elif p.trainable:
self._unslice_params.add(_UnsliceParam(p))
assert id(layer) not in self._trainable_params.keys()
self._trainable_params[id(layer)] = current_params
for param in self._trainable_params[id(layer)]:
if param.name in self._param2buffer.keys():
continue
self._param2buffer[param.name] = []
# 1.Params alignment
align_ = self._param2align(param)
offset = align_ + param._numel()
buffer_size = offset if offset % self._group.nranks == 0 else offset + self._group.nranks - (
offset % self._group.nranks)
self._param2buffer_size[param.name] = buffer_size
# 2.Combination param buffer
assert buffer_size % self._group.nranks == 0
pre_buffer = buffer_size // self._group.nranks
for rank_ in range(self._group.nranks):
self._param2buffer[param.name].append(
(rank_ * pre_buffer, (rank_ + 1) * pre_buffer))
# Record param's dtype
param2dtype[param.name] = param.dtype
# 3.Flatten layer params and release other rank buffer
self._param_storage(param, buffer_size)
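# Worked example of the padding above (illustrative numbers): with nranks=4, a param of
# numel 10 and align_=2 gives offset=12, already a multiple of 4, so buffer_size=12 and
# each rank owns a contiguous slice of 3 elements, e.g. rank 0 holds (0, 3) and
# rank 3 holds (9, 12).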
def _param_storage(self, param, buffer_size):
"""
This is a function to simplify the handling of parameter InternalStorages.
"""
assert isinstance(buffer_size, int)
value = np.zeros(
buffer_size,
dtype=np.float16) if Type.fp16.value == param.dtype else np.zeros(
buffer_size, dtype=np.float32)
buffer = core.eager.Tensor(value=value, place=core.CPUPlace())
param_shape = param.shape
origin_state = param.stop_gradient
param.stop_gradient = True
param.flatten_()
param.stop_gradient = origin_state
start, end = self._param2buffer[param.name][self._rank]
# Copy the current param value
with device_guard():
tmp_var = buffer._slice(0, param._numel())
param_cpu = param.cpu()
tmp_var.get_tensor().set(param_cpu.get_tensor(), core.CPUPlace())
del tmp_var
param.get_tensor()._set_dims(param_shape)
param._clear_data()
# Current rank param_storage
if self._offload:
with device_guard():
tmp_tensor = buffer._slice(start, end)
param.fw_storage = core.eager.Tensor(
value=tmp_tensor,
place=core.CPUPlace(),
name="slice@" + param.name)
else:
param.fw_storage = core.eager.Tensor(
value=buffer._slice(start, end), name="slice@" + param.name)
param.status = "part"
# Update optimizer master weights
if param.dtype == Type.fp16.value and not self._offload:
master_tensor = paddle.cast(param.fw_storage, Type.fp32.value)
master_tensor.name = param.name
self._optim._master_weights[param.fw_storage.name] = master_tensor
def _register_forward_hooks(self, layer):
"""
Register EagerPyLayer to manage memory slices.
There are four stages:
FW
1. Before the forward layers, synchronize the full parameters.
2. After the forward layers, release the full parameter and keep the parameter slice.
BW
3. Before the backward layers, synchronize the full parameters and create param's grad.
4. After the gradient accumulation, release the full parameter and keep the parameter slice.
"""
current_layer_params = _current_layer_params(layer)
if current_layer_params:
self._register_forward_all_hooks(layer, self._task_flow)
for _, sub_layer in layer.named_children():
self._register_forward_hooks(sub_layer)
def _register_forward_all_hooks(self, sub_layer, task_flow):
def _forward_pre_hook(layer, inputs):
return ForwardPreHooks(layer, self._order_tracer,
self._trainable_params,
self._param2buffer_size, self._group,
self._sync_comm, self._offload, task_flow)
def _forward_post_hook(layer, inputs, outputs):
return ForwardPostHooks.apply(
outputs, layer, self._order_tracer, self._trainable_params,
self._param2buffer, self._param2buffer_size, self._rank,
self._group, self._sync_comm, self._offload, task_flow)
# register forward pre hooks
sub_layer.register_forward_pre_hook(_forward_pre_hook)
# register post forward hooks
sub_layer.register_forward_post_hook(_forward_post_hook)
@paddle.autograd.no_grad()
def _sync_buffers(self):
"""
Sync all the param buffers from all ranks (exp: batch norm statistics).
"""
for buffer in self._layer.buffers(include_sublayers=True):
collective.broadcast(
buffer,
self._global_root_rank,
self._group,
use_calc_stream=True)
def __getattr__(self, name):
"""Forward missing attributes to wrapped layer."""
try:
return super().__getattr__(name)
except AttributeError:
return getattr(self._layer, name)
def _update_params(self):
"""
Update parameters to optimizer memory slice.
"""
update_list = []
assert len(self._trainable_params.keys()) > 0
current_layer_params = self._layer.parameters(include_sublayers=True)
trainable_params = list(
filter(lambda p: p.trainable and p not in self._unslice_params,
current_layer_params))
# 1.Handle param's slice
for param in trainable_params:
assert hasattr(
param,
"fw_storage"), "Find {} don't have fw_storage attribute".format(
param.name)
# Gradient average
if self._offload:
with device_guard():
param.bw_storage.scale_(scale=self._world_size_scaling)
else:
param.bw_storage.scale_(scale=self._world_size_scaling)
param.fw_storage = _VarBaseWrapper(param)
assert param.fw_storage.grad is None
param.fw_storage._copy_gradient_from(param.bw_storage)
update_list.append(param)
# 2.Handle unslice param
for grad_storage in self._grad_storages.values():
grad_storage.buffer.scale_(scale=self._world_size_scaling)
collective.all_reduce(tensor=grad_storage.buffer, group=self._group)
if self._offload:
for param in list(self._unslice_params):
tmp_var = _device2cpu(param, convert_dtype=True)
tmp_var._share_buffer_to(param)
del tmp_var
for grad_storage in self._grad_storages.values():
for p in grad_storage._params:
tmp_g = _device2cpu(p.grad, convert_dtype=True)
p.clear_gradient(False)
p._copy_gradient_from(tmp_g)
del tmp_g
grad_storage.buffer._clear()
return update_list
def get_all_parameters(self, convert2cpu=False):
"""
Get the full parameters and return the corresponding task flows.
"""
assert len(self._trainable_params.keys()) > 0
current_layer_params = self._layer.parameters(include_sublayers=True)
trainable_params = list(
filter(lambda p: p.trainable and p not in self._unslice_params,
current_layer_params))
t_flow = _allgather_buffer(
trainable_params,
self._group,
param2buffer_size=self._param2buffer_size,
use_calc_stream=True,
task_flow=TaskFlow(),
sync_wait=True,
offload=self._offload,
convert2cpu=convert2cpu)
if convert2cpu:
for param in trainable_params:
t_flow.full_param[param.name][0]._share_buffer_to(param)
self._optim._parameter_list = self._ori_parameter_list
self._optim._param_groups = self._ori_param_groups
def _register_backward_hooks(self):
current_layer_params = self._layer.parameters(include_sublayers=True)
trainable_params = list(
filter(lambda p: p.trainable and p not in self._unslice_params,
current_layer_params))
for param in trainable_params:
allreduce_function = self._get_allreduce_fn(param)
param._register_backward_hook(allreduce_function)
def _get_allreduce_fn(self, param):
@paddle.autograd.no_grad()
def allreduce_(*_):
if param.name in self._task_flow.full_grad.keys():
full_grad = self._task_flow.full_grad[param.name]
# Only synchronous allreduce of the current rank's layer is supported now
collective.all_reduce(tensor=full_grad, group=self._group)
start, end = self._param2buffer[param.name][self._rank]
if param.bw_storage is None:
param.bw_storage = full_grad._slice(start,
end).detach().clone()
if self._offload:
param.bw_storage = _device2cpu(param.bw_storage, True)
else:
if self._offload:
cpu_grad = _device2cpu(
full_grad._slice(start, end).detach().clone(), True)
with device_guard():
param.bw_storage = paddle.add(param.bw_storage,
cpu_grad)
else:
param.bw_storage = paddle.add(
param.bw_storage,
full_grad._slice(start, end).detach().clone())
param.clear_gradient(False)
del self._task_flow.full_grad[param.name]
if param.name in self._task_flow.full_param.keys():
if param.status == "all":
param.use_count = 0
param._clear_data()
start, end = self._param2buffer[param.name][self._rank]
param.fw_storage = self._task_flow.full_param[param.name][
0]._slice(start, end).detach().clone()
param.status = "part"
del self._task_flow.full_param[param.name]
if self._offload:
param.fw_storage = _device2cpu(param.fw_storage, True)
return allreduce_
def _param2align(self, param):
# CUDA alignment 256 bytes
size = param._numel() * align[param.dtype]
remaining = size % alignment[self._default_device]
ali = 0 if remaining == 0 else alignment[
self._default_device] - remaining
align_ = ali // align[param.dtype]
return align_
def _redefine_opt_step(self):
params_slice_func = self._update_params_slice
opt_step = self._optim.step
def _opt_step(self):
if not self.update_scaler:
params_slice_func()
if self.offload:
with device_guard():
opt_step()
else:
opt_step()
def _opt_minimize(self):
raise RuntimeError(
"optimizer.minimize() is not supported now, please use optimizer.step()"
)
self._optim.step = MethodType(_opt_step, self._optim)
self._optim.minimize = MethodType(_opt_minimize, self._optim)
def _redefine_opt_clear(self):
clear_func = self._clear_gradients
def _opt_clear(self):
clear_func()
self._optim.clear_grad = MethodType(_opt_clear, self._optim)
def ForwardPreHooks(layer, order_tracer, trainable_params, param2buffer_size,
group, sync_comm, offload, task_flow):
# Record layer's id
layer_id = id(layer)
use_calc, sync_wait = False, False
if layer_id not in order_tracer.keys() or sync_comm:
use_calc, sync_wait = True, True
# Whether to use calc stream
task_flow.use_calc[layer_id] = use_calc
else:
# Whether to use calc stream
task_flow.use_calc[layer_id] = use_calc
# wait current layer params
_wait_layer(trainable_params[layer_id], task_flow, group,
param2buffer_size, use_calc, offload)
if layer_id == order_tracer["layer"][-1]: return
order_ = order_tracer[layer_id]
layer_id = order_tracer["layer"][order_ + 1]
_allgather_buffer(
trainable_params[layer_id],
group,
param2buffer_size=param2buffer_size,
use_calc_stream=use_calc,
task_flow=task_flow,
sync_wait=sync_wait,
offload=offload)
return
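# In short: on the first pass (or when sync_comm is set) the pre-hook above gathers the
# current layer's full params synchronously; on later passes it waits for the already
# prefetched current layer and all-gathers the next layer recorded in order_tracer, so
# communication overlaps with the current layer's computation.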
class ForwardPostHooks(EagerPyLayer):
@staticmethod
def forward(ctx, inputs, layer, order_tracer, trainable_params,
param2buffer, param2buffer_size, rank, group, sync_comm,
offload, task_flow):
layer_id = id(layer)
# release current layer full params
_release_param(trainable_params[layer_id], param2buffer, rank,
task_flow, offload)
if layer_id not in order_tracer.keys():
order_ = order_tracer["order"]
order_tracer[layer_id] = order_
order_tracer["order"] += 1
order_tracer["layer"].append(layer_id)
# Record forward info
ctx.order_tracer = order_tracer
ctx.task_flow = task_flow
ctx.group = group
ctx.layer_id = layer_id
ctx.sync_comm = sync_comm
ctx.trainable_params = trainable_params
ctx.param2buffer_size = param2buffer_size
ctx.offload = offload
return inputs
@staticmethod
def backward(ctx, *args):
# Load context value
order_tracer = ctx.order_tracer
task_flow = ctx.task_flow
group = ctx.group
layer_id = ctx.layer_id
trainable_params = ctx.trainable_params
param2buffer_size = ctx.param2buffer_size
sync_comm = ctx.sync_comm
offload = ctx.offload
use_calc, sync_wait = False, False
# Allgather params synchronization
if sync_comm:
use_calc, sync_wait = True, True
_allgather_buffer(
trainable_params[layer_id],
group,
param2buffer_size=param2buffer_size,
use_calc_stream=use_calc,
task_flow=task_flow,
sync_wait=sync_wait,
offload=offload)
else:
_wait_layer(trainable_params[layer_id], task_flow, group,
param2buffer_size, use_calc, offload)
# Create params's grad
_create_params_grad(trainable_params[layer_id], param2buffer_size,
task_flow)
# Whether to use calc stream
task_flow.use_calc[layer_id] = use_calc
if layer_id != order_tracer["layer"][0] and not sync_comm:
layer_next_id = order_tracer["layer"][order_tracer[layer_id] - 1]
_allgather_buffer(
trainable_params[layer_next_id],
group,
param2buffer_size=param2buffer_size,
use_calc_stream=use_calc,
task_flow=task_flow,
sync_wait=sync_wait,
offload=offload)
return args
class TaskFlow:
"""
Task flow, a one-way linked list for task acquisition.
"""
def __init__(self,
full_param=None,
full_grad=None,
use_calc=None,
callback=None):
# Use None defaults to avoid sharing one mutable dict across TaskFlow instances.
self.full_param = full_param if full_param is not None else dict()
self.full_grad = full_grad if full_grad is not None else dict()
self.use_calc = use_calc if use_calc is not None else dict()
self.callback = callback
def _release_param(trainable_params,
param2buffer,
rank,
task_flow,
offload=False):
for param in trainable_params:
# With asynchronous communication, shared weights are not cleared until their last use (use_count reaches 0)
param.use_count -= 1
if param.use_count == 0:
param._clear_data()
if param.name in task_flow.full_param.keys():
start, end = param2buffer[param.name][rank]
with paddle.amp.auto_cast(enable=False):
param.fw_storage = task_flow.full_param[param.name][
0]._slice(start, end).detach().clone()
param.status = "part"
del task_flow.full_param[param.name]
if offload:
param.fw_storage = _device2cpu(param.fw_storage)
return
def _wait_layer(trainable_params,
task_flow,
group,
param2buffer_size,
use_calc_stream,
offload=False):
for param in trainable_params:
if param.status == "all":
param.use_count += 1
continue
if param.name in task_flow.full_param.keys():
full_param, task = task_flow.full_param[param.name]
task.wait()
full_param._slice(0, param._numel())._share_buffer_to(param)
param.fw_storage._clear()
param.fw_storage = None
param.status = "all"
param.use_count += 1
else:
_allgather_buffer(
trainable_params,
group,
param2buffer_size=param2buffer_size,
use_calc_stream=True,
task_flow=task_flow,
sync_wait=True,
offload=offload)
break
return task_flow
def _allgather_buffer(trainable_params,
group,
param2buffer_size,
use_calc_stream,
task_flow,
sync_wait=False,
offload=False,
convert2cpu=False):
for param in trainable_params:
if param.status == "all":
param.use_count += 1
continue
if offload:
param.fw_storage = _cpu2device(param)
buffer_size = param2buffer_size[param.name]
with paddle.amp.auto_cast(enable=False):
full_param, task = _all_gather(param.fw_storage, buffer_size, group)
# Allgather current layer in the 1st step synchronously
if sync_wait:
with paddle.amp.auto_cast(enable=False):
task.wait()
full_param._slice(0, param._numel())._share_buffer_to(param)
param.fw_storage._clear()
param.fw_storage = None
param.status = "all"
param.use_count += 1
task_flow.full_param[param.name] = (full_param, task)
# convert the parameter to cpu
if convert2cpu:
p_name = param.name
param = _device2cpu(param)
del task_flow.full_param[p_name]
task_flow.full_param[p_name] = (param, None)
return task_flow
@paddle.autograd.no_grad()
def _create_params_grad(trainable_params, param2buffer_size, task_flow):
for param in trainable_params:
if param.name in task_flow.full_grad.keys():
continue
assert isinstance(param2buffer_size[param.name], int)
temp_grad = paddle.zeros(
[param2buffer_size[param.name]], dtype=param.dtype)
temp_tensor = temp_grad._slice(0, param._numel())
temp_tensor.get_tensor()._set_dims(param.shape)
param._copy_gradient_from(temp_tensor)
del temp_tensor
task_flow.full_grad[param.name] = temp_grad
return task_flow
def _PartitionParam(param):
if not hasattr(param, "fw_storage"):
setattr(param, "fw_storage", None)
setattr(param, "bw_storage", None)
setattr(param, "status", "all")
setattr(param, "use_count", 0)
return param
def _UnsliceParam(param):
if not hasattr(param, "unslice"):
setattr(param, "unslice", True)
return param
def _VarBaseWrapper(param):
varbase = param.fw_storage
tmp_param = EagerParamBase(
shape=varbase.shape, dtype=varbase.dtype, name="slice@" + param.name)
varbase._share_buffer_to(tmp_param)
tmp_param.regularizer = param.regularizer
tmp_param.optimize_attr['learning_rate'] = param.optimize_attr[
'learning_rate']
varbase._clear()
return tmp_param
def _OptimizerWrapper(optimizer, offload, group, update_params_slice):
if not hasattr(optimizer, "_optim"):
setattr(optimizer, "_optim", optimizer)
setattr(optimizer, "offload", offload)
setattr(optimizer, "_group", group)
setattr(optimizer, "update_scaler", None)
setattr(optimizer, "update_slice", update_params_slice)
return optimizer
def _device2cpu(trans_param, convert_dtype=False):
if convert_dtype:
trans_param = paddle.cast(trans_param, Type.fp32.value)
tmp_p = trans_param.cpu()
trans_param._clear_data()
return tmp_p
def _cpu2device(param):
tmp_p = param.fw_storage.cuda(DEV_ID)
if tmp_p.dtype == Type.fp32.value and param2dtype[
param.name] == Type.fp16.value:
tmp_p = paddle.cast(tmp_p, Type.fp16.value)
return tmp_p
def _current_layer_params(layer):
return layer.parameters(
include_sublayers=False) + list(layer.extra_parameters) if hasattr(
layer, "extra_parameters") else layer.parameters(
include_sublayers=False)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Taken and modified from fairscale:
# https://github.com/facebookresearch/fairscale/blob/main/fairscale/nn/misc/param_bucket.py
# Commit: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e
import os
import time
import numpy as np
import paddle
from paddle.fluid import core
from .group_sharded_utils import Type, device_guard
class InternalStorage:
"""
This is a basic class, which is responsible for consolidating the basic storage tensor.
"""
# Support integration parameter tensor
def __init__(self, size, dtype, device, convert_cpu=False):
self._params = []
self._param_ids = []
self._fill = 0
self._device = device
self._dtype = dtype
# The flattened tensor
size = [size] if isinstance(size, int) else size
if convert_cpu:
value = np.zeros(
size,
dtype=np.float16) if Type.fp16.value == dtype else np.zeros(
size, dtype=np.float32)
self.buffer = core.eager.Tensor(value=value, place=core.CPUPlace())
else:
self.buffer = paddle.zeros(size, dtype=dtype)
self.dev_id = 0 if paddle.get_device() == "cpu" else int(
paddle.get_device().split(":")[1])
def to(self, device, dtype=None, keep_alignment=True):
"""
Move the underlying buffer
"""
assert self.buffer is not None, "Cannot move a collapsed bucket, please rebuild it"
assert dtype is None or dtype in (
Type.fp32.value,
Type.fp16.value), "Conversion type is not supported now"
if self._device != device:
tmp_buffer = self.buffer.cuda(
self.dev_id) if device == "gpu" else self.buffer.cpu()
for param in self._params:
param.clear_gradient(False)
del self.buffer
self.buffer = tmp_buffer
self._device = device
if dtype is not None:
self.buffer = self.buffer.cast(dtype=dtype)
self._dtype = dtype
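# Illustrative sketch (not part of the patch): how a slot in the flat buffer can be
# padded so the next tensor starts on an aligned boundary. It assumes a 256-byte GPU
# alignment and 2-byte fp16 elements; in the classes below the per-parameter padding
# arrives precomputed through the param2align / parm2align mappings.
def _aligned_slot_demo(numel, element_size=2, alignment_bytes=256):
    used = numel * element_size
    remainder = used % alignment_bytes
    padding_bytes = 0 if remainder == 0 else alignment_bytes - remainder
    return numel + padding_bytes // element_size  # matches the "numel + align" stride of _fill

assert _aligned_slot_demo(1000) == 1024  # 2000 bytes padded up to 2048, a multiple of 256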
class ParamStorage(InternalStorage):
"""
This is a basic class to simplify the handling of parameter InternalStorages.
"""
def __init__(self, size, dtype, device):
super().__init__(size, dtype, device, convert_cpu=True)
self.param2align = None
def to(self, device, dtype=None, keep_alignment=True):
"""
Move the underlying buffer
"""
super().to(device, dtype)
if keep_alignment:
self._array_params()
@paddle.autograd.no_grad()
def add_rank_params(self, trainable_params, param2align, convert_gpu=True):
"""
Add new parameters to the InternalStorage. Each param becomes a view of this InternalStorage buffer.
"""
assert all([
id(param) not in self._param_ids for param in trainable_params
]), "The same param cannot be checked in twice"
assert self.buffer is not None
self.param2align = param2align
cpu_param_shape = list()
for param in trainable_params:
p_shape = self._add_param_as_view(param, param2align[param.name],
convert_gpu)
cpu_param_shape.append(p_shape)
if convert_gpu:
# buffer convert from cpu to cuda
self.buffer = self.buffer.cuda(self.dev_id)
self._fill = 0
for idx, param in enumerate(trainable_params):
self._convert_buffer(param, cpu_param_shape[idx],
param2align[param.name])
self._params.append(param)
self._param_ids.append(id(param))
@paddle.autograd.no_grad()
def _add_param_as_view(self, param, align, convert_gpu=True):
assert (
param.dtype == self.buffer.dtype
), "Different types for the InternalStorage and the param, cannot proceed: {} - {}".format(
param.dtype, self.buffer.dtype)
var_end = self._fill + param._numel()
offset = var_end + align
assert offset <= self.buffer._numel()
p_shape = param.shape
origin_state = param.stop_gradient
param.stop_gradient = True
param.flatten_()
param.stop_gradient = origin_state
# Copy the current param value
with device_guard(self.dev_id, "cpu"):
tmp_var = self.buffer._slice(self._fill, var_end)
if convert_gpu:
param_cpu = param.cpu()
param._clear_data()
tmp_var.set_value(param_cpu)
else:
tmp_var.set_value(param)
del tmp_var
self._fill = offset
return p_shape
@paddle.autograd.no_grad()
def _convert_buffer(self, param, p_shape, align):
var_end = self._fill + np.prod(p_shape).tolist()
offset = var_end + align
assert offset <= self.buffer._numel()
# Convert the param value
with device_guard(self.dev_id, self._device):
tmp_tensor = self.buffer._slice(self._fill, var_end)
tmp_tensor._share_buffer_to(param)
param.get_tensor()._set_dims(p_shape)
self._fill = offset
@paddle.autograd.no_grad()
def _array_params(self):
"""
Given the parameters which have been registered previously, rebuild the whole InternalStorage.
"""
assert len(self._params) > 0
assert self.param2align is not None
self._fill = 0
for p in self._params:
self._convert_buffer(p, p.shape, self.param2align[p.name])  # re-register the param view at its aligned offset
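# Illustrative sketch (not part of the patch): a plain numpy analogue of what
# _add_param_as_view/_convert_buffer achieve. Parameters become reshaped views over
# contiguous slices of one flat buffer, so writing through a view updates the
# consolidated storage in place. Shapes here are hypothetical.
_demo_buffer = np.zeros(12, dtype=np.float32)
_demo_w = _demo_buffer[0:6].reshape(2, 3)  # "parameter" view over the first slot
_demo_b = _demo_buffer[6:9]                # next slot; 9:12 would be alignment padding
_demo_w[...] = 1.0
assert _demo_buffer[:6].sum() == 6.0       # the flat buffer sees the update through the view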
class GradStorage(InternalStorage):
"""
This is a basic class to simplify the handling of gradient InternalStorages
"""
def __init__(self,
size,
dtype,
device,
destination,
parm2align,
convert_cpu=False):
if isinstance(size, np.int64):
size = size.tolist()
super().__init__(size, dtype, device, convert_cpu)
self._max_size = size
self._release = False
self.params_checked_in = 0
self.destination = destination
self._parm2align = parm2align
self.sent = False
def reset_checked_in(self):
""" Reset the counter of the parameter grads which have been checked in
"""
self.params_checked_in = 0
self.sent = False
@property
def all_checked_in(self):
""" Judge all the expected gradient check-in happened """
return len(self._params) == self.params_checked_in
def can_add_grad_view(self, param, align):
""" Is there enough InternalStorage to add this parameter gradient, and whether this param have already checked in.
"""
return self._fill + param._numel() + align <= self._max_size and id(
param) not in self._param_ids
def to(self, device, dtype=None, keep_alignment=True):
"""
Move the underlying buffer
"""
if self._release:
self.rebuild()
super().to(device, dtype)
if keep_alignment:
self._array_grads()
@paddle.autograd.no_grad()
def add_grad(self, param, align):
"""
Add a new parameter gradient to the InternalStorage. Param.grad becomes a view of this InternalStorage buffer.
"""
assert id(
param
) not in self._param_ids, "The same gradients cannot be checked in twice"
self._add_grad_as_view(param, align)
self._params.append(param)
self._param_ids.append(id(param))
@paddle.autograd.no_grad()
def manumal_relase(self):
"""
Release the buffer from InternalStorage. The InternalStorage will need to be rebuilt before use.
"""
if not self._release:
for p in self._params:
if p.grad is not None:
p.clear_gradient(False)
self.buffer = None
self._fill = 0
self.params_checked_in = 0
self._release = True
@paddle.autograd.no_grad()
def rebuild(self):
"""
Given the parameter gradients which have been registered previously, rebuild the whole InternalStorage.
"""
if self._release:
self.buffer = paddle.zeros([self._max_size], dtype=self._dtype)
for p in self._params:
self._add_grad_as_view(p, self._parm2align[p.name])
self._release = False
@paddle.autograd.no_grad()
def _array_grads(self):
"""
Given the parameter gradients which have been registered previously, rebuild the whole InternalStorage.
"""
if len(self._params) > 0:
self._fill = 0
for p in self._params:
self._add_grad_as_view(p, self._parm2align[p.name])
@paddle.autograd.no_grad()
def _add_grad_as_view(self, param, align):
assert param._numel(
) > 0, "Cannot add a gradient to a released InternalStorage, please rebuild"
assert param.dtype == self.buffer.dtype
grad_end = self._fill + param._numel()
offset = grad_end + align
assert offset <= self.buffer._numel()
# Copy the current grad value to InternalStorage
with device_guard(self.dev_id, self._device):
tmp_var = self.buffer._slice(self._fill, grad_end)
tmp_var.get_tensor()._set_dims(param.shape)
param._copy_gradient_from(tmp_var)
del tmp_var
self._fill = offset
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import contextlib
from enum import Enum
import numpy as np
from types import MethodType
import paddle
from paddle import _C_ops
from paddle.fluid import core
from paddle.fluid import layers
from paddle.fluid.dygraph import to_variable
from paddle.fluid.framework import dygraph_only
class Taskflow:
"""
A task-flow node: a one-way linked list entry holding an asynchronous task and its callback.
"""
def __init__(self, task, callback):
self.task = task
self.callback = callback
class Type(Enum):
"""
Type of trainable parameters
"""
fp16 = paddle.float16
fp32 = paddle.float32
class GroupShardedClipGrad:
def __init__(self, clip, device, group):
self._clip = clip
self._device = device
self._group = group
@paddle.autograd.no_grad()
def _dygraph_clip(self, params_grads):
sum_square_fp32, sum_square_fp16 = [], []
unslice_params_fp32, unslice_params_fp16 = [], []
for p, g in params_grads:
p_slice = True  # whether this parameter is sliced by sharding stage3
if g is None or getattr(p, 'need_clip', True) is False:
continue
if hasattr(p, "unslice"):
p_slice = False
merge_grad = g
if g.type == core.VarDesc.VarType.SELECTED_ROWS:
merge_grad = layers.get_tensor_from_selected_rows(
layers.merge_selected_rows(g))
square = layers.square(merge_grad)
sum_square = layers.reduce_sum(square)
if p.dtype == paddle.float16:
if p_slice: sum_square_fp16.append(sum_square)
else: unslice_params_fp16.append(sum_square)
elif p.dtype == paddle.float32:
if p_slice: sum_square_fp32.append(sum_square)
else: unslice_params_fp32.append(sum_square)
# global norm of non-distributed FP16 params_and_grads
if len(sum_square_fp16) == 0:
global_norm_fp16 = paddle.to_tensor([0.], dtype=paddle.float32)
else:
global_norm_fp16 = layers.concat(sum_square_fp16)
global_norm_fp16 = layers.reduce_sum(global_norm_fp16)
global_norm_fp16 = paddle.cast(
global_norm_fp16, dtype=paddle.float32)
# global norm of non-distributed FP16 params_and_grads for unslice parameters
if len(unslice_params_fp16) == 0:
global_unslice_fp16 = paddle.to_tensor([0.], dtype=paddle.float32)
else:
global_unslice_fp16 = layers.concat(unslice_params_fp16)
global_unslice_fp16 = layers.reduce_sum(global_unslice_fp16)
global_unslice_fp16 = paddle.cast(
global_unslice_fp16, dtype=paddle.float32)
# global norm of non-distributed FP32 params_and_grads
global_norm_fp32 = layers.concat(sum_square_fp32) if len(
sum_square_fp32) != 0 else paddle.to_tensor(
[0.], dtype=paddle.float32)
global_norm_fp32 = layers.reduce_sum(global_norm_fp32)
# global norm of non-distributed FP32 params_and_grads for unslice parameters
global_unslice_fp32 = layers.concat(unslice_params_fp32) if len(
unslice_params_fp32) != 0 else paddle.to_tensor(
[0.], dtype=paddle.float32)
global_unslice_fp32 = layers.reduce_sum(global_unslice_fp32)
global_unslice_var = global_unslice_fp16 + global_unslice_fp32
global_norm_var = global_norm_fp16 + global_norm_fp32 + 1.0 / self._group.nranks * global_unslice_var
# add all reduce to get global norm of distributed params_and_grads
dev_id = int(self._device.split(":")[1])
if paddle.device.get_device() == "cpu":
global_norm_var = global_norm_var.cuda(dev_id)
with device_guard(dev_id, "gpu"):
paddle.distributed.all_reduce(global_norm_var, group=self._group)
global_norm_var = layers.sqrt(global_norm_var)
max_global_norm = layers.fill_constant(
shape=[1], dtype=global_norm_var.dtype, value=self.clip_norm)
clip_var = layers.elementwise_div(
x=max_global_norm,
y=layers.elementwise_max(
x=global_norm_var, y=max_global_norm))
clip_var_fp16 = paddle.cast(clip_var, paddle.float16)
for p, g in params_grads:
if getattr(p, 'need_clip', True) is False or g is None:
continue
origin_state = g.stop_gradient
g.stop_gradient = True
if p.dtype == paddle.float16:
g.scale_(clip_var_fp16.item())
else:
g.scale_(clip_var.item())
g.stop_gradient = origin_state
# p._reset_grad_inplace_version(True)
return params_grads
def __getattr__(self, item):
return getattr(self._clip, item)
def __call__(self, params_grads):
return self._dygraph_clip(params_grads)
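# Illustrative sketch (not part of the patch): the scalar rule implemented above with
# elementwise_div/elementwise_max. Every gradient is scaled by
# clip_norm / max(global_norm, clip_norm), so gradients are untouched whenever the
# global norm is already under the threshold.
def _clip_coefficient_demo(global_norm, clip_norm=1.0):
    return clip_norm / max(global_norm, clip_norm)

assert _clip_coefficient_demo(0.5) == 1.0               # below the threshold: no change
assert abs(_clip_coefficient_demo(4.0) - 0.25) < 1e-12  # scaled down so the norm becomes clip_norm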
@contextlib.contextmanager
def device_guard(dev_id=0, device="cpu"):
origin_device = paddle.device.get_device()
if device == "cpu":
paddle.set_device(device)
elif device == "gpu":
paddle.set_device("gpu:{}".format(dev_id))
try:
yield
finally:
paddle.set_device(origin_device)
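# Illustrative sketch (not part of the patch): device_guard temporarily switches the
# active device and restores the previous one on exit, which is how the clipping code
# above hops to the GPU for the all_reduce even when the optimizer state lives on CPU.
def _device_guard_demo():
    if not paddle.is_compiled_with_cuda():
        return
    paddle.set_device("cpu")
    with device_guard(0, "gpu"):
        created_on_gpu = paddle.ones([2])  # allocated on gpu:0 inside the context
    assert paddle.device.get_device() == "cpu"  # previous device restored afterwards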
@dygraph_only
def GroupShardedScaler(scaler):
def unscale_method(self, optimizer):
if not self._enable:
return
param_grads = []
param_grads_fp16 = []
param_grads_fp32 = []
if hasattr(optimizer, "update_slice"):
optimizer.update_slice()
optimizer.update_scaler = True
if getattr(optimizer._optim, '_param_groups', None) and isinstance(
optimizer._optim._param_groups[0], dict):
for group in optimizer._optim._param_groups:
for param in group['params']:
if param.grad is not None:
param_grads.append(param.grad)
if param.grad.dtype in [
core.VarDesc.VarType.FP16, paddle.float16
]:
param_grads_fp16.append(param.grad)
else:
param_grads_fp32.append(param.grad)
else:
for param in optimizer._optim._parameter_list:
if param.grad is not None:
param_grads.append(param.grad)
if param.grad.dtype in [
core.VarDesc.VarType.FP16, paddle.float16
]:
param_grads_fp16.append(param.grad)
else:
param_grads_fp32.append(param.grad)
temp_found_inf_fp16 = to_variable(np.array([0]).astype(np.bool))
temp_found_inf_fp32 = to_variable(np.array([0]).astype(np.bool))
device = "cpu" if optimizer.offload else "gpu"
dev_id = 0 if device == "cpu" else int(paddle.get_device().split(":")[
1])
with device_guard(dev_id, device):
if len(param_grads_fp16):
_C_ops.check_finite_and_unscale(param_grads_fp16, self._scale,
param_grads_fp16,
temp_found_inf_fp16)
if len(param_grads_fp32):
_C_ops.check_finite_and_unscale(param_grads_fp32, self._scale,
param_grads_fp32,
temp_found_inf_fp32)
self._found_inf = 1 if temp_found_inf_fp16 or temp_found_inf_fp32 else 0
is_found_inf = paddle.to_tensor([self._found_inf], dtype="int32")
paddle.distributed.all_reduce(
is_found_inf,
op=paddle.distributed.ReduceOp.MAX,
group=optimizer._group)
self._found_inf = is_found_inf.numpy()[0]
scaler._unscale = MethodType(unscale_method, scaler)
return scaler
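# Illustrative sketch (not part of the patch): the found-inf handling above reduces a
# 0/1 flag with ReduceOp.MAX, so a single rank that saw inf/nan gradients is enough to
# make every rank skip the optimizer step. The list below stands in for per-rank flags.
_demo_rank_flags = [0, 1, 0, 0]
assert max(_demo_rank_flags) == 1  # the MAX all_reduce marks the step as skipped on all ranks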
...@@ -20,11 +20,20 @@ import paddle
from paddle.optimizer import Optimizer
from paddle.distributed.utils import get_logger
from paddle.fluid.framework import in_dygraph_mode
# Old version
from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ShardingOptimizerStage2
from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ShardingStage2
from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage3 import ShardingStage3
from paddle.distributed.fleet.meta_parallel.sharding.sharding_utils import ShardingScaler
# New version
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage2 import GroupShardedStage2
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage3 import GroupShardedStage3
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_utils import GroupShardedScaler
logger_ = get_logger(logging.INFO)
...@@ -110,6 +119,19 @@ def group_sharded_parallel(model,
logger_.info("*" * 30)
logger_.info("Sharded level os uses sharded level os_g achieved now.")
logger_.info("*" * 30)
if in_dygraph_mode():
optimizer = GroupShardedOptimizerStage2(
params=optimizer._parameter_list,
optim=optimizer,
group=group,
offload=offload)
model = GroupShardedStage2(
model,
optimizer,
group=group,
sync_buffers=sync_buffers,
buffer_max_size=buffer_max_size)
else:
optimizer = ShardingOptimizerStage2(
params=model.parameters(),
optim=optimizer,
...@@ -122,6 +144,16 @@ def group_sharded_parallel(model,
sync_buffers=sync_buffers,
buffer_max_size=buffer_max_size)
elif level == 'p_g_os':
if in_dygraph_mode():
model = GroupShardedStage3(
model,
optimizer=optimizer,
group=group,
sync_buffers=sync_buffers,
segment_size=segment_size,
offload=offload,
sync_comm=sync_comm)
else:
model = ShardingStage3(
model,
optimizer=optimizer,
...@@ -133,6 +165,9 @@ def group_sharded_parallel(model,
else:
raise ValueError("Please enter the correct level.")
if params_fp16 and isinstance(scaler, paddle.amp.GradScaler):
if in_dygraph_mode():
scaler = GroupShardedScaler(scaler)
else:
scaler = ShardingScaler(scaler)
logger_.info("*" * 30)
logger_.info(
...@@ -195,9 +230,9 @@ def save_group_sharded_model(model, output, optimizer=None):
), "Saving directory ({}) should be a directory, not a file".format(output)
os.makedirs(output, exist_ok=True)
output_model = os.path.join(output, "model.pdmodel")
if isinstance(model, (ShardingStage2, GroupShardedStage2)):  # was: isinstance(model, ShardingStage2)
paddle.save(model._layer.state_dict(), output_model)
elif isinstance(model, (ShardingStage3, GroupShardedStage3)):  # was: isinstance(model, ShardingStage3)
convert2cpu = True if model._offload else False
model.get_all_parameters(convert2cpu=convert2cpu)
paddle.save(model._layer.state_dict(), output_model)
...
...@@ -819,6 +819,10 @@ def monkey_patch_varbase():
def _numel(self):
return self.get_tensor()._numel()
@framework.dygraph_only
def _clear_data(self):
self.get_tensor()._clear()
@framework.dygraph_only
def _uva(self, device_id=0):
'''
...@@ -934,6 +938,7 @@ def monkey_patch_varbase():
setattr(core.eager.Tensor, "_slice", _slice)
setattr(core.eager.Tensor, "_numel", _numel)
setattr(core.eager.Tensor, "_uva", _uva)
setattr(core.eager.Tensor, "_clear_data", _clear_data)
else:
setattr(core.VarBase, "__name__", "Tensor")
setattr(core.VarBase, "grad", grad)
...
...@@ -1142,7 +1142,7 @@ if(WITH_DISTRIBUTE AND WITH_GPU AND WITH_NCCL)
set_tests_properties(test_parallel_dygraph_sharding_parallel PROPERTIES TIMEOUT 120)
set_tests_properties(test_dygraph_sharding_optimizer_stage2 PROPERTIES TIMEOUT 120)
set_tests_properties(test_dygraph_sharding_stage2 PROPERTIES TIMEOUT 120)
set_tests_properties(test_dygraph_sharding_stage3 PROPERTIES TIMEOUT 200)  # was: TIMEOUT 120
set_tests_properties(test_dygraph_group_sharded_api PROPERTIES TIMEOUT 120)
set_tests_properties(test_auto_parallel_parallelizer PROPERTIES TIMEOUT 120)
set_tests_properties(test_parallel_dygraph_mp_layers PROPERTIES TIMEOUT 120)
...
...@@ -22,6 +22,7 @@ import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import Linear
from paddle.distributed import fleet
from paddle.fluid.dygraph import nn
from paddle.fluid.framework import _test_eager_guard
from paddle.distributed.sharding import group_sharded_parallel, save_group_sharded_model
epoch = 10
...@@ -144,4 +145,6 @@ def test_sharding_api():
if __name__ == '__main__':
with _test_eager_guard():
pass
test_sharding_api()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import shutil
import tempfile
import numpy as np
import paddle
import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import Linear
from paddle.distributed import fleet
from paddle.fluid.dygraph import nn
from paddle.fluid.framework import _test_eager_guard
from paddle.distributed.sharding import group_sharded_parallel, save_group_sharded_model
epoch = 10
paddle.seed(2022)
np.random.seed(2022)
base_lr = 0.1
momentum_rate = 0.9
l2_decay = 1e-4
batch_size = 100
class MLP(fluid.Layer):
def __init__(self, linear_size=1000, param_attr=None, bias_attr=None):
super(MLP, self).__init__()
self._linear1 = Linear(linear_size, linear_size)
self._linear2 = Linear(linear_size, linear_size)
self._linear3 = Linear(linear_size, 10)
def forward(self, inputs):
y = self._linear1(inputs)
y = self._linear2(y)
y = self._linear3(y)
return y
def reader_decorator(linear_size=1000):
def __reader__():
for _ in range(100):
img = np.random.rand(linear_size).astype('float32')
label = np.ones(1).astype('int64')
yield img, label
return __reader__
def optimizer_setting(model, use_pure_fp16, opt_group=False):
clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0)
optimizer = paddle.optimizer.Momentum(
parameters=[{
"params": list(model.parameters())
}] if opt_group else list(model.parameters()),
learning_rate=0.001,
weight_decay=0.00001,
grad_clip=clip,
multi_precision=use_pure_fp16)
return optimizer
def train_mlp(model, shard_level, use_pure_fp16, output_dir):
optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16)
model = paddle.amp.decorate(models=model, level='O2', save_dtype='float32')
scaler = paddle.amp.GradScaler(init_loss_scaling=32768)
model, optimizer, scaler = group_sharded_parallel(
model=model, optimizer=optimizer, level=shard_level, scaler=scaler)
train_reader = paddle.batch(
reader_decorator(), batch_size=batch_size, drop_last=True)
train_loader = paddle.io.DataLoader.from_generator(
capacity=32,
use_double_buffer=True,
iterable=True,
return_list=True,
use_multiprocess=True)
train_loader.set_sample_list_generator(train_reader)
for eop in range(epoch):
model.train()
for batch_id, data in enumerate(train_loader()):
img, label = data
label.stop_gradient = True
img.stop_gradient = True
with paddle.amp.auto_cast(True, level='O2'):
out = model(img)
loss = paddle.nn.functional.cross_entropy(
input=out, label=label)
avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32))
if not use_pure_fp16:
avg_loss.backward()
optimizer.step()
else:
scaler.scale(avg_loss).backward()
scaler.step(optimizer)
scaler.update()
optimizer.clear_grad()
save_group_sharded_model(model, output=output_dir, optimizer=optimizer)
return model.parameters()
def test_sharding_api():
paddle.distributed.init_parallel_env()
mlp, mlp1, mlp2 = MLP(), MLP(), MLP()
state_dict = mlp.state_dict()
mlp1.set_state_dict(state_dict)
mlp2.set_state_dict(state_dict)
output_dir = tempfile.mkdtemp()
# fp16
stage2_params = train_mlp(
mlp1, shard_level="os_g", use_pure_fp16=True, output_dir=output_dir)
stage3_params = train_mlp(
mlp2, shard_level="p_g_os", use_pure_fp16=True, output_dir=output_dir)
for i in range(len(stage3_params)):
np.testing.assert_allclose(
stage2_params[i].numpy(),
stage3_params[i].numpy(),
rtol=1e-4,
atol=1e-3)
shutil.rmtree(output_dir)
if __name__ == '__main__':
with _test_eager_guard():
test_sharding_api()
# -*- coding: UTF-8 -*-
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import numpy as np
import argparse
import tempfile
import ast
import time
import paddle
import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import Linear
from paddle.distributed import fleet
from paddle.fluid.dygraph import nn
from paddle.fluid.framework import _test_eager_guard
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage2 import GroupShardedStage2
seed = 2022
epoch = 2
linear_size = 1000
np.random.seed(seed)
paddle.seed(seed)
class MLP(fluid.Layer):
def __init__(self, linear_size=1000, param_attr=None, bias_attr=None):
super(MLP, self).__init__()
self._linear1 = Linear(linear_size, linear_size)
self._linear2 = Linear(linear_size, linear_size)
self._linear3 = Linear(linear_size, 10)
def forward(self, inputs):
y = self._linear1(inputs)
y = self._linear2(y)
y = self._linear3(y)
return y
def reader_decorator(linear_size=1000):
def __reader__():
for _ in range(100):
img = np.random.rand(linear_size).astype('float32')
label = np.ones(1).astype('int64')
yield img, label
return __reader__
def optimizer_setting(model, use_pure_fp16, opt_group=False):
clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0)
optimizer = paddle.optimizer.AdamW(
parameters=[{
"params": model.parameters(),
}] if opt_group else model.parameters(),
learning_rate=0.001,
weight_decay=0.00001,
grad_clip=clip,
multi_precision=use_pure_fp16)
return optimizer
def train_mlp(model,
sharding_stage,
batch_size=100,
use_pure_fp16=False,
accumulate_grad=False,
opt_group=False,
save_model=False,
test_minimize=False):
if sharding_stage != "dp":
group = paddle.distributed.new_group([0, 1], backend="nccl")
if opt_group:
optimizer = optimizer_setting(
model=model, use_pure_fp16=use_pure_fp16, opt_group=opt_group)
else:
optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16)
if sharding_stage == 2:
optimizer = GroupShardedOptimizerStage2(
params=optimizer._parameter_list, optim=optimizer, group=group)
model = GroupShardedStage2(
model, optimizer, group=group, buffer_max_size=2**21)
else:
model = paddle.DataParallel(model)
# check optimizer.minimize() error
if test_minimize:
try:
optimizer.minimize()
except:
print(
"====== Find sharding_stage2_optimizer.minimize() error ======")
return
train_reader = paddle.batch(
reader_decorator(), batch_size=batch_size, drop_last=True)
train_loader = paddle.io.DataLoader.from_generator(
capacity=32,
use_double_buffer=True,
iterable=True,
return_list=True,
use_multiprocess=True)
train_loader.set_sample_list_generator(train_reader)
if sharding_stage == 2:
model.to(device="gpu")
for eop in range(epoch):
model.train()
for batch_id, data in enumerate(train_loader()):
img, label = data
label.stop_gradient = True
img.stop_gradient = True
out = model(img)
loss = paddle.nn.functional.cross_entropy(input=out, label=label)
avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32))
if batch_size == 20:
avg_loss = avg_loss / 5
avg_loss.backward()
if not accumulate_grad:
optimizer.step()
optimizer.clear_grad()
if accumulate_grad:
optimizer.step()
optimizer.clear_grad()
if save_model:
return model, optimizer
return model.parameters()
def test_dp_stage2():
paddle.distributed.init_parallel_env()
mlp = MLP()
state_dict = mlp.state_dict()
mlp1 = MLP()
mlp2 = MLP()
mlp3 = MLP()
mlp4 = MLP()
mlp5 = MLP()
mlp6 = MLP()
mlp7 = MLP()
mlp1.set_state_dict(state_dict)
mlp2.set_state_dict(state_dict)
mlp3.set_state_dict(state_dict)
mlp4.set_state_dict(state_dict)
mlp5.set_state_dict(state_dict)
mlp6.set_state_dict(state_dict)
mlp7.set_state_dict(state_dict)
# DP VS stage2
dp_params = train_mlp(
mlp1, sharding_stage="dp", use_pure_fp16=False, opt_group=False)
stage2_params = train_mlp(
mlp2, sharding_stage=2, use_pure_fp16=False, opt_group=False)
for i in range(len(dp_params)):
np.testing.assert_allclose(
dp_params[i].numpy(), stage2_params[i].numpy(), rtol=1e-6)
# stage2 accumulate grad
stage2_params = train_mlp(mlp3, sharding_stage=2, accumulate_grad=True)
stage2_accumulate_grad = train_mlp(
mlp4, sharding_stage=2, batch_size=20, accumulate_grad=True)
for i in range(len(stage2_params)):
np.testing.assert_allclose(
stage2_params[i].numpy(),
stage2_accumulate_grad[i].numpy(),
rtol=1e-5,
atol=1e-5)
# stage2 param list VS param group
stage2_params = train_mlp(
mlp5, sharding_stage=2, use_pure_fp16=False, opt_group=True)
for i in range(len(dp_params)):
np.testing.assert_allclose(
dp_params[i].numpy(), stage2_params[i].numpy(), rtol=1e-6)
# save/load model
output_dir = tempfile.mkdtemp()
model_file = os.path.join(output_dir, "model.pdmodel")
optimizer_file = os.path.join(output_dir, "model.pdopt")
model_stage2, optimizer_stage2 = train_mlp(
mlp6,
sharding_stage=2,
use_pure_fp16=False,
opt_group=False,
save_model=True)
paddle.save(model_stage2.state_dict(), model_file)
paddle.save(optimizer_stage2.state_dict(), optimizer_file)
m_state_dict = paddle.load(model_file)
opt_state_dict = paddle.load(optimizer_file)
model_stage2.set_state_dict(m_state_dict)
optimizer_stage2.set_state_dict(opt_state_dict)
shutil.rmtree(output_dir)
# check optimizer.minimize() error
train_mlp(mlp7, sharding_stage=2, test_minimize=True)
return
if __name__ == '__main__':
with _test_eager_guard():
test_dp_stage2()
# -*- coding: UTF-8 -*-
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import argparse
import ast
import time
import paddle
import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import Linear
from paddle.distributed import fleet
from paddle.fluid.dygraph import nn
from paddle.fluid.framework import _test_eager_guard
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage2 import GroupShardedStage2
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_utils import GroupShardedScaler
from dygraph_group_sharded_stage2 import MLP, reader_decorator, optimizer_setting
seed = 2021
epoch = 2
batch_size = 32
linear_size = 1000
np.random.seed(seed)
paddle.seed(seed)
def train_mlp(model, offload=False):
optimizer = optimizer_setting(model=model, use_pure_fp16=True)
model = paddle.amp.decorate(models=model, level='O2', save_dtype='float32')
scaler = paddle.amp.GradScaler(init_loss_scaling=1024)
scaler = GroupShardedScaler(scaler)
optimizer = GroupShardedOptimizerStage2(
params=optimizer._parameter_list, optim=optimizer, offload=offload)
model = GroupShardedStage2(model, optimizer, buffer_max_size=2**21)
train_reader = paddle.batch(
reader_decorator(linear_size), batch_size=batch_size, drop_last=True)
train_loader = paddle.io.DataLoader.from_generator(
capacity=32,
use_double_buffer=True,
iterable=True,
return_list=True,
use_multiprocess=True)
train_loader.set_sample_list_generator(train_reader)
for eop in range(epoch):
model.train()
for batch_id, data in enumerate(train_loader()):
img, label = data
label.stop_gradient = True
img.stop_gradient = True
with paddle.amp.auto_cast(True, level='O2'):
out = model(img)
loss = paddle.nn.functional.cross_entropy(
input=out, label=label)
avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32))
scaler.scale(avg_loss).backward()
scaler.step(optimizer)
scaler.update()
optimizer.clear_grad()
for dtype in optimizer.param_storages:
for dst_rank, param_storage in optimizer.param_storages[dtype].items():
param_storage.to(device="gpu", dtype=dtype)
return model.parameters()
def test_sharding_stage2_offload():
paddle.distributed.init_parallel_env()
mlp = MLP(linear_size)
mlp_offload = MLP(linear_size)
mlp_offload.set_state_dict(mlp.state_dict())
mlp_params = train_mlp(mlp, offload=False)
mlp_offload_params = train_mlp(mlp_offload, offload=True)
for i in range(len(mlp_params)):
np.testing.assert_allclose(
mlp_params[i].numpy(),
mlp_offload_params[i].numpy(),
rtol=5e-3,
atol=5e-3)
return
if __name__ == '__main__':
with _test_eager_guard():
test_sharding_stage2_offload()
# -*- coding: UTF-8 -*-
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import tempfile
import numpy as np
import argparse
import ast
import time
import paddle
import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import Linear
from paddle.distributed import fleet
from paddle.fluid.dygraph import nn
from paddle.fluid.framework import _test_eager_guard
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage2 import GroupShardedStage2
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage3 import GroupShardedStage3
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_utils import GroupShardedScaler
epoch = 10
paddle.seed(2022)
np.random.seed(2022)
base_lr = 0.1
momentum_rate = 0.9
l2_decay = 1e-4
class MLP(fluid.Layer):
def __init__(self, linear_size=1000, param_attr=None, bias_attr=None):
super(MLP, self).__init__()
self._linear1 = Linear(linear_size, linear_size)
self._linear2 = Linear(linear_size, linear_size)
self._linear3 = Linear(linear_size, 10)
def forward(self, inputs):
y = self._linear1(inputs)
y = self._linear2(y)
y = self._linear3(y)
return y
def reader_decorator(linear_size=1000):
def __reader__():
for _ in range(100):
img = np.random.rand(linear_size).astype('float32')
label = np.ones(1).astype('int64')
yield img, label
return __reader__
def optimizer_setting(model, use_pure_fp16, opt_group=False):
clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0)
optimizer = paddle.optimizer.Momentum(
parameters=[{
"params": list(model.parameters())
}] if opt_group else list(model.parameters()),
learning_rate=0.001,
weight_decay=0.00001,
grad_clip=clip,
multi_precision=use_pure_fp16)
return optimizer
def train_mlp(model,
sharding_stage,
use_pure_fp16=False,
accumulate_grad=False,
batch_size=100,
opt_group=False,
sync_comm=False,
test_minimize=False,
save_model=False):
group = paddle.distributed.new_group([0, 1])
if opt_group:
optimizer = optimizer_setting(
model=model, use_pure_fp16=use_pure_fp16, opt_group=opt_group)
else:
optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16)
if use_pure_fp16:
model = paddle.amp.decorate(
models=model, level='O2', save_dtype='float32')
scaler = paddle.amp.GradScaler(init_loss_scaling=32768)
scaler = GroupShardedScaler(scaler)
if sharding_stage == 2:
optimizer = GroupShardedOptimizerStage2(
params=optimizer._parameter_list, optim=optimizer, group=group)
model = GroupShardedStage2(
model, optimizer, group=group, buffer_max_size=2**21)
elif sharding_stage == 3:
model = GroupShardedStage3(
model,
optimizer=optimizer,
group=group,
sync_comm=sync_comm,
segment_size=2**15)
# check optimizer.minimize() error
if test_minimize:
try:
optimizer.minimize()
except:
print(
"====== Find sharding_stage3_optimizer.minimize() error ======")
return
train_reader = paddle.batch(
reader_decorator(), batch_size=batch_size, drop_last=True)
train_loader = paddle.io.DataLoader.from_generator(
capacity=32,
use_double_buffer=True,
iterable=True,
return_list=True,
use_multiprocess=True)
train_loader.set_sample_list_generator(train_reader)
for eop in range(epoch):
model.train()
for batch_id, data in enumerate(train_loader()):
img, label = data
label.stop_gradient = True
img.stop_gradient = True
with paddle.amp.auto_cast(True, level='O2'):
out = model(img)
loss = paddle.nn.functional.cross_entropy(
input=out, label=label)
avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32))
if batch_size == 20:
avg_loss = avg_loss / 5
if not use_pure_fp16:
avg_loss.backward()
else:
scaler.scale(avg_loss).backward()
if not accumulate_grad:
if not use_pure_fp16:
optimizer.step()
else:
scaler.step(optimizer)
scaler.update()
optimizer.clear_grad()
if accumulate_grad:
if not use_pure_fp16:
optimizer.step()
else:
scaler.step(optimizer)
scaler.update()
optimizer.clear_grad()
if sharding_stage == 3:
model.get_all_parameters()
if save_model:
return model, optimizer
return model.parameters()
def test_stage2_stage3():
paddle.distributed.init_parallel_env()
mlp, mlp1, mlp2, mlp3, mlp4, mlp5, mlp6, mlp7, mlp8, mlp9, mlp10 = MLP(
), MLP(), MLP(), MLP(), MLP(), MLP(), MLP(), MLP(), MLP(), MLP(), MLP()
state_dict = mlp.state_dict()
mlp1.set_state_dict(state_dict)
mlp2.set_state_dict(state_dict)
mlp3.set_state_dict(state_dict)
mlp4.set_state_dict(state_dict)
mlp5.set_state_dict(state_dict)
mlp6.set_state_dict(state_dict)
mlp7.set_state_dict(state_dict)
mlp8.set_state_dict(state_dict)
mlp9.set_state_dict(state_dict)
mlp10.set_state_dict(state_dict)
# fp32
stage2_params = train_mlp(
mlp1, sharding_stage=2, use_pure_fp16=False, opt_group=False)
stage3_params = train_mlp(
mlp2, sharding_stage=3, use_pure_fp16=False, opt_group=False)
for i in range(len(stage2_params)):
np.testing.assert_allclose(
stage2_params[i].numpy(),
stage3_params[i].numpy(),
rtol=1e-6,
atol=1e-6)
# fp32 accumulate grad
stage3_params = train_mlp(
mlp3,
sharding_stage=3,
use_pure_fp16=False,
accumulate_grad=True,
opt_group=True)
stage3_params_add = train_mlp(
mlp4,
sharding_stage=3,
use_pure_fp16=False,
accumulate_grad=True,
batch_size=20,
opt_group=True)
for i in range(len(stage3_params)):
np.testing.assert_allclose(
stage3_params[i].numpy(),
stage3_params_add[i].numpy(),
rtol=1e-6,
atol=1e-4)
# fp16
stage2_params = train_mlp(
mlp5, sharding_stage=2, use_pure_fp16=True, opt_group=False)
stage3_params = train_mlp(
mlp6, sharding_stage=3, use_pure_fp16=True, opt_group=False)
for i in range(len(stage2_params)):
np.testing.assert_allclose(
stage2_params[i].numpy(),
stage3_params[i].numpy(),
rtol=1e-4,
atol=1e-3)
# fp16 sync_comm
stage3_params = train_mlp(
mlp7, sharding_stage=3, use_pure_fp16=True, opt_group=False)
stage3_params_re = train_mlp(
mlp8,
sharding_stage=3,
use_pure_fp16=True,
opt_group=False,
sync_comm=True)
for i in range(len(stage3_params)):
np.testing.assert_allclose(
stage3_params[i].numpy(), stage3_params_re[i].numpy(), rtol=1e-6)
# save/load model
output_dir = tempfile.mkdtemp()
model_file = os.path.join(output_dir, "model.pdmodel")
optimizer_file = os.path.join(output_dir, "model.pdopt")
model_stage3, optimizer_stage3 = train_mlp(
mlp9,
sharding_stage=3,
use_pure_fp16=False,
opt_group=False,
save_model=True)
paddle.save(model_stage3.state_dict(), model_file)
paddle.save(optimizer_stage3.state_dict(), optimizer_file)
m_state_dict = paddle.load(model_file)
opt_state_dict = paddle.load(optimizer_file)
model_stage3.set_state_dict(m_state_dict)
optimizer_stage3.set_state_dict(opt_state_dict)
shutil.rmtree(output_dir)
# check optimizer.minimize() error
train_mlp(
mlp10,
sharding_stage=3,
use_pure_fp16=False,
opt_group=False,
test_minimize=True)
if __name__ == '__main__':
with _test_eager_guard():
test_stage2_stage3()
# -*- coding: UTF-8 -*-
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import argparse
import ast
import time
import paddle
import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import Linear
from paddle.distributed import fleet
from paddle.fluid.dygraph import nn
from paddle.fluid.framework import _test_eager_guard
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage3 import GroupShardedStage3
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_utils import GroupShardedScaler
epoch = 10
paddle.seed(2022)
np.random.seed(2022)
base_lr = 0.1
momentum_rate = 0.9
l2_decay = 1e-4
class MLP(fluid.Layer):
def __init__(self, linear_size=1000, param_attr=None, bias_attr=None):
super(MLP, self).__init__()
self._linear1 = Linear(linear_size, linear_size)
self._linear2 = Linear(linear_size, linear_size)
self._linear3 = Linear(linear_size, 10)
def forward(self, inputs):
y = self._linear1(inputs)
y = self._linear2(y)
y = self._linear3(y)
return y
def reader_decorator(linear_size=1000):
def __reader__():
for _ in range(100):
img = np.random.rand(linear_size).astype('float32')
label = np.ones(1).astype('int64')
yield img, label
return __reader__
def optimizer_setting(model, use_pure_fp16, opt_group=False):
clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0)
optimizer = paddle.optimizer.AdamW(
parameters=[{
"params": model.parameters()
}] if opt_group else model.parameters(),
learning_rate=0.001,
weight_decay=0.00001,
grad_clip=clip,
multi_precision=use_pure_fp16)
return optimizer
def train_mlp(model,
use_pure_fp16=False,
accumulate_grad=False,
offload=False,
batch_size=100,
convert2cpu=False):
group = paddle.distributed.new_group([0, 1])
optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16)
if use_pure_fp16:
model = paddle.amp.decorate(
models=model, level='O2', save_dtype='float32')
scaler = paddle.amp.GradScaler(init_loss_scaling=32768)
scaler = GroupShardedScaler(scaler)
model = GroupShardedStage3(
model,
optimizer=optimizer,
group=group,
offload=offload,
segment_size=2**15)
train_reader = paddle.batch(
reader_decorator(), batch_size=batch_size, drop_last=True)
train_loader = paddle.io.DataLoader.from_generator(
capacity=32,
use_double_buffer=True,
iterable=True,
return_list=True,
use_multiprocess=True)
train_loader.set_sample_list_generator(train_reader)
for eop in range(epoch):
model.train()
for batch_id, data in enumerate(train_loader()):
img, label = data
label.stop_gradient = True
img.stop_gradient = True
with paddle.amp.auto_cast(True, level='O2'):
out = model(img)
loss = paddle.nn.functional.cross_entropy(
input=out, label=label)
avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32))
if accumulate_grad:
avg_loss = avg_loss / 5
if not use_pure_fp16:
avg_loss.backward()
else:
scaler.scale(avg_loss).backward()
if not accumulate_grad:
if not use_pure_fp16:
optimizer.step()
else:
scaler.step(optimizer)
scaler.update()
optimizer.clear_grad()
if accumulate_grad:
if not use_pure_fp16:
optimizer.step()
else:
scaler.step(optimizer)
scaler.update()
optimizer.clear_grad()
if not convert2cpu:
model.get_all_parameters()
else:
model.get_all_parameters(convert2cpu)
return model.parameters()
def test_stage3_offload():
paddle.distributed.init_parallel_env()
mlp, mlp1, mlp2, mlp3, mlp4, mlp5, mlp6 = MLP(), MLP(), MLP(), MLP(), MLP(
), MLP(), MLP()
state_dict = mlp.state_dict()
mlp1.set_state_dict(state_dict)
mlp2.set_state_dict(state_dict)
mlp3.set_state_dict(state_dict)
mlp4.set_state_dict(state_dict)
mlp5.set_state_dict(state_dict)
mlp6.set_state_dict(state_dict)
# fp32 offload
stage3_params = train_mlp(mlp1, use_pure_fp16=False)
stage3_params_offload = train_mlp(mlp2, use_pure_fp16=False, offload=True)
for i in range(len(stage3_params)):
np.testing.assert_allclose(
stage3_params[i].numpy(),
stage3_params_offload[i].numpy(),
rtol=1e-6,
atol=1e-8)
# fp16 offload
stage3_params = train_mlp(mlp3, use_pure_fp16=True)
stage3_params_offload = train_mlp(mlp4, use_pure_fp16=True, offload=True)
for i in range(len(stage3_params)):
np.testing.assert_allclose(
stage3_params[i].numpy(),
stage3_params_offload[i].numpy(),
rtol=1e-2,
atol=1e-2)
# fp32 accumulate grad offload
stage3_params = train_mlp(
mlp5, use_pure_fp16=False, batch_size=20, accumulate_grad=True)
stage3_params_offload = train_mlp(
mlp6,
use_pure_fp16=False,
accumulate_grad=True,
offload=True,
batch_size=20,
convert2cpu=True)
for i in range(len(stage3_params)):
np.testing.assert_allclose(
stage3_params[i].numpy(),
stage3_params_offload[i].numpy(),
rtol=1e-6,
atol=1e-8)
return
if __name__ == '__main__':
with _test_eager_guard():
test_stage3_offload()
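# Illustrative note (not part of the patch): these scripts are driven by the 2-GPU test
# runner below (run_mnist_2gpu); launched by hand, an equivalent command would look
# roughly like
#   python -m paddle.distributed.launch --gpus "0,1" dygraph_group_sharded_stage3_offload.py
# (exact launcher flags may differ between Paddle releases).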
...@@ -23,6 +23,7 @@ import paddle
import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import Linear
from paddle.distributed import fleet
from paddle.fluid.framework import _test_eager_guard
from paddle.distributed.fleet.utils.internal_storage import GradStorage
from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ShardingOptimizerStage2
...@@ -138,4 +139,6 @@ def train_mlp():
if __name__ == '__main__':
with _test_eager_guard():
pass
train_mlp()
...@@ -26,6 +26,7 @@ import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import Linear
from paddle.distributed import fleet
from paddle.fluid.dygraph import nn
from paddle.fluid.framework import _test_eager_guard
from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ShardingOptimizerStage2
from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ShardingStage2
...@@ -222,4 +223,6 @@ def test_dp_stage2():
if __name__ == '__main__':
with _test_eager_guard():
pass
test_dp_stage2()
...@@ -23,6 +23,7 @@ import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import Linear
from paddle.distributed import fleet
from paddle.fluid.dygraph import nn
from paddle.fluid.framework import _test_eager_guard
from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ShardingOptimizerStage2
from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ShardingStage2
...@@ -106,4 +107,6 @@ def test_sharding_stage2_offload():
if __name__ == '__main__':
with _test_eager_guard():
pass
test_sharding_stage2_offload()
...@@ -26,6 +26,7 @@ import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import Linear
from paddle.distributed import fleet
from paddle.fluid.dygraph import nn
from paddle.fluid.framework import _test_eager_guard
from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ShardingOptimizerStage2
from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ShardingStage2
...@@ -274,4 +275,6 @@ def test_stage2_stage3():
if __name__ == '__main__':
with _test_eager_guard():
pass
test_stage2_stage3()
...@@ -23,6 +23,7 @@ import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import Linear
from paddle.distributed import fleet
from paddle.fluid.dygraph import nn
from paddle.fluid.framework import _test_eager_guard
from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage3 import ShardingStage3
from paddle.distributed.fleet.meta_parallel.sharding.sharding_utils import ShardingScaler
...@@ -196,4 +197,6 @@ def test_stage3_offload():
if __name__ == '__main__':
with _test_eager_guard():
pass
test_stage3_offload()
...@@ -25,6 +25,7 @@ class TestDygraphGroupSharded(TestMultipleGpus):
# check group sharded logic as well as the accuracy with single mode
def test_dygraph_group_sharded(self):
self.run_mnist_2gpu('dygraph_group_sharded_api.py')
self.run_mnist_2gpu('dygraph_group_sharded_api_eager.py')
if __name__ == "__main__":
...
...@@ -24,9 +24,11 @@ class TestDygraphShardingStage2(TestMultipleGpus):
# check sharding logic as well as the accuracy with single mode
def test_dygraph_sharding_stage2(self):
self.run_mnist_2gpu('dygraph_group_sharded_stage2.py')
self.run_mnist_2gpu('dygraph_sharding_stage2.py')
def test_dygraph_sharding_stage2_offload(self):
self.run_mnist_2gpu('dygraph_group_sharded_stage2_offload.py')
self.run_mnist_2gpu('dygraph_sharding_stage2_offload.py')
...
...@@ -24,9 +24,11 @@ class TestDygraphShardingStage3(TestMultipleGpus):
# check sharding logic as well as the accuracy with single mode
def test_dygraph_sharding_stage3(self):
self.run_mnist_2gpu('dygraph_group_sharded_stage3.py')
self.run_mnist_2gpu('dygraph_sharding_stage3.py')
def test_dygraph_sharding_stage3_offload(self):
self.run_mnist_2gpu('dygraph_group_sharded_stage3_offload.py')
self.run_mnist_2gpu('dygraph_sharding_stage3_offload.py')
...
...@@ -677,7 +677,7 @@ class EagerVariablePropertiesAndMethodsTestCase(unittest.TestCase):
tensor2 = None
tensor = paddle.to_tensor(arr, core.VarDesc.VarType.FP32,
core.CPUPlace())
tensor3 = core.eager.Tensor(value=tensor, place=core.CPUPlace())  # was: core.eager.Tensor()
if core.is_compiled_with_cuda():
tensor2 = paddle.to_tensor(arr2, core.VarDesc.VarType.FP32,
core.CUDAPlace(0))
...