未验证 提交 2bbdc47a 编写于 作者: W wanghuancoder 提交者: GitHub

delete old dygraph sharding (#49334)

* delete old dygraph sharding
上级 2ca3d3f7
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# The file has been adapted from fairscale file:
# https://github.com/facebookresearch/fairscale/blob/main/fairscale/optim/oss.py
# Git commit hash: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e
# We retain the following license from the original files:
# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
#
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.
import logging
from collections import OrderedDict
import numpy as np
import paddle
import paddle.distributed as dist
from paddle.distributed.collective import _get_global_group, new_group
from paddle.fluid.clip import ClipGradByGlobalNorm
from paddle.framework import core
from paddle.optimizer import Optimizer
from ...meta_parallel.sharding.sharding_utils import (
ShardingClipGrad,
Type,
device_guard,
)
from ...utils.internal_storage import GradStorage, ParamStorage
# CUDA alignment 256 bytes, cpu alignment 4096 bytes
alignment = {"gpu": 256, "cpu": 4096}
align = {
Type.fp16.value: 2,
Type.fp32.value: 4,
}
class ShardingOptimizerStage2(Optimizer):
"""
A wrapper for Sharding Stage2 Optimizer in Dygraph.
.. warning: ShardingOptimizer encapsulates the optimization strategy and integrates it into the optimizer.
.. ZeRO: 1.https://arxiv.org/pdf/1910.02054.pdf 2.https://arxiv.org/pdf/1910.02054.pdf.
"""
# TODO (Baibaifan)
# Feature Notes:
# 1. Unified memory for parameters and parameters.grad to InternalStorage.
# 2. Support the segmentation of optimizer parameters and partial updating of parameters.
# 3. Dynamically adjust training parameters and models.
# 4. Support offload function.
# 5. Support the establishment of independent communication groups.
# 6. Broadcast_fp16 is not supported now.
def __init__(
self,
params,
optim,
group=None,
offload=False,
device="gpu",
pertrain_sync_models=True,
**kw
):
super().__init__(optim._learning_rate, params, kw)
# Segmentation information
self._dtype_rank_params = (
OrderedDict()
) # {dtype:[param1,param2]} device, rank, params
self._param2rank = {}
self.__segment_params = []
self._rank_buffer_size = {} # {dtype: {rank: numel+alignment}}
self._param2align = {} # {param.name: align}
# Default information
self._optim_defaults = kw
self._optim = optim
assert hasattr(
self._optim, "_master_weights"
), "Must use optimizer with _master_weights attribute"
self._local_params = params
self._default_device = device
self._pfp16 = (
len(
list(
filter(
lambda x: x.trainable and x.dtype == Type.fp16.value,
self._local_params,
)
)
)
> 0
)
self.group = (
new_group(_get_global_group().ranks) if group is None else group
)
self.world_size = self.group.nranks
self.rank = self.group.rank
self._global_root_rank = self.group.ranks[0]
# Synchronous all ranks models
if pertrain_sync_models:
self._sync_params_and_buffers()
self.param_storages = {} # {dtype: {rank: InternalStorage}}
if isinstance(self._optim._grad_clip, ClipGradByGlobalNorm):
logging.warning(
"While using ClipGradByGlobalNorm in ShardingOptimizer, the grad clip of original optimizer will be changed."
)
self._optim._grad_clip = ShardingClipGrad(
self._optim._grad_clip, paddle.get_device(), self.group
)
if self._optim._parameter_list and isinstance(
self._optim._parameter_list[0], dict
):
for item in self._optim._param_groups:
if "grad_clip" in item.keys():
item["grad_clip"] = ShardingClipGrad(
self._optim._grad_clip,
paddle.get_device(),
self.group,
)
if offload:
assert (
self._pfp16
), "Only support offload strategy while using \'Adam\', \'AdamW\' and \'Momentum\' optimizer with AMP/Pure FP16"
self.offload = offload # Using for offload
self.offload_device = "cpu"
self.offload_buffer_size = 0
self.offload_param2align = {}
self.offload_params = None
self.offload_grads = None
self._master_params = {}
# Update optimizer parameters and adjust parameter storage and use according to rank.
self._update_opt_status()
@paddle.autograd.no_grad()
def _sync_params_and_buffers(self):
"""
Sync all model states for all ranks
"""
for p in self._local_params:
dist.broadcast(
p, src=self._global_root_rank, group=self.group, sync_op=True
)
# Multi stream operation will be supported later
dist.wait(tensor=p, group=self.group, use_calc_stream=True)
def _generate_master_params(self, trainable_params):
if self.offload:
for param in trainable_params:
if param.name not in self._master_params.keys():
self._master_params[param.name] = core.VarBase(
name=param.name,
value=param.cast(dtype=Type.fp32.value).numpy(),
place=core.CPUPlace(),
stop_gradient=param.stop_gradient,
)
else:
for param in trainable_params:
if param.dtype == Type.fp16.value:
self._optim._master_weights[param.name] = paddle.cast(
param, Type.fp32.value
)
def _update_opt_status(self):
"""Update optimizer status and parameter storage information, and special functions to be developed."""
# func 1
self._integration_params()
# fun 2 TODO
# Segement helpers
def _segment_params(self):
"""
Divide all optimizer parameters equally into rank.
"""
if len(self.__segment_params) == 0:
self.__segment_params, param_lists = [
[] for _ in range(self.world_size)
], [[] for _ in range(self.world_size)]
sizes = [0] * self.world_size
for param in self._local_params:
# Add this param to rank with smallest size.
rank = sizes.index(min(sizes))
param_lists[rank].append(param)
# Statistical real numels
sizes[rank] += np.prod(param.shape) if param.trainable else 0
for rank, params in enumerate(param_lists):
self.__segment_params[rank].extend(params)
return self.__segment_params
@property
def local_params(self):
return self._local_params
@property
def param2rank(self):
"""Map the params to the rank which owns them"""
if len(self._param2rank) == 0:
for rank, params in enumerate(self._segment_params()):
for param in params:
self._param2rank[param.name] = rank
return self._param2rank
@property
def dtype_rank_params(self):
"""
Divide the parameters into groups according to rank and dtype.
"""
if len(self._dtype_rank_params) == 0:
# Assign the parameters of each rank according to the type
for param in self._local_params:
if param.dtype not in self._dtype_rank_params.keys():
self._dtype_rank_params[param.dtype] = [
[] for _ in range(self.world_size)
]
self._dtype_rank_params[param.dtype][
self.param2rank[param.name]
].append(param)
# Sort per rank params by size
for dtype in self._dtype_rank_params.keys():
for rank_params in self._dtype_rank_params[dtype]:
rank_params.sort(key=lambda x: np.prod(x.shape))
return self._dtype_rank_params
@property
def rank_buffer_size(self):
"""
Count the memory size of the parameters corresponding to rank under the corresponding dtype.
"""
# CUDA alignment 256 bytes
if len(self._rank_buffer_size) == 0:
for dtype in self.dtype_rank_params.keys():
if dtype not in self._rank_buffer_size.keys():
self._rank_buffer_size[dtype] = {}
for dst_rank, per_rank_params in enumerate(
self.dtype_rank_params[dtype]
):
if dst_rank not in self._rank_buffer_size[dtype].keys():
self._rank_buffer_size[dtype][dst_rank] = 0
for param in per_rank_params:
if not param.trainable:
continue
size = np.prod(param.shape) * align[dtype]
remaining = size % alignment[self._default_device]
ali = (
0
if remaining == 0
else alignment[self._default_device] - remaining
)
align_ = ali // align[dtype]
self._rank_buffer_size[dtype][dst_rank] += (
np.prod(param.shape) + align_
)
self._param2align[param.name] = align_
return self._rank_buffer_size
def _integration_params(self):
"""
Integrate the parameters into a continuous memory according to rank, and support the update of training parameters.
"""
for dtype, per_rank_params in self.dtype_rank_params.items():
if dtype not in self.param_storages.keys():
self.param_storages[dtype] = {}
for dst_rank, params in enumerate(per_rank_params):
if len(params) > 0:
# Merge all the trainable params in a single InternalStorage
trainable_params = list(
filter(lambda x: x.trainable, params)
)
if self._pfp16 and dst_rank == self.rank:
self._generate_master_params(trainable_params)
if trainable_params:
param_storage = ParamStorage(
size=self.rank_buffer_size[dtype][dst_rank],
dtype=dtype,
device=self._default_device,
)
param_storage.add_rank_params(
trainable_params, self._param2align
)
self.param_storages[dtype][dst_rank] = param_storage
# Clear the InternalStorage keys which are not in use anymore
dtype_in_use = list(self.dtype_rank_params.keys())
dtype_to_pop = list(
filter(lambda x: x not in dtype_in_use, self.param_storages.keys())
)
for d in dtype_to_pop:
self.param_storages.pop(d)
if self.offload:
self._optim._master_weights = self._master_params
cpu_master_params = [p for p in self._master_params.values()]
for param in cpu_master_params:
size = np.prod(param.shape) * align[Type.fp32.value]
remaining = size % alignment[self.offload_device]
ali = (
0
if remaining == 0
else alignment[self.offload_device] - remaining
)
align_ = ali // align[Type.fp32.value]
self.offload_buffer_size += np.prod(param.shape) + align_
self.offload_param2align[param.name] = align_
if cpu_master_params:
with device_guard(self.rank, self.offload_device):
self.offload_params = ParamStorage(
size=self.offload_buffer_size,
dtype=Type.fp32.value,
device=self.offload_device,
)
self.offload_params.add_rank_params(
cpu_master_params, self.offload_param2align, False
)
self.offload_params.buffer.stop_gradient = False
self.offload_grads = GradStorage(
size=self.offload_buffer_size,
dtype=Type.fp32.value,
device=self.offload_device,
destination=self.rank,
parm2align=self.offload_param2align,
convert_cpu=True,
)
for p in cpu_master_params:
self.offload_grads.add_grad(
p, self.offload_param2align[p.name]
)
self._optim._master_weights[
self.offload_params.buffer.name
] = self.offload_params.buffer
def _offload_acc_grad(self, param_name, grad_fp32_cpu):
"""accumulate grads with offload strategy"""
with device_guard(self.rank, self.offload_device):
if param_name in self._master_params.keys():
if self._master_params[param_name].grad is None:
self._master_params[param_name]._copy_gradient_from(
grad_fp32_cpu
)
else:
self._master_params[param_name].grad.add_(grad_fp32_cpu)
self.offload_params.buffer._copy_gradient_from(
self.offload_grads.buffer
)
def _offload_scale_grad(self, scale_size):
"""scale grads with offload strategy"""
with device_guard(self.rank, self.offload_device):
self.offload_grads.buffer.scale_(scale=scale_size)
def _offload_clear_grad(self):
"""clear grads with offload strategy"""
with device_guard(self.rank, self.offload_device):
self.offload_grads.buffer.zero_()
def step(self):
"""
A wrapper for Optimizer's step function to finish the update operation of the optimizer.
"""
if self.offload:
params_list = [self.offload_params.buffer]
# TODO(Baibaifan): Offload will support param_groups later
if not isinstance(self._optim._param_groups[0], dict):
self._optim._parameter_list = params_list
self._optim._param_groups = params_list
# Run the optimizer of the current rank step
if self.offload:
with device_guard(device=self.offload_device):
self._optim.step()
dev_id = int(paddle.get_device().split(":")[1])
for param in self._local_params:
if param.name in self._master_params.keys():
param.set_value(
self._master_params[param.name]
.cuda(dev_id)
.cast(dtype=param.dtype)
)
else:
self._optim.step()
# Synchronize all the updated shards in between the ranks
self._broadcast_params()
def minimize(self):
raise RuntimeError(
"optimizer.minimize() not support now, please use optimizer.step()"
)
def set_state_dict(self, state_dict):
self._optim.set_state_dict(state_dict)
def state_dict(self):
return self._optim.state_dict()
def _clear_cache(self):
self.__segment_params.clear()
self._dtype_rank_params.clear()
self._param2rank.clear()
@paddle.autograd.no_grad()
def _broadcast_params(self):
"""Broadcast the parameters of the current rank to each rank"""
assert self._default_device == "gpu", "Only supported gpu"
# Exchange all the shards with the other ranks
for dtype_per_rank in self.param_storages.values():
for dst_rank, internal_storage in dtype_per_rank.items():
dist.broadcast(
tensor=internal_storage.buffer,
src=self.group.ranks[dst_rank],
group=self.group,
sync_op=True,
)
# Multi stream operation will be supported later
dist.wait(
tensor=internal_storage.buffer,
group=self.group,
use_calc_stream=True,
)
......@@ -11,5 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .sharding_utils import Type, device_guard
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# The file has been adapted from fairscale file:
# https://github.com/facebookresearch/fairscale/blob/main/fairscale/nn/data_parallel/sharded_ddp.py
# Git commit hash: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e
# We retain the following license from the original files:
# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
#
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.
import logging
from collections import deque
from functools import reduce
from itertools import chain
from types import MethodType
import numpy as np
import paddle
import paddle.distributed as dist
from paddle import nn
from paddle.distributed import collective as collective
from paddle.distributed.collective import _get_global_group
from ...meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import (
ShardingOptimizerStage2,
)
from ...utils.internal_storage import GradStorage
from .sharding_utils import Taskflow, Type
def _trainable(param):
return param.trainable
class ShardingStage2(nn.Layer):
"""
A wrapper for Sharding Stage2 Layer in Dygraph.
.. warning: ShardingStage2 encapsulates the layer strategy and integrates it into the nn.Layer.
.. ZeRO: https://arxiv.org/pdf/1910.02054.pdf.
"""
# TODO (Baibaifan)
# Feature Notes::
# 1. Unified memory for param and param.grad to InternalStorage.
# 2. Divide param.grad according to rank to centrally apply for and release GPU memory.
# 3. Dynamically adjust training parameters and models.
# 4. Support offload function.
# 5. Support the establishment of independent communication groups.
def __init__(
self,
layer,
sharding_optimizer,
group=None,
sync_buffers=False,
buffer_max_size=2**23, # 8MB
auto_refresh_trainable=True,
device="gpu",
):
super().__init__()
# training options
self._layer = layer
self._sharding_optimizers = (
[sharding_optimizer]
if not isinstance(sharding_optimizer, list)
else sharding_optimizer
)
assert all(
list(
map(
lambda opt: isinstance(opt, ShardingOptimizerStage2),
self._sharding_optimizers,
)
)
), "Please use ShardingOptimizerStage2 optimizer"
self._sync_buffers = sync_buffers
self._auto_refresh_trainable = auto_refresh_trainable
# Communication related attributes
self._group = (
collective.new_group(_get_global_group().ranks)
if group is None
else group
)
self._world_size_scaling = 1.0 / self._group.nranks
assert (
self._group.nranks > 1
), "Training must be distributed, ranks must be greater than 1"
self._rank = self._group.rank
self._global_root_rank = self._group.ranks[
0
] # picking rank 0 as the reference
self._default_device = device
# Global statistical parameters
self._all_params = list(
chain(*[optim.local_params for optim in self._sharding_optimizers])
)
self._trainable_params = []
self._grad_reduced = []
self._trainable_param2rank = {}
self._trainable_param2align = {}
self._trainable_mask = list(map(_trainable, self._all_params))
self._param_grads = []
# Set grad storage size & Display param sizes and model sizes
model_size = sum(
[np.prod(p.shape) for p in self._layer.parameters()]
).item()
assert buffer_max_size >= 0, "buffer_max_size must be GE than 0."
self._buffer_max_size = self._rank_buffer_size(
buffer_max_size, model_size
)
self._use_grad_storage = buffer_max_size > 0
self._grad_storages = {} # {dtype: {rank: GradStorage}}
self._has_grad_storage = []
self._grad_storage_list = []
# Offload
# TODO(haohongxiang): Now it's not be supported for multi-optimizers using Offload strategy
self._offload_optims = list(
filter(lambda optim: optim.offload, self._sharding_optimizers)
)
if len(self._offload_optims) > 0:
assert (
len(self._sharding_optimizers) == 1
), "Only support offload strategy for single optimizer"
self._offload = self._sharding_optimizers[0].offload
self._offload_device = "cpu"
# Set backward pass hooks
self._bw_hooks = []
# Set tasks flow
self._tasks_flow = deque()
# Define optimizer step and clear_grad
self._redefine_opt_step()
self._redefine_opt_clear()
def forward(self, *inputs, **kwargs):
"""
A wrapper for Sharding Stage2 layer.
- Fresh trainable params or rebuild grad storage
- Sync layer's buffer params
- Clear all flags states
- Forward for origin layers
"""
# Whether to need to reset trainable parameters
needs_fresh = len(self._bw_hooks) == 0 and self.training
if self._auto_refresh_trainable:
needs_fresh |= self._detect_train_change()
# Front hook
self._init_internal_storage(needs_fresh)
# Sync layer's buffers state
if self._sync_buffers:
self.__sync_buffers()
# Normal FW on the base model
fw = self._layer(*inputs, **kwargs)
return fw
def set_state_dict(self, state_dict, use_structured_name=True):
self._layer.set_state_dict(
state_dict, use_structured_name=use_structured_name
)
def state_dict(
self,
destination=None,
include_sublayers=True,
structured_name_prefix="",
):
return self._layer.state_dict(
destination=None, include_sublayers=True, structured_name_prefix=""
)
def _clear_gradients(self):
"""
Set zero to the gradient of the optimizer's current rank trainable parameters.
"""
# Release grad storages
for dtype in self._grad_storages.keys():
if (
not self._offload
and self._rank in self._grad_storages[dtype].keys()
):
self._grad_storages[dtype][self._rank].buffer.zero_()
# Release grads of params
for param in self._trainable_params:
if param.name in self._param_grads and param.grad is not None:
param.clear_gradient()
# Release grads of master params with offload strategy
if self._offload:
self._sharding_optimizers[0]._offload_clear_grad()
def _grad_scale(self):
"""
Before the gradient accumulation, scale the gradient.
"""
# Scale grad storages
for dtype in self._grad_storages.keys():
if (
not self._offload
and self._rank in self._grad_storages[dtype].keys()
):
self._grad_storages[dtype][self._rank].buffer.scale_(
scale=self._world_size_scaling
)
# Scale grads of params
for param in self._trainable_params:
if param.name in self._param_grads and param.grad is not None:
param.grad.scale_(scale=self._world_size_scaling)
param._reset_grad_inplace_version(True)
# Scale grads of master params with offload strategy
if self._offload:
self._sharding_optimizers[0]._offload_scale_grad(
self._world_size_scaling
)
def _init_internal_storage(self, needs_fresh):
"""
Judge Fresh trainable params or rebuild grad storage.
"""
if needs_fresh:
self._fresh_trainable()
else:
self._build_grad_storages()
# Clear all flags state
self._clear_counters()
def to(self, device=None, dtype=None, blocking=True):
"""
Synchronously or asynchronously convert the data type of the layer, the device is not supported now.
"""
assert isinstance(device, str), "Device must be type str"
assert (
device == self._default_device
), "New devices are not supported, because of the optimizer state is not sync"
self._layer.to(device=device, dtype=dtype, blocking=blocking)
# Re-build the buckets, hooks, etc..
self._fresh_trainable()
def _fresh_trainable(self):
"""Whether to update training parameters."""
# Make sure that this is not done while gradients are waiting to be reduced (if no_sync context for instance)
if reduce(lambda x, y: x or y, self._grad_reduced, False):
logging.warning("Grads waiting to be reduced.")
self._trainable_params = list(
filter(lambda x: x.trainable, self._all_params)
)
self._trainable_params.sort(key=lambda x: np.prod(x.shape))
self._trainable_param2rank = {}
for optim in self._sharding_optimizers:
# Need to be wrappered for Sharding Stage2 Optimizer
if len(optim.param_storages.keys()) == 0:
optim.update_opt_status()
# Get the parameters split by the optimizer according to rank
for (
per_rank_params
) in (
optim.dtype_rank_params.values()
): # all the params from all ranks
for params in per_rank_params:
for param in filter(lambda x: x.trainable, params):
self._trainable_param2rank[
param.name
] = optim.param2rank[param.name]
self._trainable_param2align[
param.name
] = optim._param2align[param.name]
self._setup_use_grad_storage()
# wait next func hook support
self._setup_backward_hooks()
@paddle.autograd.no_grad()
def __sync_buffers(self):
"""
Sync all the param buffers from all ranks (exp: batch norm statistics).
"""
for buffer in self._layer.buffers(include_sublayers=True):
dist.broadcast(
buffer, self._global_root_rank, self._group, sync_op=True
)
# Multi stream operation will be supported later
dist.wait(tensor=buffer, group=self._group, use_calc_stream=True)
def __getattr__(self, name):
"""Forward missing attributes to wrapped layer."""
try:
return super().__getattr__(name)
except AttributeError:
return getattr(self._layer, name)
@paddle.autograd.no_grad()
def _clear_counters(self):
"""Reset all the grad reduce and call counters."""
if self.training:
self._grad_reduced = [True for _ in self._trainable_params]
if self._use_grad_storage:
for grad_storage in self._grad_storage_list:
grad_storage.reset_checked_in()
def _get_reduce_fn(self, index, param, dst_rank):
"""
There are two ways to reduce gradient.
- 1. Do not use self._use_grad_storage or exceeded buffer_max_size will be reduced separately.
- 2. Use grad_storage Reduce the storage to get the full gradient from different ranks.
"""
if not self._use_grad_storage or not self._has_grad_storage[index]:
# Direct reduction
@paddle.autograd.no_grad()
def reduce(*_):
# Skip gradient reduction, do not change status information
if self._grad_reduced[index]:
assert (
param.grad is not None
), "Parameter gradient cannot be None"
# Change reduce information
self._grad_reduced[index] = False
# Clear the gradient that does not belong to the current rank through the callback function
def cleanup():
if dst_rank != self._rank:
param.clear_gradient(False)
elif self._offload:
self._sharding_optimizers[0]._offload_acc_grad(
param.name,
param.grad.cast(dtype=Type.fp32.value).cpu(),
)
param.clear_gradient(False)
# Synchronize the reduce parameter gradient
self._tasks_flow.append(
Taskflow(
task=dist.reduce(
tensor=param.grad,
dst=self._group.ranks[dst_rank],
group=self._group,
sync_op=True,
),
callback=cleanup,
)
)
# Multi stream operation will be supported later
dist.wait(
tensor=param.grad,
group=self._group,
use_calc_stream=True,
)
# Clear the task flow and trigger callback to clear the redundant gradient
self._clear_task_flow()
else:
# Buffer reduction
@paddle.autograd.no_grad()
def reduce(*_):
# Skip gradient reduction, do not change status information
if self._grad_reduced[index]:
assert (
param.grad is not None
), "Parameter gradient cannot be None"
# Change reduce information
self._grad_reduced[index] = False
grad_storage = self._grad_storages[param.dtype][dst_rank]
grad_storage.params_checked_in += 1
if grad_storage.all_checked_in:
assert grad_storage.buffer is not None
# Clearing up the grad_storage buffer
def cleanup():
if dst_rank != self._rank:
for p in grad_storage._params:
p.clear_gradient(False)
p._gradient_set_empty(False)
grad_storage.buffer.value().get_tensor()._clear()
elif self._offload:
grad_storage.to(device=self._offload_device)
for p in grad_storage._params:
self._sharding_optimizers[
0
]._offload_acc_grad(
p.name,
p.grad.cast(dtype=Type.fp32.value),
)
p.clear_gradient(False)
p._gradient_set_empty(False)
grad_storage._device = self._default_device
grad_storage.buffer.value().get_tensor()._clear()
# Reduce the bucket
grad_storage.sent = True
self._tasks_flow.append(
Taskflow(
task=dist.reduce(
tensor=grad_storage.buffer,
dst=self._group.ranks[
grad_storage.destination
],
group=self._group,
sync_op=True,
),
callback=cleanup,
)
)
# Multi stream operation will be supported later
dist.wait(
tensor=grad_storage.buffer,
group=self._group,
use_calc_stream=True,
)
# Clear the task flow and trigger callback to clear the redundant gradient
self._clear_task_flow()
return reduce
def _setup_backward_hooks(self):
"""
Set the backward hook to synchronize the gradients of all rank by reduce group ranks.
"""
# Remove previous backward hooks
while len(self._bw_hooks) > 0:
self._bw_hooks.pop().remove()
# Go through the parameters, attach the hook
if not self.training:
return
for index, param in enumerate(self._trainable_params):
dst_rank = self._trainable_param2rank[param.name]
reduce_function = self._get_reduce_fn(index, param, dst_rank)
self._bw_hooks.append(
param._register_backward_hook(reduce_function)
)
def _setup_use_grad_storage(self):
"""
Integrate the parameters gradient into a continuous memory according to rank, and support the update of training parameters.
"""
# According to parameters's numel sort, allocate memory of parameter gradient to continuous memory according to rank
self._grad_storages = {}
self._has_grad_storage = [False for _ in self._trainable_params]
for index, param in enumerate(self._trainable_params):
dst_rank = self._trainable_param2rank[param.name]
if param.dtype not in self._grad_storages.keys():
self._grad_storages[param.dtype] = {}
if dst_rank not in self._grad_storages[param.dtype].keys():
self._grad_storages[param.dtype][dst_rank] = GradStorage(
self._buffer_max_size[param.dtype],
dtype=param.dtype,
device=self._default_device,
destination=dst_rank,
parm2align=self._trainable_param2align,
)
# Criteria to decide whether this parameter is to be put in GradStorage
if self._grad_storages[param.dtype][dst_rank].can_add_grad_view(
param, self._trainable_param2align[param.name]
):
self._grad_storages[param.dtype][dst_rank].add_grad(
param, self._trainable_param2align[param.name]
)
self._has_grad_storage[index] = True
else:
self._param_grads.append(param.name)
print(
"Can not add param: {}, param's shape: {}, param align: {}, grad_storages fill: {}, ".format(
param.name,
param.shape,
self._trainable_param2align[param.name],
self._grad_storages[param.dtype][dst_rank]._fill,
)
)
self._grad_storage_list = list(
chain(
*[
self._grad_storages[dtype].values()
for dtype in self._grad_storages.keys()
]
)
)
def _clear_task_flow(self):
"""Try to consume the previous tasks."""
while len(self._tasks_flow) > 0:
task = self._tasks_flow.popleft()
if task.callback is not None:
task.callback()
def _detect_train_change(self):
# Current trainable parameters
trainable_mask = list(map(_trainable, self._all_params))
# Whether parameters trainability changed
trainability_changed = trainable_mask != self._trainable_mask
if trainability_changed:
logging.warning(
"Trainable params changed, because of eval/train mode or parameter freezing/unfreeze."
)
self._trainable_mask = trainable_mask
return trainability_changed
def _build_grad_storages(self):
"""
Rebuild grad storages.
"""
# Rebuild fp16/fp32 grad storages
for dtype in self._grad_storages.keys():
for dst_rank, grad_storage in self._grad_storages[dtype].items():
if self._offload or dst_rank != self._rank:
grad_storage.manumal_relase()
grad_storage.rebuild()
def _rank_buffer_size(self, buffer_max_size, model_size):
"""
Generate the minimum buffer size for each rank & Display param sizes and model sizes.
"""
# Initialize buffer size
rank_buffer_size = {}
for shard_opt in self._sharding_optimizers:
if shard_opt.rank_buffer_size:
for dtype in shard_opt.rank_buffer_size.keys():
sizes = max(shard_opt.rank_buffer_size[dtype].values())
rank_buffer_size[dtype] = min(sizes, buffer_max_size)
if Type.fp16.value in rank_buffer_size.keys():
# FP16 GradStorage and model size
print(
"====== FP16 GradStorage size: {:.2f}M parameters, Model size {:.2f}M parameters ======".format(
rank_buffer_size[Type.fp16.value] / 2**19,
model_size / 2**19,
)
)
if Type.fp32.value in rank_buffer_size.keys():
# FP32 GradStorage and model size
print(
"====== FP32 GradStorage size: {:.2f}M parameters, Model size {:.2f}M parameters ======".format(
rank_buffer_size[Type.fp32.value] / 2**18,
model_size / 2**18,
)
)
return rank_buffer_size
def _redefine_opt_step(self):
grad_func = self._grad_scale
for opt in self._sharding_optimizers:
opt_step = opt.step
def _opt_step(self):
grad_func()
opt_step()
opt.step = MethodType(_opt_step, opt)
def _redefine_opt_clear(self):
clear_func = self._clear_gradients
def _opt_clear(self):
clear_func()
for opt in self._sharding_optimizers:
opt.clear_grad = MethodType(_opt_clear, opt)
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from collections import OrderedDict
from types import MethodType
import numpy as np
import paddle
import paddle.distributed as dist
import paddle.fluid.core as core
from paddle import nn
from paddle.autograd import PyLayer
from paddle.distributed import collective
from paddle.distributed.collective import _get_global_group
from paddle.fluid.clip import ClipGradByGlobalNorm
from paddle.fluid.framework import ParamBase
from ...utils.internal_storage import GradStorage
from ..pp_utils.utils import _all_gather
from .sharding_utils import ShardingClipGrad, Type, device_guard
# CUDA alignment 256 bytes
alignment = {
"gpu": 256,
}
align = {
Type.fp16.value: 2,
Type.fp32.value: 4,
}
global CHECK_LAYER
CHECK_LAYER = dict() # Help to check layer's id -> layer's name
class ShardingStage3(nn.Layer):
"""
A wrapper for Sharding Stage3 Layer in Dygraph.
.. warning: ShardingStage3 encapsulates the layer strategy and integrates it into the nn.Layer.
.. ZeRO: https://arxiv.org/pdf/1910.02054.pdf.
"""
# TODO (Baibaifan)
# Feature Notes::
# 1. The model supports the segmentation of parameters by global ranks in layers.
# 2. Support communication flow and computing flow.
# 3. Support offload function.
# 4. Support the establishment of independent communication groups.
def __init__(
self,
layer,
optimizer,
group=None,
sync_buffers=False,
device="gpu",
segment_size=2**15,
pertrain_sync_models=True,
offload=False,
sync_comm=False,
):
super().__init__()
# Default configs
assert core.is_compiled_with_cuda(), "Only support CUDA."
self._layer = layer
self._default_device = device
self.__sync_buffers = sync_buffers
self._offload = offload
self._sync_comm = sync_comm
# segmentation size
assert segment_size >= 0, "segment_size must be GE than 0."
self._segment_size = segment_size
global DEV
DEV = (
"cpu"
if paddle.get_device() == "cpu"
else paddle.get_device().split(":")[0]
)
global DEV_ID
DEV_ID = (
0
if paddle.get_device() == "cpu"
else int(paddle.get_device().split(":")[1])
)
global param2dtype
param2dtype = dict()
# Communication group establishment
self._group = (
collective.new_group(_get_global_group().ranks)
if group is None
else group
)
self._world_size_scaling = 1.0 / self._group.nranks
assert (
self._group.nranks > 1
), "Training must be distributed, ranks must be greater than 1."
self._rank = self._group.rank
self._global_root_rank = self._group.ranks[
0
] # picking rank 0 as the reference
self._global_ranks = self._group.ranks
# Parameter segmentation for global ranks
# After flatten -> self._param2buffer_size, self._param2buffer, self._trainable_params
self._param2buffer_size = dict() # {param.name: size}
self._param2buffer = (
dict()
) # {param.name: [(start0, end0),(start1, end1), ...]}
self._trainable_params = dict() # {id(layer): [trainable_params]}
self._unslice_params = set() # param's numel <= segment_size
self._unslice_params2align = dict() # {param.name: param's align}
self._grad_storages = dict() # {param.dtype: GradStorage}
assert not isinstance(
optimizer, list
), "Multiple optimizers are not supported now."
self._optim = _OptimizerWrapper(
optimizer, self._offload, self._group, self._update_params_slice
)
self._ori_parameter_list = self._optim._parameter_list
self._ori_param_groups = self._optim._param_groups
# Replace optimizer's _grad_clip
if isinstance(self._optim._grad_clip, ClipGradByGlobalNorm):
logging.warning(
"While using ClipGradByGlobalNorm in ShardingStage3, the grad clip of original optimizer will be changed."
)
self._optim._grad_clip = ShardingClipGrad(
self._optim._grad_clip, paddle.get_device(), self._group
)
# Synchronous all ranks models
if pertrain_sync_models:
self._sync_params_and_buffers()
self._segment_rank_params(self._layer)
# Add unslice params to master_weight in fp16
self._handle_unslice_params()
# In the first step, record the execution order of the layer
self._order_tracer = OrderedDict()
self._order_tracer["order"] = 0
self._order_tracer["layer"] = list()
# Register task flow
self._task_flow = TaskFlow()
# Register forward hooks
self._register_forward_hooks(self._layer)
# Register backward parameter hooks
self._register_backward_hooks()
# Redefine optimizer step and clear function
self._redefine_opt_step()
self._redefine_opt_clear()
@paddle.autograd.no_grad()
def _sync_params_and_buffers(self):
"""
Sync all model states for all ranks
"""
for p in self._layer.parameters():
dist.broadcast(
p, src=self._global_root_rank, group=self._group, sync_op=True
)
# Multi stream operation will be supported later
dist.wait(tensor=p, group=self._group, use_calc_stream=True)
def _clear_gradients(self):
assert len(self._trainable_params.keys()) > 0
current_layer_params = self._layer.parameters(include_sublayers=True)
# 1.Handle param's slice
trainable_params = list(
filter(
lambda p: p.trainable and p not in self._unslice_params,
current_layer_params,
)
)
for param in trainable_params:
assert hasattr(
param, "fw_storage"
), "Find {} don't have fw_storage attribute.".format(param.name)
param.fw_storage.clear_gradient(False)
param.fw_storage._gradient_set_empty(False)
param.bw_storage._clear()
param.bw_storage = None
# 2.Handle unslice param
if not self._offload:
for grad_storage in self._grad_storages.values():
grad_storage.buffer.zero_()
else:
for param in list(self._unslice_params):
param.clear_gradient(False)
param._gradient_set_empty(False)
tmp_var = param.cuda(DEV_ID)
if (
tmp_var.dtype == Type.fp32.value
and param2dtype[param.name] == Type.fp16.value
):
tmp_var = paddle.cast(tmp_var, Type.fp16.value)
tmp_var._share_buffer_to(param)
tmp_var._clear()
for grad_storage in self._grad_storages.values():
grad_storage.manumal_relase()
grad_storage.rebuild()
# Update param memery slice
def _update_params_slice(self):
update_list = self._update_params()
if not isinstance(self._optim._param_groups[0], dict):
slice_params = [param.fw_storage for param in update_list]
self._optim._parameter_list = slice_params + list(
self._unslice_params
)
self._optim._param_groups = slice_params + list(
self._unslice_params
)
else:
for param_group in self._optim._param_groups:
p_group = []
for p in param_group['params']:
if hasattr(p, "fw_storage"):
p_group.append(p.fw_storage)
else:
p_group.append(p)
param_group['params'] = p_group
def forward(self, *inputs, **kwargs):
"""
A wrapper for Sharding Stage3 layer.
"""
# 1.Sync layer's buffers state
if self.__sync_buffers:
self._sync_buffers()
# 2.Normal FW on the base model
fw = self._layer(*inputs, **kwargs)
return fw
def set_state_dict(self, state_dict, use_structured_name=True):
self._layer.set_state_dict(
state_dict, use_structured_name=use_structured_name
)
def state_dict(
self,
destination=None,
include_sublayers=True,
structured_name_prefix="",
):
return self._layer.state_dict(
destination=None, include_sublayers=True, structured_name_prefix=""
)
def _handle_unslice_params(self):
buffer_size = dict()
buffer_size[Type.fp32.value] = 0
buffer_size[Type.fp16.value] = 0
for param in self._unslice_params:
# Updata optimizer master weights
if param.dtype == Type.fp16.value and not self._offload:
self._optim._master_weights[param.name] = paddle.cast(
param, Type.fp32.value
)
if self._offload:
param.master_weight = paddle.cast(param, Type.fp32.value).cpu()
param2dtype[param.name] = param.dtype
p_align = self._param2align(param)
self._unslice_params2align[param.name] = p_align
buffer_size[param.dtype] += param._numel() + p_align
# Create unslice_params'grad
for param in sorted(list(self._unslice_params), key=lambda p: p.name):
if param.dtype not in self._grad_storages.keys():
self._grad_storages[param.dtype] = GradStorage(
buffer_size[param.dtype],
dtype=param.dtype,
device=self._default_device,
destination=self._rank,
parm2align=self._unslice_params2align,
)
self._grad_storages[param.dtype].add_grad(
param, self._unslice_params2align[param.name]
)
def _segment_rank_params(self, layer, name="last_layer"):
"""
Flatten parameters according to layer.
"""
current_layer_params = _current_layer_params(layer)
if current_layer_params:
CHECK_LAYER[id(layer)] = name
self._flatten_layer_params(layer, current_layer_params)
for name, sub_layer in layer.named_children():
self._segment_rank_params(sub_layer, name)
def _flatten_layer_params(self, layer, current_layer_params):
"""
Parameter segmentation and memory integration.
"""
def _add_manage_info(trainable_param):
return _PartitionParam(trainable_param)
current_params = list()
for p in current_layer_params:
if p.trainable and p._numel() > self._segment_size:
current_params.append(_add_manage_info(p))
elif p.trainable:
self._unslice_params.add(_UnsliceParam(p))
assert id(layer) not in self._trainable_params.keys()
self._trainable_params[id(layer)] = current_params
for param in self._trainable_params[id(layer)]:
if param.name in self._param2buffer.keys():
continue
self._param2buffer[param.name] = []
# 1.Params alignment
align_ = self._param2align(param)
offset = align_ + param._numel()
buffer_size = (
offset
if offset % self._group.nranks == 0
else offset + self._group.nranks - (offset % self._group.nranks)
)
self._param2buffer_size[param.name] = buffer_size
# 2.Combination param buffer
assert buffer_size % self._group.nranks == 0
pre_buffer = buffer_size // self._group.nranks
for rank_ in range(self._group.nranks):
self._param2buffer[param.name].append(
(rank_ * pre_buffer, (rank_ + 1) * pre_buffer)
)
# Record param's dtype
param2dtype[param.name] = param.dtype
# 3.Flatten layer params and release other rank buffer
self._param_storage(param, buffer_size)
def _param_storage(self, param, buffer_size):
"""
This is a function to simplify the handling of parameter InternalStorages.
"""
assert isinstance(buffer_size, int)
value = (
np.zeros(buffer_size, dtype=np.float16)
if Type.fp16.value == param.dtype
else np.zeros(buffer_size, dtype=np.float32)
)
buffer = core.VarBase(value=value, place=core.CPUPlace())
param_shape = param.shape
origin_state = param.stop_gradient
param.stop_gradient = True
param.flatten_()
param.stop_gradient = origin_state
start, end = self._param2buffer[param.name][self._rank]
# Copy the current param value
tmp_var = core.VarBase(
tensor=buffer._slice(0, param._numel()), place=core.CPUPlace()
)
param_cpu = param.cpu()
tmp_var.value().get_tensor().set(
param_cpu.value().get_tensor(), core.CPUPlace()
)
param.value().get_tensor()._set_dims(param_shape)
# Current rank param_storage
if self._offload:
param.fw_storage = core.VarBase(
buffer._slice(start, end),
core.CPUPlace(),
"slice@" + param.name,
)
with device_guard(device="cpu"):
param.master_weight = paddle.cast(
param.fw_storage, Type.fp32.value
)
else:
param.fw_storage = core.VarBase(
buffer._slice(start, end), "slice@" + param.name
)
param.status = "part"
# Updata optimizer master weights
if param.dtype == Type.fp16.value and not self._offload:
self._optim._master_weights[param.fw_storage.name] = paddle.cast(
param.fw_storage, Type.fp32.value
)
param._clear()
def _register_forward_hooks(self, layer):
"""
Register pylayer to manage memory slices.
There are four stages:
FW
1. Before the forward layers, synchronize the full parameters.
2. After the forward layers, release the full parameter and keep the parameter slice.
BW
3. Before the backward layers, synchronize the full parameters and create param's grad.
4. After the gradient accumulation, release the full parameter and keep the parameter slice.
"""
current_layer_params = _current_layer_params(layer)
if current_layer_params:
self._register_forward_all_hooks(layer, self._task_flow)
for _, sub_layer in layer.named_children():
self._register_forward_hooks(sub_layer)
def _register_forward_all_hooks(self, sub_layer, task_flow):
def _forward_pre_hook(layer, inputs):
return ForwardPreHooks(
layer,
self._order_tracer,
self._trainable_params,
self._param2buffer,
self._rank,
self._group,
self._sync_comm,
self._offload,
task_flow,
)
def _forward_post_hook(layer, inputs, outputs):
return ForwardPostHooks.apply(
outputs,
layer,
self._order_tracer,
self._trainable_params,
self._param2buffer,
self._param2buffer_size,
self._rank,
self._group,
self._sync_comm,
self._offload,
task_flow,
)
# register previous forward hooks
sub_layer.register_forward_pre_hook(_forward_pre_hook)
# register post forward hooks
sub_layer.register_forward_post_hook(_forward_post_hook)
@paddle.autograd.no_grad()
def _sync_buffers(self):
"""
Sync all the param buffers from all ranks (exp: batch norm statistics).
"""
for buffer in self._layer.buffers(include_sublayers=True):
dist.broadcast(
buffer, self._global_root_rank, self._group, sync_op=True
)
# Multi stream operation will be supported later
dist.wait(tensor=buffer, group=self._group, use_calc_stream=True)
def __getattr__(self, name):
"""Forward missing attributes to wrapped layer."""
try:
return super().__getattr__(name)
except AttributeError:
return getattr(self._layer, name)
def _update_params(self):
"""
Update parameters to optimizer memory slice.
"""
update_list = []
assert len(self._trainable_params.keys()) > 0
current_layer_params = self._layer.parameters(include_sublayers=True)
trainable_params = list(
filter(
lambda p: p.trainable and p not in self._unslice_params,
current_layer_params,
)
)
# 1.Handle param's slice
for param in trainable_params:
assert hasattr(
param, "fw_storage"
), "Find {} don't have fw_storage attribute".format(param.name)
# Gradient average
if self._offload:
with device_guard(device="cpu"):
param.bw_storage.scale_(scale=self._world_size_scaling)
else:
param.bw_storage.scale_(scale=self._world_size_scaling)
param.fw_storage = _VarBaseWrapper(param)
assert param.fw_storage.grad is None
param.fw_storage._copy_gradient_from(param.bw_storage)
update_list.append(param)
# 2.Handle unslice param
for grad_storage in self._grad_storages.values():
grad_storage.buffer.scale_(scale=self._world_size_scaling)
dist.all_reduce(
tensor=grad_storage.buffer, group=self._group, sync_op=True
)
dist.wait(
tensor=grad_storage.buffer,
group=self._group,
use_calc_stream=True,
)
if self._offload:
for param in list(self._unslice_params):
param._clear()
param.master_weight._share_buffer_to(param)
for grad_storage in self._grad_storages.values():
for p in grad_storage._params:
tmp_g = _device2cpu(p.grad, convert_dtype=True)
p.clear_gradient(False)
p._gradient_set_empty(False)
p._copy_gradient_from(tmp_g)
tmp_g._clear()
grad_storage.buffer._clear()
return update_list
def get_all_parameters(self, convert2cpu=False):
"""
Get the full parameters and return the corresponding task flows.
"""
assert len(self._trainable_params.keys()) > 0
current_layer_params = self._layer.parameters(include_sublayers=True)
trainable_params = list(
filter(
lambda p: p.trainable and p not in self._unslice_params,
current_layer_params,
)
)
t_flow = _allgather_buffer(
trainable_params,
self._group,
use_calc_stream=True,
task_flow=TaskFlow(),
sync_wait=True,
offload=self._offload,
convert2cpu=convert2cpu,
)
if convert2cpu:
for param in trainable_params:
t_flow.full_param[param.name]._share_buffer_to(param)
self._optim._parameter_list = self._ori_parameter_list
self._optim._param_groups = self._ori_param_groups
def _register_backward_hooks(self):
current_layer_params = self._layer.parameters(include_sublayers=True)
trainable_params = list(
filter(
lambda p: p.trainable and p not in self._unslice_params,
current_layer_params,
)
)
for param in trainable_params:
allreduce_function = self._get_allreduce_fn(param)
param._register_backward_hook(allreduce_function)
def _get_allreduce_fn(self, param):
@paddle.autograd.no_grad()
def allreduce_(*_):
if param.name in self._task_flow.full_grad.keys():
full_grad = self._task_flow.full_grad[param.name]
# Only support sync allreduce current rank's layer now
dist.all_reduce(
tensor=full_grad, group=self._group, sync_op=True
)
dist.wait(
tensor=full_grad, group=self._group, use_calc_stream=True
)
start, end = self._param2buffer[param.name][self._rank]
if param.bw_storage is None:
param.bw_storage = (
core.VarBase(full_grad._slice(start, end))
.detach()
.clone()
)
if self._offload:
param.bw_storage = _device2cpu(param.bw_storage, True)
else:
if self._offload:
cpu_grad = _device2cpu(
core.VarBase(full_grad._slice(start, end))
.detach()
.clone(),
True,
)
with device_guard(device="cpu"):
param.bw_storage = paddle.add(
param.bw_storage, cpu_grad
)
else:
# param.bw_storage.add_(
# core.VarBase(full_grad._slice(start, end))
# .detach().clone())
param.bw_storage = paddle.add(
param.bw_storage,
core.VarBase(full_grad._slice(start, end))
.detach()
.clone(),
)
param.clear_gradient(False)
param._gradient_set_empty(False)
tmp_var = self._task_flow.full_grad.pop(param.name)
tmp_var._clear()
if param.name in self._task_flow.full_param.keys():
if param.status == "all":
param.use_count = 0
param._clear()
start, end = self._param2buffer[param.name][self._rank]
param.fw_storage = (
core.VarBase(
self._task_flow.full_param[param.name]._slice(
start, end
),
param.name + "@slice",
)
.detach()
.clone()
)
param.status = "part"
tmp_var = self._task_flow.full_param.pop(param.name)
tmp_var._clear()
if self._offload:
param.fw_storage._clear()
param.master_weight._share_buffer_to(param.fw_storage)
return allreduce_
def _param2align(self, param):
# CUDA alignment 256 bytes
size = param._numel() * align[param.dtype]
remaining = size % alignment[self._default_device]
ali = (
0 if remaining == 0 else alignment[self._default_device] - remaining
)
align_ = ali // align[param.dtype]
return align_
def _redefine_opt_step(self):
params_slice_func = self._update_params_slice
opt_step = self._optim.step
def _opt_step(self):
if not self.update_scaler:
params_slice_func()
if self.offload:
with device_guard(device="cpu"):
opt_step()
else:
opt_step()
def _opt_minimize(self):
raise RuntimeError(
"optimizer.minimize() not support now, please use optimizer.step()"
)
self._optim.step = MethodType(_opt_step, self._optim)
self._optim.minimize = MethodType(_opt_minimize, self._optim)
def _redefine_opt_clear(self):
clear_func = self._clear_gradients
def _opt_clear(self):
clear_func()
self._optim.clear_grad = MethodType(_opt_clear, self._optim)
def ForwardPreHooks(
layer,
order_tracer,
trainable_params,
param2buffer,
rank,
group,
sync_comm,
offload,
task_flow,
):
# Record layer's id
layer_id = id(layer)
use_calc, sync_wait = False, False
if layer_id not in order_tracer.keys() or sync_comm:
use_calc, sync_wait = True, True
# Whether to use calc stream
task_flow.use_calc[layer_id] = use_calc
else:
# Whether to use calc stream
task_flow.use_calc[layer_id] = use_calc
# wait current layer params
_wait_layer(
trainable_params[layer_id], task_flow, group, use_calc, offload
)
if layer_id == order_tracer["layer"][-1]:
return
order_ = order_tracer[layer_id]
layer_id = order_tracer["layer"][order_ + 1]
_allgather_buffer(
trainable_params[layer_id],
group,
use_calc_stream=use_calc,
task_flow=task_flow,
sync_wait=sync_wait,
offload=offload,
)
return
class ForwardPostHooks(PyLayer):
@staticmethod
def forward(
ctx,
inputs,
layer,
order_tracer,
trainable_params,
param2buffer,
param2buffer_size,
rank,
group,
sync_comm,
offload,
task_flow,
):
layer_id = id(layer)
# release current layer full params
_release_param(
trainable_params[layer_id], param2buffer, rank, task_flow, offload
)
if layer_id not in order_tracer.keys():
order_ = order_tracer["order"]
order_tracer[layer_id] = order_
order_tracer["order"] += 1
order_tracer["layer"].append(layer_id)
# Record bw info
ctx.order_tracer = order_tracer
ctx.task_flow = task_flow
ctx.group = group
ctx.layer = layer
ctx.sync_comm = sync_comm
ctx.trainable_params = trainable_params
ctx.param2buffer_size = param2buffer_size
ctx.offload = offload
return inputs
@staticmethod
def backward(ctx, *args):
# Load context value
order_tracer = ctx.order_tracer
task_flow = ctx.task_flow
group = ctx.group
layer = ctx.layer
trainable_params = ctx.trainable_params
param2buffer_size = ctx.param2buffer_size
sync_comm = ctx.sync_comm
offload = ctx.offload
layer_id = id(layer)
use_calc, sync_wait = False, False
# Allgather params synchronization
if sync_comm:
use_calc, sync_wait = True, True
_allgather_buffer(
trainable_params[layer_id],
group,
use_calc_stream=use_calc,
task_flow=task_flow,
sync_wait=sync_wait,
offload=offload,
)
else:
_wait_layer(
trainable_params[layer_id], task_flow, group, use_calc, offload
)
# Create params's grad
_create_params_grad(
trainable_params[layer_id], param2buffer_size, task_flow
)
# Whether to use calc stream
task_flow.use_calc[layer_id] = use_calc
if layer_id != order_tracer["layer"][0] and not sync_comm:
layer_next_id = order_tracer["layer"][order_tracer[layer_id] - 1]
_allgather_buffer(
trainable_params[layer_next_id],
group,
use_calc_stream=use_calc,
task_flow=task_flow,
sync_wait=sync_wait,
offload=offload,
)
return args
class TaskFlow:
"""
Task flows, one way linked list for task acquisition.
"""
def __init__(
self,
full_param=dict(),
full_grad=dict(),
use_calc=dict(),
callback=None,
):
self.full_param = full_param
self.full_grad = full_grad
self.use_calc = use_calc
self.callback = callback
def _release_param(
trainable_params, param2buffer, rank, task_flow, offload=False
):
for param in trainable_params:
# async communicate share weight not clear
param.use_count -= 1
if param.use_count == 0:
param._clear()
if param.name in task_flow.full_param.keys():
start, end = param2buffer[param.name][rank]
with paddle.amp.auto_cast(enable=False):
param.fw_storage = (
core.VarBase(
task_flow.full_param[param.name]._slice(start, end),
param.name + "@slice",
)
.detach()
.clone()
)
param.status = "part"
tmp_var = task_flow.full_param.pop(param.name)
tmp_var._clear()
if offload:
param.fw_storage = _device2cpu(param.fw_storage)
return
def _wait_layer(
trainable_params, task_flow, group, use_calc_stream, offload=False
):
paddle.device.cuda.synchronize()
for param in trainable_params:
if param.status == "all":
param.use_count += 1
continue
if param.name in task_flow.full_param.keys():
full_param = task_flow.full_param[param.name]
core.VarBase(full_param._slice(0, param._numel()))._share_buffer_to(
param
)
param.fw_storage._clear()
param.fw_storage = None
param.status = "all"
param.use_count += 1
else:
_allgather_buffer(
trainable_params,
group,
use_calc_stream=True,
task_flow=task_flow,
sync_wait=True,
offload=offload,
)
break
return task_flow
def _allgather_buffer(
trainable_params,
group,
use_calc_stream,
task_flow,
sync_wait=False,
offload=False,
convert2cpu=False,
):
for param in trainable_params:
if param.status == "all":
param.use_count += 1
continue
if offload:
param.fw_storage = _cpu2device(param)
with paddle.amp.auto_cast(enable=False):
full_param = _all_gather(
param.fw_storage, group, use_calc_stream=use_calc_stream
)
# Allgather current layer in the 1st step synchronously
if sync_wait:
with paddle.amp.auto_cast(enable=False):
dist.wait(
tensor=full_param,
group=group,
use_calc_stream=use_calc_stream,
)
core.VarBase(full_param._slice(0, param._numel()))._share_buffer_to(
param
)
param.fw_storage._clear()
param.fw_storage = None
param.status = "all"
param.use_count += 1
task_flow.full_param[param.name] = full_param
# parameter converts to cpu
if convert2cpu:
p_name = param.name
param = _device2cpu(param)
tmp_var = task_flow.full_param.pop(p_name)
tmp_var._clear()
task_flow.full_param[p_name] = param
return task_flow
@paddle.autograd.no_grad()
def _create_params_grad(trainable_params, param2buffer_size, task_flow):
for param in trainable_params:
if param.name in task_flow.full_grad.keys():
continue
assert isinstance(param2buffer_size[param.name], int)
temp_grad = paddle.zeros(
[param2buffer_size[param.name]], dtype=param.dtype
)
param._copy_gradient_from(
core.VarBase(temp_grad._slice(0, param._numel()))
)
task_flow.full_grad[param.name] = temp_grad
return task_flow
def _PartitionParam(param):
if not hasattr(param, "fw_storage"):
setattr(param, "fw_storage", None)
setattr(param, "bw_storage", None)
setattr(param, "master_weight", None)
setattr(param, "status", "all")
setattr(param, "use_count", 0)
return param
def _UnsliceParam(param):
if not hasattr(param, "unslice"):
setattr(param, "unslice", True)
setattr(param, "master_weight", None)
return param
def _VarBaseWrapper(param):
varbase = param.fw_storage
tmp_param = ParamBase(
shape=varbase.shape, dtype=varbase.dtype, name="slice@" + param.name
)
varbase._share_buffer_to(tmp_param)
tmp_param.regularizer = param.regularizer
tmp_param.optimize_attr['learning_rate'] = param.optimize_attr[
'learning_rate'
]
varbase._clear()
return tmp_param
def _OptimizerWrapper(optimizer, offload, group, update_params_slice):
if not hasattr(optimizer, "_optim"):
setattr(optimizer, "_optim", optimizer)
setattr(optimizer, "offload", offload)
setattr(optimizer, "group", group)
setattr(optimizer, "update_scaler", None)
setattr(optimizer, "update_slice", update_params_slice)
return optimizer
def _device2cpu(trans_param, convert_dtype=False):
if convert_dtype:
trans_param = paddle.cast(trans_param, Type.fp32.value)
tmp_p = trans_param.cpu()
trans_param._clear()
return tmp_p
def _cpu2device(param):
tmp_p = param.fw_storage.cuda(DEV_ID)
if (
tmp_p.dtype == Type.fp32.value
and param2dtype[param.name] == Type.fp16.value
):
tmp_p = paddle.cast(tmp_p, Type.fp16.value)
return tmp_p
def _current_layer_params(layer):
return (
layer.parameters(include_sublayers=False) + list(layer.extra_parameters)
if hasattr(layer, "extra_parameters")
else layer.parameters(include_sublayers=False)
)
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
from enum import Enum
from types import MethodType
import numpy as np
import paddle
from paddle import _legacy_C_ops
from paddle.fluid import core, layers
from paddle.fluid.dygraph import base as imperative_base
from paddle.fluid.dygraph import to_variable
from paddle.fluid.framework import dygraph_only
class Taskflow:
"""
Task flows, one way linked list for task acquisition.
"""
def __init__(self, task, callback):
self.task = task
self.callback = callback
class Type(Enum):
"""
Type of trainable parameters
"""
fp16 = paddle.float16
bf16 = paddle.bfloat16
fp32 = paddle.float32
class ShardingClipGrad:
def __init__(self, clip, device, group):
self._clip = clip
self._device = device
self._group = group
@imperative_base.no_grad
def _dygraph_clip(self, params_grads):
sum_square_fp32, sum_square_fp16 = [], []
unslice_params_fp32, unslice_params_fp16 = [], []
for p, g in params_grads:
p_slice = True # using for slice parameter in sharding stage3
if g is None or getattr(p, 'need_clip', True) is False:
continue
if hasattr(p, "unslice"):
p_slice = False
merge_grad = g
if g.type == core.VarDesc.VarType.SELECTED_ROWS:
merge_grad = layers.get_tensor_from_selected_rows(
layers.merge_selected_rows(g)
)
square = paddle.square(merge_grad)
sum_square = paddle.sum(square)
if p.dtype == paddle.float16:
if p_slice:
sum_square_fp16.append(sum_square)
else:
unslice_params_fp16.append(sum_square)
elif p.dtype == paddle.float32:
if p_slice:
sum_square_fp32.append(sum_square)
else:
unslice_params_fp32.append(sum_square)
# global norm of non-distributed FP16 params_and_grads
if len(sum_square_fp16) == 0:
global_norm_fp16 = paddle.to_tensor([0.0], dtype=paddle.float32)
else:
global_norm_fp16 = layers.concat(sum_square_fp16)
global_norm_fp16 = paddle.sum(global_norm_fp16)
global_norm_fp16 = paddle.cast(
global_norm_fp16, dtype=paddle.float32
)
# global norm of non-distributed FP16 params_and_grads for unslice parameter
if len(unslice_params_fp16) == 0:
global_unslice_fp16 = paddle.to_tensor([0.0], dtype=paddle.float32)
else:
global_unslice_fp16 = layers.concat(unslice_params_fp16)
global_unslice_fp16 = paddle.sum(global_unslice_fp16)
global_unslice_fp16 = paddle.cast(
global_unslice_fp16, dtype=paddle.float32
)
# global norm of non-distributed FP32 params_and_grads
global_norm_fp32 = (
layers.concat(sum_square_fp32)
if len(sum_square_fp32) != 0
else paddle.to_tensor([0.0], dtype=paddle.float32)
)
global_norm_fp32 = paddle.sum(global_norm_fp32)
# global norm of non-distributed FP32 params_and_grads for unslice parameter
global_unslice_fp32 = (
layers.concat(unslice_params_fp32)
if len(unslice_params_fp32) != 0
else paddle.to_tensor([0.0], dtype=paddle.float32)
)
global_unslice_fp32 = paddle.sum(global_unslice_fp32)
global_unslice_var = global_unslice_fp16 + global_unslice_fp32
global_norm_var = (
global_norm_fp16
+ global_norm_fp32
+ 1.0 / self._group.nranks * global_unslice_var
)
# add all reduce to get global norm of distributed params_and_grads
dev_id = int(self._device.split(":")[1])
with device_guard(dev_id, "gpu"):
paddle.distributed.all_reduce(global_norm_var, group=self._group)
global_norm_var = paddle.sqrt(global_norm_var)
max_global_norm = layers.fill_constant(
shape=[1], dtype=global_norm_var.dtype, value=self.clip_norm
)
clip_var = paddle.divide(
x=max_global_norm,
y=paddle.maximum(x=global_norm_var, y=max_global_norm),
)
clip_var_fp16 = paddle.cast(clip_var, paddle.float16)
for p, g in params_grads:
if getattr(p, 'need_clip', True) is False or g is None:
continue
origin_state = g.stop_gradient
g.stop_gradient = True
if p.dtype == paddle.float16:
g.scale_(clip_var_fp16)
else:
g.scale_(clip_var)
g.stop_gradient = origin_state
p._reset_grad_inplace_version(True)
return params_grads
def __getattr__(self, item):
return getattr(self._clip, item)
def __call__(self, params_grads):
return self._dygraph_clip(params_grads)
@contextlib.contextmanager
def device_guard(dev_id=0, device="cpu"):
origin_device = paddle.device.get_device()
if device == "cpu":
paddle.set_device(device)
elif device == "gpu":
paddle.set_device("gpu:{}".format(dev_id))
try:
yield
finally:
paddle.set_device(origin_device)
@dygraph_only
def ShardingScaler(scaler):
def unscale_method(self, optimizer):
if not self._enable:
return
param_grads = []
param_grads_fp16 = []
param_grads_fp32 = []
if hasattr(optimizer, "update_slice"):
optimizer.update_slice()
optimizer.update_scaler = True
if getattr(optimizer._optim, '_param_groups', None) and isinstance(
optimizer._optim._param_groups[0], dict
):
for group in optimizer._optim._param_groups:
for param in group['params']:
if param._grad_ivar() is not None:
param_grads.append(param._grad_ivar())
if param._grad_ivar().dtype in [
core.VarDesc.VarType.FP16,
paddle.float16,
]:
param_grads_fp16.append(param._grad_ivar())
else:
param_grads_fp32.append(param._grad_ivar())
else:
for param in optimizer._optim._parameter_list:
if param.grad is not None:
param_grads.append(param.grad)
if param.grad.dtype in [
core.VarDesc.VarType.FP16,
paddle.float16,
]:
param_grads_fp16.append(param.grad)
else:
param_grads_fp32.append(param.grad)
temp_found_inf_fp16 = to_variable(np.array([0]).astype(np.bool_))
temp_found_inf_fp32 = to_variable(np.array([0]).astype(np.bool_))
device = "cpu" if optimizer.offload else "gpu"
dev_id = (
0 if device == "cpu" else int(paddle.get_device().split(":")[1])
)
with device_guard(dev_id, device):
if len(param_grads_fp16):
_legacy_C_ops.check_finite_and_unscale(
param_grads_fp16,
self._scale,
param_grads_fp16,
temp_found_inf_fp16,
)
if len(param_grads_fp32):
_legacy_C_ops.check_finite_and_unscale(
param_grads_fp32,
self._scale,
param_grads_fp32,
temp_found_inf_fp32,
)
self._found_inf = 1 if temp_found_inf_fp16 or temp_found_inf_fp32 else 0
is_found_inf = paddle.to_tensor([self._found_inf], dtype="int32")
paddle.distributed.all_reduce(
is_found_inf,
op=paddle.distributed.ReduceOp.MAX,
group=optimizer.group,
)
self._found_inf = is_found_inf.numpy()[0]
scaler._unscale = MethodType(unscale_method, scaler)
return scaler
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# The file has been adapted from fairscale file:
# https://github.com/facebookresearch/fairscale/blob/main/fairscale/nn/misc/param_bucket.py
# Git commit hash: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e
# We retain the following license from the original files:
# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
#
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.
import numpy as np
import paddle
from paddle import framework
# (TODO: GhostScreaming) It will be removed later.
from paddle.fluid import core
from ..meta_parallel.sharding.sharding_utils import Type, device_guard
class InternalStorage:
"""
This is a basic class, which is responsible for consolidating the basic storage tensor.
"""
# Support integration parameter tensor
def __init__(self, size, dtype, device, convert_cpu=False):
self._params = []
self._param_ids = []
self._fill = 0
self._device = device
self._dtype = dtype
# The actual flat tensor
size = [size] if isinstance(size, int) else size
if convert_cpu:
value = (
np.zeros(size, dtype=np.float16)
if Type.fp16.value == dtype
else np.zeros(size, dtype=np.float32)
)
self.buffer = core.VarBase(value=value, place=core.CPUPlace())
else:
self.buffer = paddle.zeros(size, dtype=dtype)
def to(self, device, dtype=None, keep_alignment=True):
"""
Move the underlying buffer
"""
assert (
self.buffer is not None
), "Cannot move a collapsed bucket, please rebuild it"
assert (
dtype == Type.fp32.value or Type.fp16.value
), "Conversion type is not supported now"
dev_id = (
0
if paddle.get_device() == "cpu"
else int(paddle.get_device().split(":")[1])
)
if self._device != device:
tmp_buffer = (
self.buffer.cuda(dev_id)
if device == "gpu"
else self.buffer.cpu()
)
for param in self._params:
param.clear_gradient(False)
param._gradient_set_empty(False)
self.buffer.value().get_tensor()._clear()
self.buffer = tmp_buffer
self._device = device
if dtype is not None:
self.buffer = self.buffer.cast(dtype=dtype)
self._dtype = dtype
class ParamStorage(InternalStorage):
"""
This is a basic class to simplify the handling of parameter InternalStorages.
"""
def __init__(self, size, dtype, device):
super().__init__(size, dtype, device, convert_cpu=True)
self.param2align = None
def to(self, device, dtype=None, keep_alignment=True):
"""
Move the underlying buffer
"""
super().to(device, dtype)
if keep_alignment:
self._array_params()
@framework.no_grad()
def add_rank_params(self, trainable_params, param2align, convert_gpu=True):
"""
Add new parameters to the InternalStorage. Params becomes a view of this InternalStorage buffer.
"""
assert all(
[id(param) not in self._param_ids for param in trainable_params]
), "The same param cannot be checked in twice"
assert self.buffer is not None
self.param2align = param2align
cpu_param_shape = list()
for param in trainable_params:
p_shape = self._add_param_as_view(
param, param2align[param.name], convert_gpu
)
cpu_param_shape.append(p_shape)
if convert_gpu:
# buffer convert from cpu to cuda
dev_id = int(paddle.get_device().split(":")[1])
self.buffer = self.buffer.cuda(dev_id)
self._fill = 0
for idx, param in enumerate(trainable_params):
self._convert_buffer(
param, cpu_param_shape[idx], param2align[param.name]
)
self._params.append(param)
self._param_ids.append(id(param))
@framework.no_grad()
def _add_param_as_view(self, param, align, convert_gpu=True):
assert (
param.dtype == self.buffer.dtype
), "Different types for the InternalStorage and the param, cannot proceed: {} - {}".format(
param.dtype, self.buffer.dtype
)
var_end = self._fill + np.prod(param.shape)
offset = var_end + align
assert offset <= np.prod(self.buffer.shape)
p_shape = param.shape
origin_state = param.stop_gradient
param.stop_gradient = True
param.flatten_()
param.stop_gradient = origin_state
# Copy the current param value
dev_id = (
0
if paddle.get_device() == "cpu"
else int(paddle.get_device().split(":")[1])
)
with device_guard(dev_id, "cpu"):
tmp_var = core.VarBase(
tensor=self.buffer._slice(self._fill, var_end)
)
if convert_gpu:
param_cpu = param.cpu()
param.value().get_tensor()._clear()
tmp_var.set_value(param_cpu)
else:
tmp_var.set_value(param)
self._fill = offset
return p_shape
@framework.no_grad()
def _convert_buffer(self, param, p_shape, align):
var_end = self._fill + np.prod(p_shape)
offset = var_end + align
assert offset <= np.prod(self.buffer.shape)
# Convert the param value
tmp_tensor = self.buffer._slice(self._fill, var_end)
param.value().get_tensor()._share_data_with(tmp_tensor)
param.value().get_tensor()._set_dims(p_shape)
self._fill = offset
@framework.no_grad()
def _array_params(self):
"""
Given the parameters which have been registered previously, rebuild the whole InternalStorage.
"""
assert len(self._params) > 0
assert self.param2align is not None
self._fill = 0
for p in self._params:
self._convert_buffer(p, p.shape, self.param2align[p.name]) # modify
class GradStorage(InternalStorage):
"""
This is a basic class to simplify the handling of gradient InternalStorages
"""
def __init__(
self, size, dtype, device, destination, parm2align, convert_cpu=False
):
if isinstance(size, np.int64):
size = size.tolist()
super().__init__(size, dtype, device, convert_cpu)
self._max_size = size
self._release = False
self.params_checked_in = 0
self.destination = destination
self._parm2align = parm2align
self.sent = False
def reset_checked_in(self):
"""Reset the counter of the parameter grads which have been checked in"""
self.params_checked_in = 0
self.sent = False
@property
def all_checked_in(self):
"""Judge all the expected gradient check-in happened"""
return len(self._params) == self.params_checked_in
def can_add_grad_view(self, param, align):
"""Is there enough InternalStorage to add this parameter gradient, and whether this param have already checked in."""
return (
self._fill + np.prod(param.shape) + align <= self._max_size
and id(param) not in self._param_ids
)
def to(self, device, dtype=None, keep_alignment=True):
"""
Move the underlying buffer
"""
if self._release:
self.rebuild()
super().to(device, dtype)
if keep_alignment:
self._array_grads()
@framework.no_grad()
def add_grad(self, param, align):
"""
Add a new parameter gradient to the InternalStorage. Param.grad becomes a view of this InternalStorage buffer.
"""
assert (
id(param) not in self._param_ids
), "The same gradients cannot be checked in twice"
self._add_grad_as_view(param, align)
self._params.append(param)
self._param_ids.append(id(param))
@framework.no_grad()
def manumal_relase(self):
"""
Release the buffer from InternalStorage. The InternalStorage will need to be rebuilt before use.
"""
if not self._release:
for p in self._params:
if p.grad is not None:
p.clear_gradient(False)
p._gradient_set_empty(False)
self.buffer = None
self._fill = 0
self.params_checked_in = 0
self._release = True
@framework.no_grad()
def rebuild(self):
"""
Given the parameter gradients which have been registered previously, rebuild the whole InternalStorage.
"""
if self._release:
self.buffer = paddle.zeros([self._max_size], dtype=self._dtype)
for p in self._params:
self._add_grad_as_view(p, self._parm2align[p.name])
self._release = False
@framework.no_grad()
def _array_grads(self):
"""
Given the parameters gradients which have been registered previously, rebuild the whole InternalStorage.
"""
if len(self._params) > 0:
self._fill = 0
for p in self._params:
self._add_grad_as_view(p, self._parm2align[p.name])
@framework.no_grad()
def _add_grad_as_view(self, param, align):
assert (
np.prod(self.buffer.shape) > 0
), "Cannot add a gradient to a released InternalStorage, please rebuild"
assert param.dtype == self.buffer.dtype
grad_end = self._fill + np.prod(param.shape)
offset = grad_end + align
assert offset <= np.prod(self.buffer.shape)
# Copy the current grad value to InternalStorage
dev_id = (
0
if paddle.get_device() == "cpu"
else int(paddle.get_device().split(":")[1])
)
if self._device == "cpu":
with device_guard(dev_id, self._device):
tmp_var = core.VarBase(self.buffer._slice(self._fill, grad_end))
param._copy_gradient_from(tmp_var)
tmp_var.value().get_tensor()._clear()
elif self._device == "gpu":
tmp_var = core.VarBase(self.buffer._slice(self._fill, grad_end))
param._copy_gradient_from(tmp_var)
tmp_var.value().get_tensor()._clear()
self._fill = offset
......@@ -16,13 +16,6 @@ import logging
import os
import paddle
# Old version
from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import (
ShardingOptimizerStage2,
)
# New version
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_optimizer_stage2 import (
GroupShardedOptimizerStage2,
)
......@@ -35,17 +28,7 @@ from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage3 import
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_utils import (
GroupShardedScaler,
)
from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import (
ShardingStage2,
)
from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage3 import (
ShardingStage3,
)
from paddle.distributed.fleet.meta_parallel.sharding.sharding_utils import (
ShardingScaler,
)
from paddle.distributed.utils.log_utils import get_logger
from paddle.fluid.framework import in_dygraph_mode
from paddle.optimizer import Optimizer
logger_ = get_logger(logging.WARNING)
......@@ -148,7 +131,6 @@ def group_sharded_parallel(
logger_.info("*" * 30)
logger_.info("Sharded level os uses sharded level os_g achieved now.")
logger_.info("*" * 30)
if in_dygraph_mode():
optimizer = GroupShardedOptimizerStage2(
params=optimizer._parameter_list,
optim=optimizer,
......@@ -166,24 +148,7 @@ def group_sharded_parallel(
dp_group=dp_group,
device=device,
)
else:
optimizer = ShardingOptimizerStage2(
params=model.parameters(),
optim=optimizer,
group=group,
offload=offload,
device=device,
)
model = ShardingStage2(
model,
optimizer,
group=group,
sync_buffers=sync_buffers,
buffer_max_size=buffer_max_size,
device=device,
)
elif level == 'p_g_os':
if in_dygraph_mode():
model = GroupShardedStage3(
model,
optimizer=optimizer,
......@@ -195,24 +160,10 @@ def group_sharded_parallel(
dp_group=dp_group,
device=device,
)
else:
model = ShardingStage3(
model,
optimizer=optimizer,
group=group,
sync_buffers=sync_buffers,
segment_size=segment_size,
offload=offload,
sync_comm=sync_comm,
device=device,
)
else:
raise ValueError("Please enter the correct level.")
if isinstance(scaler, paddle.amp.GradScaler):
if in_dygraph_mode():
scaler = GroupShardedScaler(scaler)
else:
scaler = ShardingScaler(scaler)
logger_.info("*" * 30)
logger_.info(
"If there is a communication hang using group sharded, please check whether the communication operations of each process are unified."
......@@ -275,9 +226,9 @@ def save_group_sharded_model(model, output, optimizer=None):
), "Saving directory ({}) should be a directory, not a file".format(output)
os.makedirs(output, exist_ok=True)
output_model = os.path.join(output, "model.pdmodel")
if isinstance(model, (ShardingStage2, GroupShardedStage2)):
if isinstance(model, GroupShardedStage2):
paddle.save(model._layer.state_dict(), output_model)
elif isinstance(model, (ShardingStage3, GroupShardedStage3)):
elif isinstance(model, GroupShardedStage3):
convert2cpu = True if model._offload else False
model.get_all_parameters(convert2cpu=convert2cpu)
paddle.save(model._layer.state_dict(), output_model)
......
# -*- coding: UTF-8 -*-
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import paddle
import paddle.fluid as fluid
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import (
ShardingOptimizerStage2,
)
from paddle.distributed.fleet.utils.internal_storage import GradStorage
from paddle.nn import Linear
base_lr = 0.1
momentum_rate = 0.9
l2_decay = 1e-4
epoch = 100
batch_size = 32
class_dim = 102
class MLP(fluid.Layer):
def __init__(self, param_attr=None, bias_attr=None):
super().__init__()
self._linear1 = Linear(10, 10)
self._linear2 = Linear(10, 10)
def forward(self, inputs):
y = self._linear1(inputs)
y = self._linear2(y)
return y
def reader_decorator():
def __reader__():
for _ in range(100):
img = np.random.rand(10).astype('float32')
label = np.ones(1).astype('int64')
yield img, label
return __reader__
def optimizer_setting(parameter_list=None):
optimizer = paddle.optimizer.Momentum(
learning_rate=base_lr,
momentum=momentum_rate,
weight_decay=paddle.regularizer.L2Decay(l2_decay),
parameters=parameter_list,
)
return optimizer
def train_mlp():
fleet.init(is_collective=True)
group = paddle.distributed.new_group([0, 1])
mlp = MLP()
optimizer = optimizer_setting(parameter_list=mlp.parameters())
oss_optimizer = ShardingOptimizerStage2(
params=mlp.parameters(), optim=optimizer, group=group
)
# cover grad_storage code
trainable_param2align = dict()
for p in mlp.parameters():
trainable_param2align[p.name] = 0
grad_storage = GradStorage(
10000,
dtype=paddle.float32,
device="gpu",
destination=0,
parm2align=trainable_param2align,
)
for p in mlp.parameters():
grad_storage.can_add_grad_view(p, trainable_param2align[p.name])
grad_storage.add_grad(p, trainable_param2align[p.name])
grad_storage.manumal_relase()
grad_storage.rebuild()
grad_storage.reset_checked_in()
train_reader = paddle.batch(
reader_decorator(), batch_size=batch_size, drop_last=True
)
train_loader = paddle.io.DataLoader.from_generator(
capacity=32,
use_double_buffer=True,
iterable=True,
return_list=True,
use_multiprocess=True,
)
train_loader.set_sample_list_generator(train_reader)
for eop in range(epoch):
mlp.train()
for batch_id, data in enumerate(train_loader()):
img, label = data
label.stop_gradient = True
img.stop_gradient = True
out = mlp(img)
loss = paddle.nn.functional.cross_entropy(input=out, label=label)
avg_loss = paddle.mean(x=loss)
acc_top1 = paddle.metric.accuracy(input=out, label=label, k=1)
acc_top5 = paddle.metric.accuracy(input=out, label=label, k=5)
dy_out = avg_loss.numpy()
avg_loss.backward()
oss_optimizer.step()
# oss_optimizer clear cache
oss_optimizer._clear_cache()
# check optimizer.minimize() error
try:
oss_optimizer.minimize()
except:
print(
"====== Find sharding_stage2_optimizer.minimize() error ======"
)
return
if __name__ == '__main__':
train_mlp()
# -*- coding: UTF-8 -*-
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import tempfile
import numpy as np
import paddle
import paddle.fluid as fluid
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import (
ShardingOptimizerStage2,
)
from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import (
ShardingStage2,
)
from paddle.nn import Linear
seed = 2022
epoch = 2
linear_size = 1000
strategy = fleet.DistributedStrategy()
strategy.hybrid_configs = {
"dp_degree": 2,
"mp_degree": 1,
"pp_degree": 1,
"sharding_degree": 1,
}
np.random.seed(seed)
paddle.seed(seed)
class MLP(fluid.Layer):
def __init__(self, linear_size=1000, param_attr=None, bias_attr=None):
super().__init__()
self._linear1 = Linear(linear_size, linear_size)
self._linear2 = Linear(linear_size, linear_size)
self._linear3 = Linear(linear_size, 10)
def forward(self, inputs):
y = self._linear1(inputs)
y = self._linear2(y)
y = self._linear3(y)
return y
def reader_decorator(linear_size=1000):
def __reader__():
for _ in range(100):
img = np.random.rand(linear_size).astype('float32')
label = np.ones(1).astype('int64')
yield img, label
return __reader__
def optimizer_setting(model, use_pure_fp16, opt_group=False):
clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0)
optimizer = paddle.optimizer.AdamW(
parameters=[{"params": model.parameters()}]
if opt_group
else model.parameters(),
learning_rate=0.001,
weight_decay=0.00001,
grad_clip=clip,
multi_precision=use_pure_fp16,
)
return optimizer
def train_mlp(
model,
sharding_stage,
batch_size=100,
use_pure_fp16=False,
accumulate_grad=False,
opt_group=False,
save_model=False,
):
if sharding_stage == "dp":
hcg = fleet.get_hybrid_communicate_group()
group = hcg.get_check_parallel_group()
else:
group = paddle.distributed.new_group([0, 1])
if opt_group:
optimizer = optimizer_setting(
model=model, use_pure_fp16=use_pure_fp16, opt_group=opt_group
)
else:
optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16)
if sharding_stage == 2:
optimizer = ShardingOptimizerStage2(
params=model.parameters(), optim=optimizer, group=group
)
model = ShardingStage2(
model, optimizer, group=group, buffer_max_size=2**21
)
else:
optimizer = fleet.distributed_optimizer(optimizer)
model = fleet.distributed_model(model)
train_reader = paddle.batch(
reader_decorator(), batch_size=batch_size, drop_last=True
)
train_loader = paddle.io.DataLoader.from_generator(
capacity=32,
use_double_buffer=True,
iterable=True,
return_list=True,
use_multiprocess=True,
)
train_loader.set_sample_list_generator(train_reader)
if sharding_stage == 2:
model.to(device="gpu")
for eop in range(epoch):
model.train()
for batch_id, data in enumerate(train_loader()):
img, label = data
label.stop_gradient = True
img.stop_gradient = True
out = model(img)
loss = paddle.nn.functional.cross_entropy(input=out, label=label)
avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32))
if batch_size == 20:
avg_loss = avg_loss / 5
avg_loss.backward()
if not accumulate_grad:
optimizer.step()
optimizer.clear_grad()
if accumulate_grad:
optimizer.step()
optimizer.clear_grad()
if save_model:
return model, optimizer
return model.parameters()
def test_dp_stage2():
mlp = MLP()
state_dict = mlp.state_dict()
mlp1 = MLP()
mlp2 = MLP()
mlp3 = MLP()
mlp4 = MLP()
mlp5 = MLP()
mlp6 = MLP()
mlp1.set_state_dict(state_dict)
mlp2.set_state_dict(state_dict)
mlp3.set_state_dict(state_dict)
mlp4.set_state_dict(state_dict)
mlp5.set_state_dict(state_dict)
mlp6.set_state_dict(state_dict)
# DP VS stage2
dp_params = train_mlp(
mlp1, sharding_stage="dp", use_pure_fp16=False, opt_group=False
)
stage2_params = train_mlp(
mlp2, sharding_stage=2, use_pure_fp16=False, opt_group=False
)
for i in range(len(dp_params)):
np.testing.assert_allclose(
dp_params[i].numpy(), stage2_params[i].numpy(), rtol=1e-6
)
# stage2 accumulate grad
stage2_params = train_mlp(mlp3, sharding_stage=2, accumulate_grad=True)
stage2_accumulate_grad = train_mlp(
mlp4, sharding_stage=2, batch_size=20, accumulate_grad=True
)
for i in range(len(stage2_params)):
np.testing.assert_allclose(
stage2_params[i].numpy(),
stage2_accumulate_grad[i].numpy(),
rtol=1e-5,
atol=1e-5,
)
# stage2 param list VS param group
stage2_params = train_mlp(
mlp5, sharding_stage=2, use_pure_fp16=False, opt_group=True
)
for i in range(len(dp_params)):
np.testing.assert_allclose(
dp_params[i].numpy(), stage2_params[i].numpy(), rtol=1e-6
)
# save/load model
output_dir = tempfile.mkdtemp()
model_file = os.path.join(output_dir, "model.pdmodel")
optimizer_file = os.path.join(output_dir, "model.pdopt")
model_stage2, optimizer_stage2 = train_mlp(
mlp6,
sharding_stage=2,
use_pure_fp16=False,
opt_group=False,
save_model=True,
)
paddle.save(model_stage2.state_dict(), model_file)
paddle.save(optimizer_stage2.state_dict(), optimizer_file)
m_state_dict = paddle.load(model_file)
opt_state_dict = paddle.load(optimizer_file)
model_stage2.set_state_dict(m_state_dict)
optimizer_stage2.set_state_dict(opt_state_dict)
shutil.rmtree(output_dir)
return
if __name__ == '__main__':
fleet.init(is_collective=True, strategy=strategy)
test_dp_stage2()
# -*- coding: UTF-8 -*-
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from dygraph_sharding_stage2 import MLP, optimizer_setting, reader_decorator
import paddle
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import (
ShardingOptimizerStage2,
)
from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import (
ShardingStage2,
)
from paddle.distributed.fleet.meta_parallel.sharding.sharding_utils import (
ShardingScaler,
)
seed = 2021
epoch = 2
batch_size = 32
linear_size = 1000
strategy = fleet.DistributedStrategy()
strategy.hybrid_configs = {
"dp_degree": 2,
"mp_degree": 1,
"pp_degree": 1,
"sharding_degree": 1,
}
np.random.seed(seed)
paddle.seed(seed)
def train_mlp(model, offload=False):
optimizer = optimizer_setting(model=model, use_pure_fp16=True)
model = paddle.amp.decorate(models=model, level='O2', save_dtype='float32')
scaler = paddle.amp.GradScaler(init_loss_scaling=1024)
scaler = ShardingScaler(scaler)
optimizer = ShardingOptimizerStage2(
params=model.parameters(), optim=optimizer, offload=offload
)
model = ShardingStage2(model, optimizer, buffer_max_size=2**21)
train_reader = paddle.batch(
reader_decorator(linear_size), batch_size=batch_size, drop_last=True
)
train_loader = paddle.io.DataLoader.from_generator(
capacity=32,
use_double_buffer=True,
iterable=True,
return_list=True,
use_multiprocess=True,
)
train_loader.set_sample_list_generator(train_reader)
for eop in range(epoch):
model.train()
for batch_id, data in enumerate(train_loader()):
img, label = data
label.stop_gradient = True
img.stop_gradient = True
with paddle.amp.auto_cast(True, level='O2'):
out = model(img)
loss = paddle.nn.functional.cross_entropy(
input=out, label=label
)
avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32))
scaler.scale(avg_loss).backward()
scaler.step(optimizer)
scaler.update()
optimizer.clear_grad()
for dtype in optimizer.param_storages:
for dst_rank, param_storage in optimizer.param_storages[dtype].items():
param_storage.to(device="gpu", dtype=dtype)
return model.parameters()
def test_sharding_stage2_offload():
mlp = MLP(linear_size)
mlp_offload = MLP(linear_size)
mlp_offload.set_state_dict(mlp.state_dict())
mlp_params = train_mlp(mlp, offload=False)
mlp_offload_params = train_mlp(mlp_offload, offload=True)
for i in range(len(mlp_params)):
np.testing.assert_allclose(
mlp_params[i].numpy(),
mlp_offload_params[i].numpy(),
rtol=5e-3,
atol=5e-3,
)
return
if __name__ == '__main__':
fleet.init(is_collective=True, strategy=strategy)
test_sharding_stage2_offload()
# -*- coding: UTF-8 -*-
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import tempfile
import numpy as np
import paddle
import paddle.fluid as fluid
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import (
ShardingOptimizerStage2,
)
from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import (
ShardingStage2,
)
from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage3 import (
ShardingStage3,
)
from paddle.distributed.fleet.meta_parallel.sharding.sharding_utils import (
ShardingScaler,
)
from paddle.nn import Linear
epoch = 10
paddle.seed(2021)
np.random.seed(2021)
base_lr = 0.1
momentum_rate = 0.9
l2_decay = 1e-4
class MLP(fluid.Layer):
def __init__(self, linear_size=1000, param_attr=None, bias_attr=None):
super().__init__()
self._linear1 = Linear(linear_size, linear_size)
self._linear2 = Linear(linear_size, linear_size)
self._linear3 = Linear(linear_size, 10)
def forward(self, inputs):
y = self._linear1(inputs)
y = self._linear2(y)
y = self._linear3(y)
return y
def reader_decorator(linear_size=1000):
def __reader__():
for _ in range(100):
img = np.random.rand(linear_size).astype('float32')
label = np.ones(1).astype('int64')
yield img, label
return __reader__
def optimizer_setting(model, use_pure_fp16, opt_group=False):
clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0)
optimizer = paddle.optimizer.Momentum(
parameters=[{"params": list(model.parameters())}]
if opt_group
else list(model.parameters()),
learning_rate=0.001,
weight_decay=0.00001,
grad_clip=clip,
multi_precision=use_pure_fp16,
)
return optimizer
def train_mlp(
model,
sharding_stage,
use_pure_fp16=False,
accumulate_grad=False,
batch_size=100,
opt_group=False,
sync_comm=False,
test_minimize=False,
save_model=False,
):
group = paddle.distributed.new_group([0, 1])
if opt_group:
optimizer = optimizer_setting(
model=model, use_pure_fp16=use_pure_fp16, opt_group=opt_group
)
else:
optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16)
if use_pure_fp16:
model = paddle.amp.decorate(
models=model, level='O2', save_dtype='float32'
)
scaler = paddle.amp.GradScaler(init_loss_scaling=32768)
scaler = ShardingScaler(scaler)
if sharding_stage == 2:
optimizer = ShardingOptimizerStage2(
params=model.parameters(), optim=optimizer, group=group
)
model = ShardingStage2(
model, optimizer, group=group, buffer_max_size=2**21
)
elif sharding_stage == 3:
model = ShardingStage3(
model, optimizer=optimizer, group=group, sync_comm=sync_comm
)
# check optimizer.minimize() error
if test_minimize:
try:
optimizer.minimize()
except:
print(
"====== Find sharding_stage3_optimizer.minimize() error ======"
)
return
train_reader = paddle.batch(
reader_decorator(), batch_size=batch_size, drop_last=True
)
train_loader = paddle.io.DataLoader.from_generator(
capacity=32,
use_double_buffer=True,
iterable=True,
return_list=True,
use_multiprocess=True,
)
train_loader.set_sample_list_generator(train_reader)
for eop in range(epoch):
model.train()
for batch_id, data in enumerate(train_loader()):
img, label = data
label.stop_gradient = True
img.stop_gradient = True
with paddle.amp.auto_cast(True, level='O2'):
out = model(img)
loss = paddle.nn.functional.cross_entropy(
input=out, label=label
)
avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32))
if batch_size == 20:
avg_loss = avg_loss / 5
if not use_pure_fp16:
avg_loss.backward()
else:
scaler.scale(avg_loss).backward()
if not accumulate_grad:
if not use_pure_fp16:
optimizer.step()
else:
scaler.step(optimizer)
scaler.update()
optimizer.clear_grad()
if accumulate_grad:
if not use_pure_fp16:
optimizer.step()
else:
scaler.step(optimizer)
scaler.update()
optimizer.clear_grad()
if sharding_stage == 3:
model.get_all_parameters()
if save_model:
return model, optimizer
return model.parameters()
def test_stage2_stage3():
mlp, mlp1, mlp2, mlp3, mlp4, mlp5, mlp6, mlp7, mlp8, mlp9, mlp10 = (
MLP(),
MLP(),
MLP(),
MLP(),
MLP(),
MLP(),
MLP(),
MLP(),
MLP(),
MLP(),
MLP(),
)
state_dict = mlp.state_dict()
mlp1.set_state_dict(state_dict)
mlp2.set_state_dict(state_dict)
mlp3.set_state_dict(state_dict)
mlp4.set_state_dict(state_dict)
mlp5.set_state_dict(state_dict)
mlp6.set_state_dict(state_dict)
mlp7.set_state_dict(state_dict)
mlp8.set_state_dict(state_dict)
mlp9.set_state_dict(state_dict)
mlp10.set_state_dict(state_dict)
# fp32
stage2_params = train_mlp(
mlp1, sharding_stage=2, use_pure_fp16=False, opt_group=False
)
stage3_params = train_mlp(
mlp2, sharding_stage=3, use_pure_fp16=False, opt_group=False
)
for i in range(len(stage2_params)):
np.testing.assert_allclose(
stage2_params[i].numpy(),
stage3_params[i].numpy(),
rtol=1e-6,
atol=1e-6,
)
# fp32 accumulate grad
stage3_params = train_mlp(
mlp3,
sharding_stage=3,
use_pure_fp16=False,
accumulate_grad=True,
opt_group=True,
)
stage3_params_add = train_mlp(
mlp4,
sharding_stage=3,
use_pure_fp16=False,
accumulate_grad=True,
batch_size=20,
opt_group=True,
)
for i in range(len(stage3_params)):
np.testing.assert_allclose(
stage3_params[i].numpy(),
stage3_params_add[i].numpy(),
rtol=1e-6,
atol=1e-4,
)
# fp16
stage2_params = train_mlp(
mlp5, sharding_stage=2, use_pure_fp16=True, opt_group=False
)
stage3_params = train_mlp(
mlp6, sharding_stage=3, use_pure_fp16=True, opt_group=False
)
for i in range(len(stage2_params)):
np.testing.assert_allclose(
stage2_params[i].numpy(),
stage3_params[i].numpy(),
rtol=1e-4,
atol=1e-3,
)
# fp16 sync_comm
stage3_params = train_mlp(
mlp7, sharding_stage=3, use_pure_fp16=True, opt_group=False
)
stage3_params_re = train_mlp(
mlp8,
sharding_stage=3,
use_pure_fp16=True,
opt_group=False,
sync_comm=True,
)
for i in range(len(stage3_params)):
np.testing.assert_allclose(
stage3_params[i].numpy(), stage3_params_re[i].numpy(), rtol=1e-6
)
# save/load model
output_dir = tempfile.mkdtemp()
model_file = os.path.join(output_dir, "model.pdmodel")
optimizer_file = os.path.join(output_dir, "model.pdopt")
model_stage3, optimizer_stage3 = train_mlp(
mlp9,
sharding_stage=3,
use_pure_fp16=False,
opt_group=False,
save_model=True,
)
paddle.save(model_stage3.state_dict(), model_file)
paddle.save(optimizer_stage3.state_dict(), optimizer_file)
m_state_dict = paddle.load(model_file)
opt_state_dict = paddle.load(optimizer_file)
model_stage3.set_state_dict(m_state_dict)
optimizer_stage3.set_state_dict(opt_state_dict)
shutil.rmtree(output_dir)
# check optimizer.minimize() error
train_mlp(
mlp10,
sharding_stage=3,
use_pure_fp16=False,
opt_group=False,
test_minimize=True,
)
if __name__ == '__main__':
fleet.init(is_collective=True)
test_stage2_stage3()
# -*- coding: UTF-8 -*-
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import paddle
import paddle.fluid as fluid
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage3 import (
ShardingStage3,
)
from paddle.distributed.fleet.meta_parallel.sharding.sharding_utils import (
ShardingScaler,
)
from paddle.nn import Linear
epoch = 10
paddle.seed(2022)
np.random.seed(2022)
base_lr = 0.1
momentum_rate = 0.9
l2_decay = 1e-4
class MLP(fluid.Layer):
def __init__(self, linear_size=1000, param_attr=None, bias_attr=None):
super().__init__()
self._linear1 = Linear(linear_size, linear_size)
self._linear2 = Linear(linear_size, linear_size)
self._linear3 = Linear(linear_size, 10)
def forward(self, inputs):
y = self._linear1(inputs)
y = self._linear2(y)
y = self._linear3(y)
return y
def reader_decorator(linear_size=1000):
def __reader__():
for _ in range(100):
img = np.random.rand(linear_size).astype('float32')
label = np.ones(1).astype('int64')
yield img, label
return __reader__
def optimizer_setting(model, use_pure_fp16, opt_group=False):
clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0)
optimizer = paddle.optimizer.AdamW(
parameters=[{"params": model.parameters()}]
if opt_group
else model.parameters(),
learning_rate=0.001,
weight_decay=0.00001,
grad_clip=clip,
multi_precision=use_pure_fp16,
)
return optimizer
def train_mlp(
model,
use_pure_fp16=False,
accumulate_grad=False,
offload=False,
batch_size=100,
convert2cpu=False,
):
group = paddle.distributed.new_group([0, 1])
optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16)
if use_pure_fp16:
model = paddle.amp.decorate(
models=model, level='O2', save_dtype='float32'
)
scaler = paddle.amp.GradScaler(init_loss_scaling=32768)
scaler = ShardingScaler(scaler)
model = ShardingStage3(
model, optimizer=optimizer, group=group, offload=offload
)
train_reader = paddle.batch(
reader_decorator(), batch_size=batch_size, drop_last=True
)
train_loader = paddle.io.DataLoader.from_generator(
capacity=32,
use_double_buffer=True,
iterable=True,
return_list=True,
use_multiprocess=True,
)
train_loader.set_sample_list_generator(train_reader)
for eop in range(epoch):
model.train()
for batch_id, data in enumerate(train_loader()):
img, label = data
label.stop_gradient = True
img.stop_gradient = True
with paddle.amp.auto_cast(True, level='O2'):
out = model(img)
loss = paddle.nn.functional.cross_entropy(
input=out, label=label
)
avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32))
if accumulate_grad:
avg_loss = avg_loss / 5
if not use_pure_fp16:
avg_loss.backward()
else:
scaler.scale(avg_loss).backward()
if not accumulate_grad:
if not use_pure_fp16:
optimizer.step()
else:
scaler.step(optimizer)
scaler.update()
optimizer.clear_grad()
if accumulate_grad:
if not use_pure_fp16:
optimizer.step()
else:
scaler.step(optimizer)
scaler.update()
optimizer.clear_grad()
if not convert2cpu:
model.get_all_parameters()
else:
model.get_all_parameters(convert2cpu)
return model.parameters()
def test_stage3_offload():
mlp, mlp1, mlp2, mlp3, mlp4, mlp5, mlp6 = (
MLP(),
MLP(),
MLP(),
MLP(),
MLP(),
MLP(),
MLP(),
)
state_dict = mlp.state_dict()
mlp1.set_state_dict(state_dict)
mlp2.set_state_dict(state_dict)
mlp3.set_state_dict(state_dict)
mlp4.set_state_dict(state_dict)
mlp5.set_state_dict(state_dict)
mlp6.set_state_dict(state_dict)
# fp32 offload
stage3_params = train_mlp(mlp1, use_pure_fp16=False)
stage3_params_offload = train_mlp(mlp2, use_pure_fp16=False, offload=True)
for i in range(len(stage3_params)):
np.testing.assert_allclose(
stage3_params[i].numpy(),
stage3_params_offload[i].numpy(),
rtol=1e-6,
atol=1e-8,
)
# fp16 offload
stage3_params = train_mlp(mlp3, use_pure_fp16=True)
stage3_params_offload = train_mlp(mlp4, use_pure_fp16=True, offload=True)
for i in range(len(stage3_params)):
np.testing.assert_allclose(
stage3_params[i].numpy(),
stage3_params_offload[i].numpy(),
rtol=1e-2,
atol=1e-2,
)
# fp32 accumulate grad offload
stage3_params = train_mlp(
mlp5, use_pure_fp16=False, batch_size=20, accumulate_grad=True
)
stage3_params_offload = train_mlp(
mlp6,
use_pure_fp16=False,
accumulate_grad=True,
offload=True,
batch_size=20,
convert2cpu=True,
)
for i in range(len(stage3_params)):
np.testing.assert_allclose(
stage3_params[i].numpy(),
stage3_params_offload[i].numpy(),
rtol=1e-6,
atol=1e-8,
)
return
if __name__ == '__main__':
fleet.init(is_collective=True)
test_stage3_offload()
# -*- coding: UTF-8 -*-
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import tempfile
import numpy as np
import paddle
import paddle.fluid as fluid
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import (
ShardingOptimizerStage2,
)
from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import (
ShardingStage2,
)
from paddle.nn import Linear
seed = 2022
epoch = 2
linear_size = 1000
strategy = fleet.DistributedStrategy()
strategy.hybrid_configs = {
"dp_degree": 16,
"mp_degree": 1,
"pp_degree": 1,
"sharding_degree": 1,
}
np.random.seed(seed)
paddle.seed(seed)
class MLP(fluid.Layer):
def __init__(self, linear_size=1000, param_attr=None, bias_attr=None):
super().__init__()
self._linear1 = Linear(linear_size, linear_size)
self._linear2 = Linear(linear_size, linear_size)
self._linear3 = Linear(linear_size, linear_size)
self._linear4 = Linear(linear_size, linear_size)
self._linear5 = Linear(linear_size, 10)
def forward(self, inputs):
y = self._linear1(inputs)
y = self._linear2(y)
y = self._linear3(y)
y = self._linear4(y)
y = self._linear5(y)
return y
def reader_decorator(linear_size=1000):
def __reader__():
for _ in range(100):
img = np.random.rand(linear_size).astype('float32')
label = np.ones(1).astype('int64')
yield img, label
return __reader__
def optimizer_setting(model, use_pure_fp16, opt_group=False):
clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0)
optimizer = paddle.optimizer.AdamW(
parameters=[{"params": model.parameters()}]
if opt_group
else model.parameters(),
learning_rate=0.001,
weight_decay=0.00001,
grad_clip=clip,
multi_precision=use_pure_fp16,
)
return optimizer
def train_mlp(
model,
sharding_stage,
batch_size=100,
use_pure_fp16=False,
accumulate_grad=False,
opt_group=False,
save_model=False,
):
if sharding_stage == "dp":
hcg = fleet.get_hybrid_communicate_group()
group = hcg.get_check_parallel_group()
else:
group = paddle.distributed.new_group(
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
)
if opt_group:
optimizer = optimizer_setting(
model=model, use_pure_fp16=use_pure_fp16, opt_group=opt_group
)
else:
optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16)
if sharding_stage == 2:
optimizer = ShardingOptimizerStage2(
params=model.parameters(), optim=optimizer, group=group
)
model = ShardingStage2(
model, optimizer, group=group, buffer_max_size=2**21
)
else:
optimizer = fleet.distributed_optimizer(optimizer)
model = fleet.distributed_model(model)
train_reader = paddle.batch(
reader_decorator(), batch_size=batch_size, drop_last=True
)
train_loader = paddle.io.DataLoader.from_generator(
capacity=32,
use_double_buffer=True,
iterable=True,
return_list=True,
use_multiprocess=True,
)
train_loader.set_sample_list_generator(train_reader)
if sharding_stage == 2:
model.to(device="gpu")
for eop in range(epoch):
model.train()
for batch_id, data in enumerate(train_loader()):
img, label = data
label.stop_gradient = True
img.stop_gradient = True
out = model(img)
loss = paddle.nn.functional.cross_entropy(input=out, label=label)
avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32))
if batch_size == 20:
avg_loss = avg_loss / 5
avg_loss.backward()
if not accumulate_grad:
optimizer.step()
optimizer.clear_grad()
if accumulate_grad:
optimizer.step()
optimizer.clear_grad()
if save_model:
return model, optimizer
return model.parameters()
def test_dp_stage2():
mlp = MLP()
state_dict = mlp.state_dict()
mlp1 = MLP()
mlp2 = MLP()
mlp3 = MLP()
mlp4 = MLP()
mlp5 = MLP()
mlp6 = MLP()
mlp1.set_state_dict(state_dict)
mlp2.set_state_dict(state_dict)
mlp3.set_state_dict(state_dict)
mlp4.set_state_dict(state_dict)
mlp5.set_state_dict(state_dict)
mlp6.set_state_dict(state_dict)
# DP VS stage2
dp_params = train_mlp(
mlp1, sharding_stage="dp", use_pure_fp16=False, opt_group=False
)
stage2_params = train_mlp(
mlp2, sharding_stage=2, use_pure_fp16=False, opt_group=False
)
for i in range(len(dp_params)):
np.testing.assert_allclose(
dp_params[i].numpy(), stage2_params[i].numpy(), rtol=1e-6, atol=5e-4
)
# stage2 accumulate grad
stage2_params = train_mlp(mlp3, sharding_stage=2, accumulate_grad=True)
stage2_accumulate_grad = train_mlp(
mlp4, sharding_stage=2, batch_size=20, accumulate_grad=True
)
for i in range(len(stage2_params)):
np.testing.assert_allclose(
stage2_params[i].numpy(),
stage2_accumulate_grad[i].numpy(),
rtol=1e-5,
atol=1e-5,
)
# stage2 param list VS param group
stage2_params = train_mlp(
mlp5, sharding_stage=2, use_pure_fp16=False, opt_group=True
)
for i in range(len(dp_params)):
np.testing.assert_allclose(
dp_params[i].numpy(), stage2_params[i].numpy(), rtol=1e-6, atol=5e-4
)
# save/load model
output_dir = tempfile.mkdtemp()
try:
model_file = os.path.join(output_dir, "model.pdmodel")
optimizer_file = os.path.join(output_dir, "model.pdopt")
model_stage2, optimizer_stage2 = train_mlp(
mlp6,
sharding_stage=2,
use_pure_fp16=False,
opt_group=False,
save_model=True,
)
paddle.save(model_stage2.state_dict(), model_file)
paddle.save(optimizer_stage2.state_dict(), optimizer_file)
m_state_dict = paddle.load(model_file)
opt_state_dict = paddle.load(optimizer_file)
model_stage2.set_state_dict(m_state_dict)
optimizer_stage2.set_state_dict(opt_state_dict)
except Exception as e:
shutil.rmtree(output_dir)
raise e
else:
shutil.rmtree(output_dir)
if __name__ == '__main__':
fleet.init(is_collective=True, strategy=strategy)
test_dp_stage2()
......@@ -25,13 +25,6 @@ class TestDYgrapShardingDP(TestDistBase):
self._trainers = 16
self._init_env()
def test_hybrid_sharding_stage2(self):
self.check_with_place(
"mn_dygraph_sharding_stage2.py",
backend="nccl",
need_envs=os.environ,
)
def test_hybrid_sharding_stage3(self):
self.check_with_place(
"mn_dygraph_group_sharded_stage3.py",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册