From 6ad0f061c26dc5e9157abf3e45ac6003daefbaa5 Mon Sep 17 00:00:00 2001 From: Baibaifan <39549453+Baibaifan@users.noreply.github.com> Date: Fri, 22 Apr 2022 13:03:13 +0800 Subject: [PATCH] [Cherry-pick] sharding for eager tensor (#42054) * sharding_for_eager_tensor (#41415) * fix_sharding_copy_right (#41849) --- paddle/fluid/pybind/eager_method.cc | 2 +- .../sharding_optimizer_stage2.py | 13 +- .../group_sharded_optimizer_stage2.py | 417 ++++++++ .../sharding/group_sharded_stage2.py | 543 +++++++++++ .../sharding/group_sharded_stage3.py | 912 ++++++++++++++++++ .../sharding/group_sharded_storage.py | 320 ++++++ .../sharding/group_sharded_utils.py | 227 +++++ .../meta_parallel/sharding/sharding_stage2.py | 13 +- .../fleet/utils/internal_storage.py | 13 +- .../distributed/sharding/group_sharded.py | 79 +- .../fluid/dygraph/varbase_patch_methods.py | 5 + .../fluid/tests/unittests/CMakeLists.txt | 2 +- .../unittests/dygraph_group_sharded_api.py | 3 + .../dygraph_group_sharded_api_eager.py | 147 +++ .../unittests/dygraph_group_sharded_stage2.py | 229 +++++ .../dygraph_group_sharded_stage2_offload.py | 112 +++ .../unittests/dygraph_group_sharded_stage3.py | 283 ++++++ .../dygraph_group_sharded_stage3_offload.py | 205 ++++ .../dygraph_sharding_optimizer_stage2.py | 3 + .../unittests/dygraph_sharding_stage2.py | 3 + .../dygraph_sharding_stage2_offload.py | 3 + .../unittests/dygraph_sharding_stage3.py | 3 + .../dygraph_sharding_stage3_offload.py | 3 + .../test_dygraph_group_sharded_api.py | 1 + .../unittests/test_dygraph_sharding_stage2.py | 2 + .../unittests/test_dygraph_sharding_stage3.py | 2 + .../tests/unittests/test_egr_python_api.py | 2 +- 27 files changed, 3513 insertions(+), 34 deletions(-) create mode 100644 python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_optimizer_stage2.py create mode 100644 python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py create mode 100644 python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage3.py create mode 100644 python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_storage.py create mode 100644 python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_utils.py create mode 100644 python/paddle/fluid/tests/unittests/dygraph_group_sharded_api_eager.py create mode 100644 python/paddle/fluid/tests/unittests/dygraph_group_sharded_stage2.py create mode 100644 python/paddle/fluid/tests/unittests/dygraph_group_sharded_stage2_offload.py create mode 100644 python/paddle/fluid/tests/unittests/dygraph_group_sharded_stage3.py create mode 100644 python/paddle/fluid/tests/unittests/dygraph_group_sharded_stage3_offload.py diff --git a/paddle/fluid/pybind/eager_method.cc b/paddle/fluid/pybind/eager_method.cc index 5295807349c..7f1673bafd9 100644 --- a/paddle/fluid/pybind/eager_method.cc +++ b/paddle/fluid/pybind/eager_method.cc @@ -550,7 +550,7 @@ static PyObject* tensor__share_buffer_to(TensorObject* self, PyObject* args, } auto dst_tensor = static_cast(dst_ptr->impl().get()); - dst_tensor->ShareDataWith(*src_tensor); + dst_tensor->ShareBufferWith(*src_tensor); dst_tensor->ShareDataTypeWith(*src_tensor); Py_INCREF(Py_None); return Py_None; diff --git a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py index a2c741667ed..fb43b89e1a6 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py +++ 
b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py @@ -11,9 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -#Taken and modified for fairscale from: -# https://github.com/facebookresearch/fairscale/blob/main/fairscale/optim/oss.py -#Commit: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e + +# The file has been adapted from fairscale file: +# https://github.com/facebookresearch/fairscale/blob/main/fairscale/optim/oss.py +# Git commit hash: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e +# We retain the following license from the original files: + +# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. +# +# This source code is licensed under the BSD license found in the +# LICENSE file in the root directory of this source tree. import copy import logging diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_optimizer_stage2.py b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_optimizer_stage2.py new file mode 100644 index 00000000000..70d2d2a1930 --- /dev/null +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_optimizer_stage2.py @@ -0,0 +1,417 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The file has been adapted from fairscale file: +# https://github.com/facebookresearch/fairscale/blob/main/fairscale/optim/oss.py +# Git commit hash: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e +# We retain the following license from the original files: + +# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. +# +# This source code is licensed under the BSD license found in the +# LICENSE file in the root directory of this source tree. + +import copy +import logging +import numpy as np +from collections import OrderedDict + +import paddle +import paddle.fluid as fluid +from paddle.fluid import core +from paddle.optimizer import Optimizer +from paddle.fluid.clip import ClipGradByGlobalNorm +from paddle.distributed.collective import _get_global_group, new_group, broadcast, wait + +from .group_sharded_storage import ParamStorage, GradStorage +from .group_sharded_utils import Type, device_guard, GroupShardedClipGrad + +# CUDA alignment 256 bytes, cpu alignment 4096 bytes +alignment = {"gpu": 256, "cpu": 4096} +align = { + Type.fp16.value: 2, + Type.fp32.value: 4, +} + + +class GroupShardedOptimizerStage2(Optimizer): + """ + A wrapper for Sharding Stage2 Optimizer in Dygraph. + + .. warning: ShardingOptimizer encapsulates the optimization strategy and integrates it into the optimizer. + + .. ZeRO: 1.https://arxiv.org/pdf/1910.02054.pdf 2.https://arxiv.org/pdf/1910.02054.pdf. + + """ + + # TODO (Baibaifan) + # Feature Notes: + # 1. Unified memory for parameters and parameters.grad to InternalStorage. + # 2. 
Support the segmentation of optimizer parameters and partial updating of parameters. + # 3. Dynamically adjust training parameters and models. + # 4. Support offload function. + # 5. Support the establishment of independent communication groups. + # 6. Broadcast_fp16 is not supported now. + def __init__(self, + params, + optim, + group=None, + offload=False, + device="gpu", + pertrain_sync_models=True, + **kw): + + super().__init__(learning_rate=optim._learning_rate, parameters=params) + assert core.is_compiled_with_cuda(), "Only GPU is supported now" + + # Segmentation information + self._dtype_rank_params = OrderedDict( + ) # {dtype:[param1,param2]} device, rank, params + self._param2rank = {} + self.__segment_params = [] + self._rank_buffer_size = {} # {dtype: {rank: numel+alignment}} + self._param2align = {} # {param.name: align} + + # Default information + self._optim = optim + + assert hasattr(self._optim, "_master_weights" + ), "Must use optimizer with _master_weights attribute" + + # Support parameter group and parameter list + self._local_params = [] + if isinstance(params[0], dict): + for param_group in params: + self._local_params.extend(list(param_group["params"])) + else: + self._local_params.extend(list(params)) + + self._default_device = device + self._pfp16 = len( + list( + filter(lambda x: x.trainable and x.dtype == Type.fp16.value, + self._local_params))) > 0 + + self._group = new_group(_get_global_group() + .ranks) if group is None else group + + self.world_size = self._group.nranks + self._rank = self._group.rank + self._global_root_rank = self._group.ranks[0] + + # Synchronous all ranks models + if pertrain_sync_models: + self._sync_params_and_buffers() + + self.param_storages = {} # {dtype: {rank: InternalStorage}} + + if isinstance(self._optim._grad_clip, ClipGradByGlobalNorm): + logging.warning( + "While using ClipGradByGlobalNorm in GroupShardedOptimizerStage2, the grad clip of original optimizer will be changed." + ) + + self._optim._grad_clip = GroupShardedClipGrad( + self._optim._grad_clip, paddle.get_device(), self._group) + if self._optim._parameter_list and isinstance( + self._optim._parameter_list[0], dict): + for item in self._optim._param_groups: + if "grad_clip" in item.keys(): + item["grad_clip"] = self._optim._grad_clip + + if offload: + assert self._pfp16, "Only support offload strategy while using \'Adam\', \'AdamW\' and \'Momentum\' optimizer with AMP/Pure FP16" + + self.offload = offload # Using for offload + self.offload_device = "cpu" + self.offload_buffer_size = 0 + self.offload_param2align = {} + self.offload_params = None + self.offload_grads = None + self.dev_id = int(paddle.get_device().split(":")[1]) + + self._master_params = {} + + # Update optimizer parameters and adjust parameter storage and use according to rank. 
+ self._update_opt_status() + + @paddle.autograd.no_grad() + def _sync_params_and_buffers(self): + """ + Sync all model states for all ranks + """ + + for p in self._local_params: + broadcast( + p, + src=self._global_root_rank, + group=self._group, + use_calc_stream=True) + + def _generate_master_params(self, trainable_params): + if self.offload: + for param in trainable_params: + if param.name not in self._master_params.keys(): + self._master_params[param.name] = core.eager.Tensor( + name=param.name, + value=param.cast(dtype=Type.fp32.value).numpy(), + place=core.CPUPlace(), + stop_gradient=param.stop_gradient) + else: + for param in trainable_params: + if param.dtype == Type.fp16.value: + master_tensor = paddle.cast(param, Type.fp32.value) + master_tensor.name = param.name + self._optim._master_weights[param.name] = master_tensor + + def _update_opt_status(self): + """Update optimizer status and parameter storage information, and special functions to be developed. + """ + # func 1 + self._integration_params() + + # Segement helpers + + def _segment_params(self): + """ + Divide all optimizer parameters equally into rank. + """ + if len(self.__segment_params) == 0: + self.__segment_params, param_lists = [ + [] for _ in range(self.world_size) + ], [[] for _ in range(self.world_size)] + sizes = [0] * self.world_size + for param in self._local_params: + # Add this param to rank with smallest size. + rank = sizes.index(min(sizes)) + param_lists[rank].append(param) + + # Statistical real numels + sizes[rank] += param._numel() if param.trainable else 0 + + for rank, params in enumerate(param_lists): + self.__segment_params[rank].extend(params) + return self.__segment_params + + @property + def local_params(self): + return self._local_params + + @property + def param2rank(self): + """Map the params to the rank which owns them""" + if len(self._param2rank) == 0: + for rank, params in enumerate(self._segment_params()): + for param in params: + self._param2rank[param.name] = rank + return self._param2rank + + @property + def dtype_rank_params(self): + """ + Divide the parameters into groups according to rank and dtype. + """ + if len(self._dtype_rank_params) == 0: + # Assign the parameters of each rank according to the type + for param in self._local_params: + if param.dtype not in self._dtype_rank_params.keys(): + self._dtype_rank_params[ + param.dtype] = [[] for _ in range(self.world_size)] + self._dtype_rank_params[param.dtype][self.param2rank[ + param.name]].append(param) + + # Sort per rank params by size + for dtype in self._dtype_rank_params.keys(): + for rank_params in self._dtype_rank_params[dtype]: + rank_params.sort(key=lambda x: x._numel()) + + return self._dtype_rank_params + + @property + def rank_buffer_size(self): + """ + Count the memory size of the parameters corresponding to rank under the corresponding dtype. 
+ """ + # CUDA alignment 256 bytes + if len(self._rank_buffer_size) == 0: + for dtype in self.dtype_rank_params.keys(): + if dtype not in self._rank_buffer_size.keys(): + self._rank_buffer_size[dtype] = {} + for dst_rank, per_rank_params in enumerate( + self.dtype_rank_params[dtype]): + if dst_rank not in self._rank_buffer_size[dtype].keys(): + self._rank_buffer_size[dtype][dst_rank] = 0 + for param in per_rank_params: + if not param.trainable: + continue + size = param._numel() * align[dtype] + remaining = size % alignment[self._default_device] + ali = 0 if remaining == 0 else alignment[ + self._default_device] - remaining + align_ = ali // align[dtype] + self._rank_buffer_size[dtype][dst_rank] += param._numel( + ) + align_ + self._param2align[param.name] = align_ + + return self._rank_buffer_size + + def _integration_params(self): + """ + Integrate the parameters into a continuous memory according to rank, and support the update of training parameters. + """ + + for dtype, per_rank_params in self.dtype_rank_params.items(): + if dtype not in self.param_storages.keys(): + self.param_storages[dtype] = {} + + for dst_rank, params in enumerate(per_rank_params): + if len(params) > 0: + + # Merge all the trainable params in a single InternalStorage + trainable_params = list( + filter(lambda x: x.trainable, params)) + if self._pfp16 and dst_rank == self._rank: + self._generate_master_params(trainable_params) + if trainable_params: + param_storage = ParamStorage( + size=self.rank_buffer_size[dtype][dst_rank], + dtype=dtype, + device=self._default_device) + + param_storage.add_rank_params(trainable_params, + self._param2align) + self.param_storages[dtype][dst_rank] = param_storage + + # Clear the InternalStorage keys which are not in use anymore + dtype_in_use = list(self.dtype_rank_params.keys()) + dtype_to_pop = list( + filter(lambda x: x not in dtype_in_use, self.param_storages.keys())) + for d in dtype_to_pop: + self.param_storages.pop(d) + + if self.offload: + self._optim._master_weights = self._master_params + cpu_master_params = [p for p in self._master_params.values()] + for param in cpu_master_params: + size = param._numel() * align[Type.fp32.value] + remaining = size % alignment[self.offload_device] + ali = 0 if remaining == 0 else alignment[ + self.offload_device] - remaining + align_ = ali // align[Type.fp32.value] + self.offload_buffer_size += param._numel() + align_ + self.offload_param2align[param.name] = align_ + + if cpu_master_params: + with device_guard(self._rank, self.offload_device): + self.offload_params = ParamStorage( + size=self.offload_buffer_size, + dtype=Type.fp32.value, + device=self.offload_device) + self.offload_params.buffer.name = "offload_buffer" + self.offload_params.add_rank_params( + cpu_master_params, self.offload_param2align, False) + self.offload_params.buffer.stop_gradient = False + + self.offload_grads = GradStorage( + size=self.offload_buffer_size, + dtype=Type.fp32.value, + device=self.offload_device, + destination=self._rank, + parm2align=self.offload_param2align, + convert_cpu=True) + for p in cpu_master_params: + self.offload_grads.add_grad( + p, self.offload_param2align[p.name]) + + self._optim._master_weights[ + self.offload_params.buffer. 
+ name] = self.offload_params.buffer + + def _offload_acc_grad(self, param_name, grad_fp32_cpu): + """accumulate grads with offload strategy""" + with device_guard(self._rank, self.offload_device): + if param_name in self._master_params.keys(): + if self._master_params[param_name].grad is None: + self._master_params[param_name]._copy_gradient_from( + grad_fp32_cpu) + else: + self._master_params[param_name].grad.add_(grad_fp32_cpu) + + self.offload_params.buffer._copy_gradient_from( + self.offload_grads.buffer) + + def _offload_scale_grad(self, scale_size): + """scale grads with offload strategy""" + with device_guard(self._rank, self.offload_device): + self.offload_grads.buffer.scale_(scale=scale_size) + + def _offload_clear_grad(self): + """clear grads with offload strategy""" + with device_guard(self._rank, self.offload_device): + self.offload_grads.buffer.zero_() + + def step(self): + """ + A wrapper for Optimizer's step function to finish the update operation of the optimizer. + """ + + if self.offload: + params_list = [self.offload_params.buffer] + + #TODO(Baibaifan): Offload will support param_groups later + if not isinstance(self._optim._param_groups[0], dict): + self._optim._parameter_list = params_list + self._optim._param_groups = params_list + + # Run the optimizer of the current rank step + if self.offload: + with device_guard(device=self.offload_device): + self._optim.step() + + for param in self._local_params: + if param.name in self._master_params.keys(): + param.set_value(self._master_params[param.name].cuda( + self.dev_id).cast(dtype=param.dtype)) + else: + self._optim.step() + + # Synchronize all the updated shards in between the ranks + self._broadcast_params() + + def minimize(self): + raise RuntimeError( + "optimizer.minimize() not support now, please use optimizer.step()") + + def set_state_dict(self, state_dict): + self._optim.set_state_dict(state_dict) + + def state_dict(self): + return self._optim.state_dict() + + def _clear_cache(self): + self.__segment_params.clear() + self._dtype_rank_params.clear() + self._param2rank.clear() + + @paddle.autograd.no_grad() + def _broadcast_params(self): + """Broadcast the parameters of the current rank to each rank""" + + # Exchange all the shards with the other ranks + for dtype_per_rank in self.param_storages.values(): + for dst_rank, internal_storage in dtype_per_rank.items(): + broadcast( + tensor=internal_storage.buffer, + src=self._group.ranks[dst_rank], + group=self._group, + use_calc_stream=True) diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py new file mode 100644 index 00000000000..0c045c45fd5 --- /dev/null +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py @@ -0,0 +1,543 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
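# --------------------------------------------------------------------------
# Editorial aside, not part of the patch: a minimal pure-Python sketch of the
# partitioning done by GroupShardedOptimizerStage2 above. _segment_params()
# greedily assigns each parameter to the rank holding the smallest running
# element count, and rank_buffer_size pads every parameter so its byte size is
# a multiple of the 256-byte CUDA alignment. All names below are illustrative
# stand-ins, not symbols from the patch.

ALIGNMENT_BYTES = 256              # CUDA allocation alignment used by the optimizer
ELEM_BYTES = {"fp16": 2, "fp32": 4}

def segment_params(param_numels, world_size):
    """Greedy bin packing: each param goes to the rank holding the fewest elements."""
    buckets = [[] for _ in range(world_size)]
    sizes = [0] * world_size
    for name, numel in param_numels.items():
        rank = sizes.index(min(sizes))
        buckets[rank].append(name)
        sizes[rank] += numel
    return buckets

def padded_numel(numel, dtype="fp16"):
    """Pad an element count so the corresponding byte size hits the alignment."""
    size = numel * ELEM_BYTES[dtype]
    remaining = size % ALIGNMENT_BYTES
    pad_bytes = 0 if remaining == 0 else ALIGNMENT_BYTES - remaining
    return numel + pad_bytes // ELEM_BYTES[dtype]

# segment_params({"w0": 1000, "b0": 10, "w1": 4096}, 2) -> [["w0"], ["b0", "w1"]]
# padded_numel(1000, "fp16") -> 1024 (2048 bytes, a multiple of 256)
# --------------------------------------------------------------------------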
+ +# The file has been adapted from fairscale file: +# https://github.com/facebookresearch/fairscale/blob/main/fairscale/nn/data_parallel/sharded_ddp.py +# Git commit hash: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e +# We retain the following license from the original files: + +# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. +# +# This source code is licensed under the BSD license found in the +# LICENSE file in the root directory of this source tree. + +import logging +import time +import functools +import numpy as np +from functools import reduce +from collections import deque +from types import MethodType + +import paddle +from paddle import nn +from paddle.distributed import collective +from paddle.distributed.utils import get_logger + +from .group_sharded_storage import GradStorage +from .group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2 +from .group_sharded_utils import Taskflow, Type, device_guard + +logger_ = get_logger(logging.INFO) + + +def _trainable(param): + return param.trainable + + +class GroupShardedStage2(nn.Layer): + """ + A wrapper for Sharding Stage2 Layer in Dygraph. + .. warning: GroupShardedStage2 encapsulates the layer strategy and integrates it into the nn.Layer. + .. ZeRO: https://arxiv.org/pdf/1910.02054.pdf. + """ + + # TODO (Baibaifan) + # Feature Notes:: + # 1. Unified memory for param and param.grad to InternalStorage. + # 2. Divide param.grad according to rank to centrally apply for and release GPU memory. + # 3. Dynamically adjust training parameters and models. + # 4. Support offload function. + # 5. Support the establishment of independent communication groups. + + def __init__( + self, + layer, + sharding_optimizer, + group=None, + sync_buffers=False, + buffer_max_size=2**23, #8MB + auto_refresh_trainable=True, + device="gpu"): + super().__init__() + + # training options + self._layer = layer + self._sharding_optimizers = [sharding_optimizer] if not isinstance( + sharding_optimizer, list) else sharding_optimizer + assert all( + list( + map(lambda opt: isinstance(opt, GroupShardedOptimizerStage2), + self._sharding_optimizers)) + ), "Please use GroupShardedOptimizerStage2 optimizer" + self._sync_buffers = sync_buffers + self._auto_refresh_trainable = auto_refresh_trainable + + # Communication related attributes + self._group = collective.new_group(collective._get_global_group() + .ranks) if group is None else group + self._world_size_scaling = 1.0 / self._group.nranks + assert self._group.nranks > 1, "Training must be distributed, ranks must be greater than 1" + self._rank = self._group.rank + self._global_root_rank = self._group.ranks[ + 0] # picking ranks index 0 as the reference + self._default_device = device + + # Global statistical parameters + self._all_params = [] + for optim in self._sharding_optimizers: + self._all_params.extend(list(optim.local_params)) + + self._trainable_params = [] + self._grad_reduced = [] + self._trainable_param2rank = {} + self._trainable_param2align = {} + self._trainable_mask = list(map(_trainable, self._all_params)) + self._param_grads = [] + + # Set grad storage size & Display param sizes and model sizes + model_size = sum([p._numel() for p in self._layer.parameters()]) + assert buffer_max_size >= 0, "buffer_max_size must be GE than 0." 
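# --------------------------------------------------------------------------
# Editorial aside, not part of the patch: buffer_max_size caps the flat gradient
# buckets built later in _setup_use_grad_storage. A rough sketch of the decision
# it controls: a gradient is packed into the per-(dtype, dst_rank) flat buffer
# while it still fits, otherwise it is reduced on its own. The class below is a
# simplified stand-in, not the real GradStorage.

class FlatGradBucket:
    def __init__(self, capacity_elems):
        self.capacity = capacity_elems
        self.fill = 0
        self.slots = []                      # (param_name, start, end) views

    def try_add(self, name, padded_numel):
        if self.fill + padded_numel > self.capacity:
            return False                     # falls back to a per-parameter reduce
        self.slots.append((name, self.fill, self.fill + padded_numel))
        self.fill += padded_numel
        return True

bucket = FlatGradBucket(capacity_elems=2**23 // 2)   # 8 MB worth of fp16 gradients
for name, numel in [("w0", 3_000_000), ("b0", 4_096), ("w1", 2_500_000)]:
    print(name, "bucketed" if bucket.try_add(name, numel) else "reduced separately")
# -> w0 bucketed, b0 bucketed, w1 reduced separately (bucket is already ~3M elements full)
# --------------------------------------------------------------------------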
+ self._buffer_max_size = self._rank_buffer_size(buffer_max_size, + model_size) + self._use_grad_storage = buffer_max_size > 0 + self._grad_storages = {} # {dtype: {rank: GradStorage}} + self._has_grad_storage = [] + self._grad_storage_list = [] + + # Offload + # TODO(haohongxiang): Now it's not be supported for multi-optimizers using Offload strategy + self._offload_optims = list( + filter(lambda optim: optim.offload, self._sharding_optimizers)) + if len(self._offload_optims) > 0: + assert len( + self._sharding_optimizers + ) == 1, "Only support offload strategy for single optimizer" + + self._offload = len(self._offload_optims) > 0 + self._offload_device = "cpu" + + # Set backward pass hooks + self._bw_hooks = [] + + # TODO (Baibaifan) Set tasks flow support asynchronous communicate + # self._tasks_flow = deque() + + # Define optimizer step and clear_grad + self._redefine_opt_step() + self._redefine_opt_clear() + + def forward(self, *inputs, **kwargs): + """ + A wrapper for Sharding Stage2 layer. + - Fresh trainable params or rebuild grad storage + - Sync layer's buffer params + - Clear all flags states + - Forward for origin layers + """ + + # Whether to need to reset trainable parameters + needs_fresh = len(self._bw_hooks) == 0 and self.training + + if self._auto_refresh_trainable: + needs_fresh |= self._detect_train_change() + + # Front hook + self._init_internal_storage(needs_fresh) + + # Sync layer's buffers state + if self._sync_buffers: + self.__sync_buffers() + + # Normal FW on the base model + fw = self._layer(*inputs, **kwargs) + + return fw + + def set_state_dict(self, state_dict, use_structured_name=True): + self._layer.set_state_dict( + state_dict, use_structured_name=use_structured_name) + + def state_dict(self, + destination=None, + include_sublayers=True, + structured_name_prefix=""): + return self._layer.state_dict( + destination=destination, + include_sublayers=include_sublayers, + structured_name_prefix=structured_name_prefix) + + def _clear_gradients(self): + """ + Set zero to the gradient of the optimizer's current rank trainable parameters. + """ + # Release grad storages + for dtype in self._grad_storages.keys(): + if not self._offload and self._rank in self._grad_storages[ + dtype].keys(): + self._grad_storages[dtype][self._rank].buffer.zero_() + + # Release grads of params + for param in self._trainable_params: + if param.name in self._param_grads and param.grad is not None: + param._zero_grads() + + # Release grads of master params with offload strategy + if self._offload: + self._sharding_optimizers[0]._offload_clear_grad() + + def _grad_scale(self): + """ + Before the gradient accumulation, scale the gradient. + """ + # Scale grad storages + for dtype in self._grad_storages.keys(): + if not self._offload and self._rank in self._grad_storages[ + dtype].keys(): + self._grad_storages[dtype][self._rank].buffer.scale_( + scale=self._world_size_scaling) + + # Scale grads of params + for param in self._trainable_params: + if param.name in self._param_grads and param.grad is not None: + param.grad.scale_(scale=self._world_size_scaling) + # param._reset_grad_inplace_version(True) + + # Scale grads of master params with offload strategy + if self._offload: + self._sharding_optimizers[0]._offload_scale_grad( + self._world_size_scaling) + + def _init_internal_storage(self, needs_fresh): + """ + Judge Fresh trainable params or rebuild grad storage. 
+ """ + if needs_fresh: + self._fresh_trainable() + else: + self._build_grad_storages() + + # Clear all flags state + self._clear_counters() + + def to(self, device=None, dtype=None, blocking=True): + """ + Synchronously or asynchronously convert the data type of the layer, the device is not supported now. + """ + assert isinstance(device, str), "Device must be type str" + assert device == self._default_device, "New devices are not supported, because of the optimizer state is not sync" + + self._layer.to(device=device, dtype=dtype, blocking=blocking) + + # Re-build the buckets, hooks, etc.. + self._fresh_trainable() + + def _fresh_trainable(self): + """ Whether to update training parameters. """ + + # Make sure that this is not done while gradients are waiting to be reduced (if no_sync context for instance) + if reduce(lambda x, y: x or y, self._grad_reduced, False): + logging.warning("Grads waiting to be reduced.") + + self._trainable_params = list( + filter(lambda x: x.trainable, self._all_params)) + self._trainable_params.sort(key=lambda x: x._numel()) + + self._trainable_param2rank = {} + for optim in self._sharding_optimizers: + # Need to be wrappered for Sharding Stage2 Optimizer + if len(optim.param_storages.keys()) == 0: + optim._update_opt_status() + + # Get the parameters split by the optimizer according to rank + for per_rank_params in optim.dtype_rank_params.values( + ): # all the params from all ranks + for params in per_rank_params: + for param in filter(lambda x: x.trainable, params): + self._trainable_param2rank[ + param.name] = optim.param2rank[param.name] + self._trainable_param2align[ + param.name] = optim._param2align[param.name] + + # Create grad_storage + self._setup_use_grad_storage() + # setup backward hooks + self._setup_backward_hooks() + + @paddle.autograd.no_grad() + def __sync_buffers(self): + """ + Sync all the param buffers from all ranks (exp: batch norm statistics). + """ + + for buffer in self._layer.buffers(include_sublayers=True): + collective.broadcast( + buffer, + self._global_root_rank, + self._group, + use_calc_stream=True) + + def __getattr__(self, name): + """Forward missing attributes to wrapped layer.""" + try: + return super().__getattr__(name) + except AttributeError: + return getattr(self._layer, name) + + @paddle.autograd.no_grad() + def _clear_counters(self): + """Reset all the grad reduce and call counters.""" + if self.training: + self._grad_reduced = [True for _ in self._trainable_params] + + if self._use_grad_storage: + for grad_storage in self._grad_storage_list: + grad_storage.reset_checked_in() + + def _get_reduce_fn(self, index, param, dst_rank): + """ + There are two ways to reduce gradient. + - 1. Do not use self._use_grad_storage or exceeded buffer_max_size will be reduced separately. + - 2. Use grad_storage Reduce the storage to get the full gradient from different ranks. 
+ """ + + if not self._use_grad_storage or not self._has_grad_storage[index]: + # Direct reduction + @paddle.autograd.no_grad() + def reduce(*_): + # Skip gradient reduction, do not change status information + if self._grad_reduced[index]: + assert param.grad is not None, "Parameter gradient cannot be None" + + # Change reduce information + self._grad_reduced[index] = False + + # Clear the gradient that does not belong to the current rank through the callback function + def cleanup(): + if dst_rank != self._rank: + param.clear_gradient(False) + elif self._offload: + tmp_grad = param.grad.cast( + dtype=Type.fp32.value).cpu() + + self._sharding_optimizers[0]._offload_acc_grad( + param.name, tmp_grad) + del tmp_grad + param.clear_gradient(False) + + # Synchronize the reduce parameter gradient + collective.reduce( + tensor=param.grad, + dst=self._group.ranks[dst_rank], + group=self._group) + # TODO (Baibaifan) Asynchronous the reduce parameter gradient + + # Clear the task flow and trigger callback to clear the redundant gradient + # self._clear_task_flow() + + cleanup() + + else: + # Buffer reduction + @paddle.autograd.no_grad() + def reduce(*_): + # Skip gradient reduction, do not change status information + if self._grad_reduced[index]: + assert param.grad is not None, "Parameter gradient cannot be None" + + # Change reduce information + self._grad_reduced[index] = False + grad_storage = self._grad_storages[param.dtype][dst_rank] + grad_storage.params_checked_in += 1 + + if grad_storage.all_checked_in: + assert grad_storage.buffer is not None + + # Clearing up the grad_storage buffer + def cleanup(): + if dst_rank != self._rank: + for p in grad_storage._params: + p.clear_gradient(False) + + grad_storage.buffer._clear_data() + elif self._offload: + grad_storage.to(device=self._offload_device) + for p in grad_storage._params: + with device_guard(): + tmp_grad = p.grad.cast( + dtype=Type.fp32.value) + self._sharding_optimizers[ + 0]._offload_acc_grad(p.name, tmp_grad) + p.clear_gradient(False) + grad_storage._device = self._default_device + grad_storage.buffer._clear_data() + + # Reduce the bucket + grad_storage.sent = True + # Synchronize the reduce parameter gradient + collective.reduce( + tensor=grad_storage.buffer, + dst=self._group.ranks[grad_storage.destination], + group=self._group) + # TODO (Baibaifan) Asynchronous the reduce parameter gradient + + cleanup() + + # Clear the task flow and trigger callback to clear the redundant gradient + # self._clear_task_flow() + + return reduce + + def _setup_backward_hooks(self): + """ + Set the backward hook to synchronize the gradients of all rank by reduce group ranks. + """ + + # Remove previous backward hooks + while len(self._bw_hooks) > 0: + self._bw_hooks.pop().remove() + + # Go through the parameters, attach the hook + if not self.training: + return + + for index, param in enumerate(self._trainable_params): + dst_rank = self._trainable_param2rank[param.name] + + reduce_function = self._get_reduce_fn(index, param, dst_rank) + + self._bw_hooks.append( + param._register_backward_hook(reduce_function)) + + def _setup_use_grad_storage(self): + """ + Integrate the parameters gradient into a continuous memory according to rank, and support the update of training parameters. 
+ """ + + # According to parameters's numel sort, allocate memory of parameter gradient to continuous memory according to rank + self._grad_storages = {} + self._has_grad_storage = [False for _ in self._trainable_params] + + for index, param in enumerate(self._trainable_params): + dst_rank = self._trainable_param2rank[param.name] + + if param.dtype not in self._grad_storages.keys(): + self._grad_storages[param.dtype] = {} + + if dst_rank not in self._grad_storages[param.dtype].keys(): + self._grad_storages[param.dtype][dst_rank] = GradStorage( + self._buffer_max_size[param.dtype], + dtype=param.dtype, + device=self._default_device, + destination=dst_rank, + parm2align=self._trainable_param2align) + + # Criteria to decide whether this parameter is to be put in GradStorage + if self._grad_storages[param.dtype][dst_rank].can_add_grad_view( + param, self._trainable_param2align[param.name]): + self._grad_storages[param.dtype][dst_rank].add_grad( + param, self._trainable_param2align[param.name]) + self._has_grad_storage[index] = True + else: + self._param_grads.append(param.name) + print( + "Can not add param: {}, param's shape: {}, param align: {}, grad_storages fill: {}, ". + format(param.name, param.shape, self._trainable_param2align[ + param.name], self._grad_storages[param.dtype][dst_rank] + ._fill)) + + for dtype in self._grad_storages.keys(): + self._grad_storage_list.extend( + list(self._grad_storages[dtype].values())) + + # def _clear_task_flow(self): + # """Try to consume the previous tasks.""" + # while len(self._tasks_flow) > 0: + # task = self._tasks_flow.popleft() + # task.wait() + # if task.callback is not None: + # task.callback() + + def _detect_train_change(self): + # Current trainable parameters + trainable_mask = list(map(_trainable, self._all_params)) + + # Whether parameters trainability changed + trainability_changed = trainable_mask != self._trainable_mask + + if trainability_changed: + logging.warning( + "Trainable params changed, because of eval/train mode or parameter freezing/unfreeze." + ) + self._trainable_mask = trainable_mask + + return trainability_changed + + def _build_grad_storages(self): + """ + Rebuild grad storages. + """ + # Rebuild fp16/fp32 grad storages + for dtype in self._grad_storages.keys(): + for dst_rank, grad_storage in self._grad_storages[dtype].items(): + if self._offload or dst_rank != self._rank: + grad_storage.manumal_relase() + grad_storage.rebuild() + + def _rank_buffer_size(self, buffer_max_size, model_size): + """ + Generate the minimum buffer size for each rank & Display param sizes and model sizes. + """ + + # Initialize buffer size + rank_buffer_size = {} + for shard_opt in self._sharding_optimizers: + if shard_opt.rank_buffer_size: + for dtype in shard_opt.rank_buffer_size.keys(): + sizes = max(shard_opt.rank_buffer_size[dtype].values()) + rank_buffer_size[dtype] = min(sizes, buffer_max_size) + + if Type.fp16.value in rank_buffer_size.keys(): + # FP16 GradStorage and model size + logger_.info( + "====== FP16 GradStorage size: {:.2f}M parameters, Model size {:.2f}M parameters ======". + format(rank_buffer_size[Type.fp16.value] / 2**19, model_size / 2 + **19)) + if Type.fp32.value in rank_buffer_size.keys(): + # FP32 GradStorage and model size + logger_.info( + "====== FP32 GradStorage size: {:.2f}M parameters, Model size {:.2f}M parameters ======". 
+ format(rank_buffer_size[Type.fp32.value] / 2**18, model_size / 2 + **18)) + return rank_buffer_size + + def _redefine_opt_step(self): + grad_func = self._grad_scale + for opt in self._sharding_optimizers: + opt_step = opt.step + + def _opt_step(self): + grad_func() + opt_step() + + opt.step = MethodType(_opt_step, opt) + + def _redefine_opt_clear(self): + clear_func = self._clear_gradients + + def _opt_clear(self): + clear_func() + + for opt in self._sharding_optimizers: + opt.clear_grad = MethodType(_opt_clear, opt) diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage3.py b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage3.py new file mode 100644 index 00000000000..049d3ffa369 --- /dev/null +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage3.py @@ -0,0 +1,912 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import logging +import numpy as np +from types import MethodType +from collections import OrderedDict + +import paddle +from paddle import nn +from paddle.autograd import EagerPyLayer +import paddle.fluid.core as core +import paddle.fluid.framework as framework +from paddle.fluid.framework import EagerParamBase +from paddle.fluid.clip import ClipGradByGlobalNorm +from paddle.distributed import collective + +from .group_sharded_storage import GradStorage +from .group_sharded_utils import Type, GroupShardedClipGrad, device_guard + + +def _all_gather(tensor, buffer_size, group): + """ + The main difference with paddle.distributed.all_gather: + no need to pass in tensor_list, the returned tensor is spliced + """ + + assert group is not None + if framework.in_dygraph_mode(): + out = paddle.zeros([buffer_size], dtype=tensor.dtype) + task = group.process_group.all_gather(tensor, out) + return out, task + + +# CUDA alignment 256 bytes +alignment = {"gpu": 256, } +align = { + Type.fp16.value: 2, + Type.fp32.value: 4, +} + +global CHECK_LAYER +CHECK_LAYER = dict() # Help to check layer's id -> layer's name + + +class GroupShardedStage3(nn.Layer): + """ + A wrapper for Sharding Stage3 Layer in Dygraph. + + .. warning: GroupShardedStage3 encapsulates the layer strategy and integrates it into the nn.Layer. + + .. ZeRO: https://arxiv.org/pdf/1910.02054.pdf. + """ + + # TODO (Baibaifan) + # Feature Notes:: + # 1. The model supports the segmentation of parameters by global ranks in layers. + # 2. Support communication flow and computing flow. + # 3. Support offload function. + # 4. Support the establishment of independent communication groups. + + def __init__(self, + layer, + optimizer, + group=None, + sync_buffers=False, + device="gpu", + segment_size=2**20, + pertrain_sync_models=True, + offload=False, + sync_comm=False): + super().__init__() + + # Default configs + assert core.is_compiled_with_cuda(), "Only support CUDA." 
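# --------------------------------------------------------------------------
# Editorial aside, not part of the patch: the stage-3 scheme described in the
# feature notes above, sketched with numpy. A parameter is padded so its element
# count divides evenly by the group size, each rank keeps only its own slice
# (param.fw_storage in _flatten_layer_params/_param_storage below), and the full
# tensor is rebuilt by all-gathering the slices before the layer runs.
# np.concatenate stands in for the real all_gather call.

import numpy as np

def slice_bounds(numel, nranks):
    padded = numel if numel % nranks == 0 else numel + nranks - numel % nranks
    per_rank = padded // nranks
    return padded, [(r * per_rank, (r + 1) * per_rank) for r in range(nranks)]

param = np.arange(10, dtype=np.float32)            # one flattened weight
padded, bounds = slice_bounds(param.size, nranks=4)
flat = np.zeros(padded, dtype=np.float32)
flat[:param.size] = param
shards = [flat[s:e].copy() for s, e in bounds]     # what each rank keeps

gathered = np.concatenate(shards)                  # "all_gather" of the shards
rebuilt = gathered[:param.size]                    # same idea as _slice(0, param._numel())
assert np.array_equal(rebuilt, param)
# --------------------------------------------------------------------------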
+ self._layer = layer + self._default_device = device + self.__sync_buffers = sync_buffers + self._offload = offload + self._sync_comm = sync_comm + # segmentation size + assert segment_size >= 0, "segment_size must be GE than 0." + self._segment_size = segment_size + + global DEV + DEV = "cpu" if paddle.get_device() == "cpu" else paddle.get_device( + ).split(":")[0] + global DEV_ID + DEV_ID = 0 if paddle.get_device() == "cpu" else int(paddle.get_device() + .split(":")[1]) + global param2dtype + param2dtype = dict() + + # Communication group establishment + self._group = collective.new_group(collective._get_global_group() + .ranks) if group is None else group + self._world_size_scaling = 1.0 / self._group.nranks + assert self._group.nranks > 1, "Training must be distributed, ranks must be greater than 1." + self._rank = self._group.rank + self._global_root_rank = self._group.ranks[ + 0] # picking ranks index 0 as the reference + + # Parameter segmentation for global ranks + # After flatten -> self._param2buffer_size, self._param2buffer, self._trainable_params + self._param2buffer_size = dict() # {param.name: size} + self._param2buffer = dict( + ) # {param.name: [(start0, end0),(start1, end1), ...]} + self._trainable_params = dict() # {id(layer): [trainable_params]} + self._unslice_params = set() # param's numel <= segment_size + self._unslice_params2align = dict() # {param.name: param's align} + self._grad_storages = dict() # {param.dtype: GradStorage} + + assert not isinstance( + optimizer, list), "Multiple optimizers are not supported now." + self._optim = _OptimizerWrapper(optimizer, self._offload, self._group, + self._update_params_slice) + self._ori_parameter_list = self._optim._parameter_list + self._ori_param_groups = self._optim._param_groups + + # Replace optimizer's _grad_clip + if isinstance(self._optim._grad_clip, ClipGradByGlobalNorm): + logging.warning( + "While using ClipGradByGlobalNorm in GroupShardedStage3, the grad clip of original optimizer will be changed." 
+ ) + self._optim._grad_clip = GroupShardedClipGrad( + self._optim._grad_clip, paddle.get_device(), self._group) + if self._optim._parameter_list and isinstance( + self._optim._parameter_list[0], dict): + for item in self._optim._param_groups: + if "grad_clip" in item.keys(): + item["grad_clip"] = self._optim._grad_clip + + # Synchronous all ranks models + if pertrain_sync_models: + self._sync_params_and_buffers() + + self._segment_rank_params(self._layer) + + # Add unslice params to master_weight in fp16 + self._handle_unslice_params() + + # In the first step, record the execution order of the layer + self._order_tracer = OrderedDict() + self._order_tracer["order"] = 0 + self._order_tracer["layer"] = list() + + # Register task flow + self._task_flow = TaskFlow() + + # Register forward hooks + self._register_forward_hooks(self._layer) + + # Register backward parameter hooks + self._register_backward_hooks() + + # Redefine optimizer step and clear function + self._redefine_opt_step() + self._redefine_opt_clear() + + @paddle.autograd.no_grad() + def _sync_params_and_buffers(self): + """ + Sync all model states for all ranks + """ + + for p in self._layer.parameters(): + collective.broadcast( + p, + src=self._global_root_rank, + group=self._group, + use_calc_stream=True) + + def _clear_gradients(self): + assert len(self._trainable_params.keys()) > 0 + current_layer_params = self._layer.parameters(include_sublayers=True) + # 1.Handle param's slice + trainable_params = list( + filter(lambda p: p.trainable and p not in self._unslice_params, + current_layer_params)) + for param in trainable_params: + assert hasattr( + param, "fw_storage" + ), "Find {} don't have fw_storage attribute.".format(param.name) + + param.fw_storage.clear_gradient(False) + param.bw_storage._clear() + param.bw_storage = None + # 2.Handle unslice param + if not self._offload: + for grad_storage in self._grad_storages.values(): + grad_storage.buffer.zero_() + else: + for param in list(self._unslice_params): + param.clear_gradient(False) + tmp_var = param.cuda(DEV_ID) + param._clear_data() + if tmp_var.dtype == Type.fp32.value and param2dtype[ + param.name] == Type.fp16.value: + tmp_var = paddle.cast(tmp_var, Type.fp16.value) + tmp_var._share_buffer_to(param) + del tmp_var + for grad_storage in self._grad_storages.values(): + grad_storage.manumal_relase() + grad_storage.rebuild() + + # Update param memery slice + def _update_params_slice(self): + update_list = self._update_params() + + if not isinstance(self._optim._param_groups[0], dict): + slice_params = [param.fw_storage for param in update_list] + self._optim._parameter_list = slice_params + list( + self._unslice_params) + self._optim._param_groups = slice_params + list( + self._unslice_params) + else: + for param_group in self._optim._param_groups: + p_group = [] + for p in param_group['params']: + if hasattr(p, "fw_storage"): + p_group.append(p.fw_storage) + else: + p_group.append(p) + + param_group['params'] = p_group + + def forward(self, *inputs, **kwargs): + """ + A wrapper for Sharding Stage3 layer. 
+ """ + # 1.Sync layer's buffers state + if self.__sync_buffers: + self._sync_buffers() + + # 2.Normal FW on the base model + fw = self._layer(*inputs, **kwargs) + + return fw + + def set_state_dict(self, state_dict, use_structured_name=True): + self._layer.set_state_dict( + state_dict, use_structured_name=use_structured_name) + + def state_dict(self, + destination=None, + include_sublayers=True, + structured_name_prefix=""): + return self._layer.state_dict( + destination=destination, + include_sublayers=include_sublayers, + structured_name_prefix=structured_name_prefix) + + def _handle_unslice_params(self): + buffer_size = dict() + buffer_size[Type.fp32.value] = 0 + buffer_size[Type.fp16.value] = 0 + for param in self._unslice_params: + # Updata optimizer master weights + if param.dtype == Type.fp16.value and not self._offload: + master_tensor = paddle.cast(param, Type.fp32.value) + master_tensor.name = param.name + self._optim._master_weights[param.name] = master_tensor + param2dtype[param.name] = param.dtype + p_align = self._param2align(param) + self._unslice_params2align[param.name] = p_align + buffer_size[param.dtype] += param._numel() + p_align + + # Create unslice_params'grad + for param in sorted(list(self._unslice_params), key=lambda p: p.name): + if param.dtype not in self._grad_storages.keys(): + self._grad_storages[param.dtype] = GradStorage( + buffer_size[param.dtype], + dtype=param.dtype, + device=self._default_device, + destination=self._rank, + parm2align=self._unslice_params2align) + self._grad_storages[param.dtype].add_grad( + param, self._unslice_params2align[param.name]) + + def _segment_rank_params(self, layer, name="last_layer"): + """ + Flatten parameters according to layer. + """ + current_layer_params = _current_layer_params(layer) + if current_layer_params: + CHECK_LAYER[id(layer)] = name + self._flatten_layer_params(layer, current_layer_params) + + for name, sub_layer in layer.named_children(): + self._segment_rank_params(sub_layer, name) + + def _flatten_layer_params(self, layer, current_layer_params): + """ + Parameter segmentation and memory integration. + """ + + def _add_manage_info(trainable_param): + return _PartitionParam(trainable_param) + + current_params = list() + for p in current_layer_params: + if p.trainable and p._numel() > self._segment_size: + current_params.append(_add_manage_info(p)) + elif p.trainable: + self._unslice_params.add(_UnsliceParam(p)) + + assert id(layer) not in self._trainable_params.keys() + self._trainable_params[id(layer)] = current_params + + for param in self._trainable_params[id(layer)]: + if param.name in self._param2buffer.keys(): + continue + self._param2buffer[param.name] = [] + # 1.Params alignment + align_ = self._param2align(param) + + offset = align_ + param._numel() + buffer_size = offset if offset % self._group.nranks == 0 else offset + self._group.nranks - ( + offset % self._group.nranks) + self._param2buffer_size[param.name] = buffer_size + + # 2.Combination param buffer + assert buffer_size % self._group.nranks == 0 + pre_buffer = buffer_size // self._group.nranks + + for rank_ in range(self._group.nranks): + self._param2buffer[param.name].append( + (rank_ * pre_buffer, (rank_ + 1) * pre_buffer)) + + # Record param's dtype + param2dtype[param.name] = param.dtype + # 3.Flatten layer params and release other rank buffer + self._param_storage(param, buffer_size) + + def _param_storage(self, param, buffer_size): + """ + This is a function to simplify the handling of parameter InternalStorages. 
+ """ + assert isinstance(buffer_size, int) + value = np.zeros( + buffer_size, + dtype=np.float16) if Type.fp16.value == param.dtype else np.zeros( + buffer_size, dtype=np.float32) + buffer = core.eager.Tensor(value=value, place=core.CPUPlace()) + + param_shape = param.shape + origin_state = param.stop_gradient + param.stop_gradient = True + param.flatten_() + param.stop_gradient = origin_state + start, end = self._param2buffer[param.name][self._rank] + + # Copy the current param value + with device_guard(): + tmp_var = buffer._slice(0, param._numel()) + param_cpu = param.cpu() + tmp_var.get_tensor().set(param_cpu.get_tensor(), core.CPUPlace()) + del tmp_var + param.get_tensor()._set_dims(param_shape) + param._clear_data() + + # Current rank param_storage + if self._offload: + with device_guard(): + tmp_tensor = buffer._slice(start, end) + param.fw_storage = core.eager.Tensor( + value=tmp_tensor, + place=core.CPUPlace(), + name="slice@" + param.name) + else: + param.fw_storage = core.eager.Tensor( + value=buffer._slice(start, end), name="slice@" + param.name) + param.status = "part" + + # Updata optimizer master weights + if param.dtype == Type.fp16.value and not self._offload: + master_tensor = paddle.cast(param.fw_storage, Type.fp32.value) + master_tensor.name = param.name + self._optim._master_weights[param.fw_storage.name] = master_tensor + + def _register_forward_hooks(self, layer): + """ + Register EagerPyLayer to manage memory slices. + There are four stages: + FW + 1. Before the forward layers, synchronize the full parameters. + 2. After the forward layers, release the full parameter and keep the parameter slice. + BW + 3. Before the backward layers, synchronize the full parameters and create param's grad. + 4. After the gradient accumulation, release the full parameter and keep the parameter slice. + """ + current_layer_params = _current_layer_params(layer) + if current_layer_params: + self._register_forward_all_hooks(layer, self._task_flow) + + for _, sub_layer in layer.named_children(): + self._register_forward_hooks(sub_layer) + + def _register_forward_all_hooks(self, sub_layer, task_flow): + def _forward_pre_hook(layer, inputs): + return ForwardPreHooks(layer, self._order_tracer, + self._trainable_params, + self._param2buffer_size, self._group, + self._sync_comm, self._offload, task_flow) + + def _forward_post_hook(layer, inputs, outputs): + return ForwardPostHooks.apply( + outputs, layer, self._order_tracer, self._trainable_params, + self._param2buffer, self._param2buffer_size, self._rank, + self._group, self._sync_comm, self._offload, task_flow) + + # register previous forward hooks + sub_layer.register_forward_pre_hook(_forward_pre_hook) + + # register post forward hooks + sub_layer.register_forward_post_hook(_forward_post_hook) + + @paddle.autograd.no_grad() + def _sync_buffers(self): + """ + Sync all the param buffers from all ranks (exp: batch norm statistics). + """ + + for buffer in self._layer.buffers(include_sublayers=True): + collective.broadcast( + buffer, + self._global_root_rank, + self._group, + use_calc_stream=True) + + def __getattr__(self, name): + """Forward missing attributes to wrapped layer.""" + try: + return super().__getattr__(name) + except AttributeError: + return getattr(self._layer, name) + + def _update_params(self): + """ + Update parameters to optimizer memory slice. 
+ """ + update_list = [] + assert len(self._trainable_params.keys()) > 0 + current_layer_params = self._layer.parameters(include_sublayers=True) + trainable_params = list( + filter(lambda p: p.trainable and p not in self._unslice_params, + current_layer_params)) + # 1.Handle param's slice + for param in trainable_params: + assert hasattr( + param, + "fw_storage"), "Find {} don't have fw_storage attribute".format( + param.name) + # Gradient average + if self._offload: + with device_guard(): + param.bw_storage.scale_(scale=self._world_size_scaling) + else: + param.bw_storage.scale_(scale=self._world_size_scaling) + param.fw_storage = _VarBaseWrapper(param) + assert param.fw_storage.grad is None + param.fw_storage._copy_gradient_from(param.bw_storage) + update_list.append(param) + + # 2.Handle unslice param + for grad_storage in self._grad_storages.values(): + grad_storage.buffer.scale_(scale=self._world_size_scaling) + collective.all_reduce(tensor=grad_storage.buffer, group=self._group) + if self._offload: + for param in list(self._unslice_params): + tmp_var = _device2cpu(param, convert_dtype=True) + tmp_var._share_buffer_to(param) + del tmp_var + + for grad_storage in self._grad_storages.values(): + for p in grad_storage._params: + tmp_g = _device2cpu(p.grad, convert_dtype=True) + p.clear_gradient(False) + p._copy_gradient_from(tmp_g) + del tmp_g + grad_storage.buffer._clear() + + return update_list + + def get_all_parameters(self, convert2cpu=False): + """ + Get the full parameters and return the corresponding task flows. + """ + assert len(self._trainable_params.keys()) > 0 + current_layer_params = self._layer.parameters(include_sublayers=True) + trainable_params = list( + filter(lambda p: p.trainable and p not in self._unslice_params, + current_layer_params)) + t_flow = _allgather_buffer( + trainable_params, + self._group, + param2buffer_size=self._param2buffer_size, + use_calc_stream=True, + task_flow=TaskFlow(), + sync_wait=True, + offload=self._offload, + convert2cpu=convert2cpu) + if convert2cpu: + for param in trainable_params: + t_flow.full_param[param.name][0]._share_buffer_to(param) + + self._optim._parameter_list = self._ori_parameter_list + self._optim._param_groups = self._ori_param_groups + + def _register_backward_hooks(self): + current_layer_params = self._layer.parameters(include_sublayers=True) + trainable_params = list( + filter(lambda p: p.trainable and p not in self._unslice_params, + current_layer_params)) + + for param in trainable_params: + allreduce_function = self._get_allreduce_fn(param) + param._register_backward_hook(allreduce_function) + + def _get_allreduce_fn(self, param): + @paddle.autograd.no_grad() + def allreduce_(*_): + if param.name in self._task_flow.full_grad.keys(): + full_grad = self._task_flow.full_grad[param.name] + # Only support sync allreduce current rank's layer now + collective.all_reduce(tensor=full_grad, group=self._group) + + start, end = self._param2buffer[param.name][self._rank] + if param.bw_storage is None: + param.bw_storage = full_grad._slice(start, + end).detach().clone() + if self._offload: + param.bw_storage = _device2cpu(param.bw_storage, True) + else: + if self._offload: + cpu_grad = _device2cpu( + full_grad._slice(start, end).detach().clone(), True) + with device_guard(): + param.bw_storage = paddle.add(param.bw_storage, + cpu_grad) + else: + param.bw_storage = paddle.add( + param.bw_storage, + full_grad._slice(start, end).detach().clone()) + param.clear_gradient(False) + del self._task_flow.full_grad[param.name] + + if 
param.name in self._task_flow.full_param.keys(): + if param.status == "all": + param.use_count = 0 + param._clear_data() + start, end = self._param2buffer[param.name][self._rank] + param.fw_storage = self._task_flow.full_param[param.name][ + 0]._slice(start, end).detach().clone() + param.status = "part" + del self._task_flow.full_param[param.name] + + if self._offload: + param.fw_storage = _device2cpu(param.fw_storage, True) + + return allreduce_ + + def _param2align(self, param): + # CUDA alignment 256 bytes + size = param._numel() * align[param.dtype] + remaining = size % alignment[self._default_device] + ali = 0 if remaining == 0 else alignment[ + self._default_device] - remaining + align_ = ali // align[param.dtype] + return align_ + + def _redefine_opt_step(self): + params_slice_func = self._update_params_slice + opt_step = self._optim.step + + def _opt_step(self): + if not self.update_scaler: + params_slice_func() + if self.offload: + with device_guard(): + opt_step() + else: + opt_step() + + def _opt_minimize(self): + raise RuntimeError( + "optimizer.minimize() not support now, please use optimizer.step()" + ) + + self._optim.step = MethodType(_opt_step, self._optim) + self._optim.minimize = MethodType(_opt_minimize, self._optim) + + def _redefine_opt_clear(self): + clear_func = self._clear_gradients + + def _opt_clear(self): + clear_func() + + self._optim.clear_grad = MethodType(_opt_clear, self._optim) + + +def ForwardPreHooks(layer, order_tracer, trainable_params, param2buffer_size, + group, sync_comm, offload, task_flow): + + # Record layer's id + layer_id = id(layer) + use_calc, sync_wait = False, False + + if layer_id not in order_tracer.keys() or sync_comm: + use_calc, sync_wait = True, True + + # Whether to use calc stream + task_flow.use_calc[layer_id] = use_calc + else: + # Whether to use calc stream + task_flow.use_calc[layer_id] = use_calc + # wait current layer params + _wait_layer(trainable_params[layer_id], task_flow, group, + param2buffer_size, use_calc, offload) + + if layer_id == order_tracer["layer"][-1]: return + order_ = order_tracer[layer_id] + layer_id = order_tracer["layer"][order_ + 1] + + _allgather_buffer( + trainable_params[layer_id], + group, + param2buffer_size=param2buffer_size, + use_calc_stream=use_calc, + task_flow=task_flow, + sync_wait=sync_wait, + offload=offload) + + return + + +class ForwardPostHooks(EagerPyLayer): + @staticmethod + def forward(ctx, inputs, layer, order_tracer, trainable_params, + param2buffer, param2buffer_size, rank, group, sync_comm, + offload, task_flow): + + layer_id = id(layer) + # release current layer full params + _release_param(trainable_params[layer_id], param2buffer, rank, + task_flow, offload) + + if layer_id not in order_tracer.keys(): + order_ = order_tracer["order"] + order_tracer[layer_id] = order_ + order_tracer["order"] += 1 + order_tracer["layer"].append(layer_id) + + #Record fw info + ctx.order_tracer = order_tracer + ctx.task_flow = task_flow + ctx.group = group + ctx.layer_id = layer_id + ctx.sync_comm = sync_comm + ctx.trainable_params = trainable_params + ctx.param2buffer_size = param2buffer_size + ctx.offload = offload + + return inputs + + @staticmethod + def backward(ctx, *args): + # Load context value + order_tracer = ctx.order_tracer + task_flow = ctx.task_flow + group = ctx.group + layer_id = ctx.layer_id + trainable_params = ctx.trainable_params + param2buffer_size = ctx.param2buffer_size + sync_comm = ctx.sync_comm + offload = ctx.offload + use_calc, sync_wait = False, False + + # Allgather 
params synchronization + if sync_comm: + use_calc, sync_wait = True, True + _allgather_buffer( + trainable_params[layer_id], + group, + param2buffer_size=param2buffer_size, + use_calc_stream=use_calc, + task_flow=task_flow, + sync_wait=sync_wait, + offload=offload) + else: + _wait_layer(trainable_params[layer_id], task_flow, group, + param2buffer_size, use_calc, offload) + + # Create params's grad + _create_params_grad(trainable_params[layer_id], param2buffer_size, + task_flow) + + # Whether to use calc stream + task_flow.use_calc[layer_id] = use_calc + if layer_id != order_tracer["layer"][0] and not sync_comm: + layer_next_id = order_tracer["layer"][order_tracer[layer_id] - 1] + _allgather_buffer( + trainable_params[layer_next_id], + group, + param2buffer_size=param2buffer_size, + use_calc_stream=use_calc, + task_flow=task_flow, + sync_wait=sync_wait, + offload=offload) + + return args + + +class TaskFlow: + """ + Task flows, one way linked list for task acquisition. + """ + + def __init__(self, + full_param=dict(), + full_grad=dict(), + use_calc=dict(), + callback=None): + self.full_param = full_param + self.full_grad = full_grad + self.use_calc = use_calc + self.callback = callback + + +def _release_param(trainable_params, + param2buffer, + rank, + task_flow, + offload=False): + for param in trainable_params: + # async communicate share weight not clear + param.use_count -= 1 + if param.use_count == 0: + param._clear_data() + if param.name in task_flow.full_param.keys(): + start, end = param2buffer[param.name][rank] + with paddle.amp.auto_cast(enable=False): + param.fw_storage = task_flow.full_param[param.name][ + 0]._slice(start, end).detach().clone() + param.status = "part" + del task_flow.full_param[param.name] + + if offload: + param.fw_storage = _device2cpu(param.fw_storage) + return + + +def _wait_layer(trainable_params, + task_flow, + group, + param2buffer_size, + use_calc_stream, + offload=False): + + for param in trainable_params: + if param.status == "all": + param.use_count += 1 + continue + if param.name in task_flow.full_param.keys(): + full_param, task = task_flow.full_param[param.name] + task.wait() + full_param._slice(0, param._numel())._share_buffer_to(param) + param.fw_storage._clear() + param.fw_storage = None + param.status = "all" + param.use_count += 1 + else: + _allgather_buffer( + trainable_params, + group, + param2buffer_size=param2buffer_size, + use_calc_stream=True, + task_flow=task_flow, + sync_wait=True, + offload=offload) + break + return task_flow + + +def _allgather_buffer(trainable_params, + group, + param2buffer_size, + use_calc_stream, + task_flow, + sync_wait=False, + offload=False, + convert2cpu=False): + + for param in trainable_params: + if param.status == "all": + param.use_count += 1 + continue + + if offload: + param.fw_storage = _cpu2device(param) + + buffer_size = param2buffer_size[param.name] + with paddle.amp.auto_cast(enable=False): + full_param, task = _all_gather(param.fw_storage, buffer_size, group) + + # Allgather current layer in the 1st step synchronously + if sync_wait: + with paddle.amp.auto_cast(enable=False): + task.wait() + full_param._slice(0, param._numel())._share_buffer_to(param) + param.fw_storage._clear() + param.fw_storage = None + param.status = "all" + param.use_count += 1 + task_flow.full_param[param.name] = (full_param, task) + + # parameter converts to cpu + if convert2cpu: + p_name = param.name + param = _device2cpu(param) + del task_flow.full_param[p_name] + task_flow.full_param[p_name] = (param, None) + + return 
task_flow + + +@paddle.autograd.no_grad() +def _create_params_grad(trainable_params, param2buffer_size, task_flow): + for param in trainable_params: + if param.name in task_flow.full_grad.keys(): + continue + assert isinstance(param2buffer_size[param.name], int) + temp_grad = paddle.zeros( + [param2buffer_size[param.name]], dtype=param.dtype) + temp_tensor = temp_grad._slice(0, param._numel()) + temp_tensor.get_tensor()._set_dims(param.shape) + param._copy_gradient_from(temp_tensor) + del temp_tensor + task_flow.full_grad[param.name] = temp_grad + return task_flow + + +def _PartitionParam(param): + if not hasattr(param, "fw_storage"): + setattr(param, "fw_storage", None) + setattr(param, "bw_storage", None) + setattr(param, "status", "all") + setattr(param, "use_count", 0) + return param + + +def _UnsliceParam(param): + if not hasattr(param, "unslice"): + setattr(param, "unslice", True) + return param + + +def _VarBaseWrapper(param): + varbase = param.fw_storage + tmp_param = EagerParamBase( + shape=varbase.shape, dtype=varbase.dtype, name="slice@" + param.name) + varbase._share_buffer_to(tmp_param) + tmp_param.regularizer = param.regularizer + tmp_param.optimize_attr['learning_rate'] = param.optimize_attr[ + 'learning_rate'] + varbase._clear() + return tmp_param + + +def _OptimizerWrapper(optimizer, offload, group, update_params_slice): + if not hasattr(optimizer, "_optim"): + setattr(optimizer, "_optim", optimizer) + setattr(optimizer, "offload", offload) + setattr(optimizer, "_group", group) + setattr(optimizer, "update_scaler", None) + setattr(optimizer, "update_slice", update_params_slice) + return optimizer + + +def _device2cpu(trans_param, convert_dtype=False): + if convert_dtype: + trans_param = paddle.cast(trans_param, Type.fp32.value) + tmp_p = trans_param.cpu() + trans_param._clear_data() + return tmp_p + + +def _cpu2device(param): + tmp_p = param.fw_storage.cuda(DEV_ID) + if tmp_p.dtype == Type.fp32.value and param2dtype[ + param.name] == Type.fp16.value: + tmp_p = paddle.cast(tmp_p, Type.fp16.value) + return tmp_p + + +def _current_layer_params(layer): + return layer.parameters( + include_sublayers=False) + list(layer.extra_parameters) if hasattr( + layer, "extra_parameters") else layer.parameters( + include_sublayers=False) diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_storage.py b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_storage.py new file mode 100644 index 00000000000..4d706870a91 --- /dev/null +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_storage.py @@ -0,0 +1,320 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The file has been adapted from fairscale file: +# https://github.com/facebookresearch/fairscale/blob/main/fairscale/nn/misc/param_bucket.py +# Git commit hash: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e +# We retain the following license from the original files: + +# Copyright (c) Facebook, Inc. 
and its affiliates. All rights reserved. +# +# This source code is licensed under the BSD license found in the +# LICENSE file in the root directory of this source tree. + +import os +import time +import numpy as np + +import paddle +from paddle.fluid import core +from .group_sharded_utils import Type, device_guard + + +class InternalStorage: + """ + This is a basic class, which is responsible for consolidating the basic storage tensor. + + """ + + # Support integration parameter tensor + def __init__(self, size, dtype, device, convert_cpu=False): + self._params = [] + self._param_ids = [] + self._fill = 0 + self._device = device + self._dtype = dtype + + # The flatten tensor + size = [size] if isinstance(size, int) else size + if convert_cpu: + value = np.zeros( + size, + dtype=np.float16) if Type.fp16.value == dtype else np.zeros( + size, dtype=np.float32) + self.buffer = core.eager.Tensor(value=value, place=core.CPUPlace()) + else: + self.buffer = paddle.zeros(size, dtype=dtype) + + self.dev_id = 0 if paddle.get_device() == "cpu" else int( + paddle.get_device().split(":")[1]) + + def to(self, device, dtype=None, keep_alignment=True): + """ + Move the underlying buffer + """ + assert self.buffer is not None, "Cannot move a collapsed bucket, please rebuild it" + assert (dtype == Type.fp32.value or + Type.fp16.value), "Conversion type is not supported now" + + if self._device != device: + tmp_buffer = self.buffer.cuda( + self.dev_id) if device == "gpu" else self.buffer.cpu() + for param in self._params: + param.clear_gradient(False) + + del self.buffer + self.buffer = tmp_buffer + self._device = device + + if dtype is not None: + self.buffer = self.buffer.cast(dtype=dtype) + self._dtype = dtype + + +class ParamStorage(InternalStorage): + """ + This is a basic class to simplify the handling of parameter InternalStorages. + """ + + def __init__(self, size, dtype, device): + super().__init__(size, dtype, device, convert_cpu=True) + self.param2align = None + + def to(self, device, dtype=None, keep_alignment=True): + """ + Move the underlying buffer + """ + + super().to(device, dtype) + + if keep_alignment: + self._array_params() + + @paddle.autograd.no_grad() + def add_rank_params(self, trainable_params, param2align, convert_gpu=True): + """ + Add new parameters to the InternalStorage. Params becomes a view of this InternalStorage buffer. 
+ """ + + assert all([ + id(param) not in self._param_ids for param in trainable_params + ]), "The same param cannot be checked in twice" + assert self.buffer is not None + + self.param2align = param2align + + cpu_param_shape = list() + for param in trainable_params: + p_shape = self._add_param_as_view(param, param2align[param.name], + convert_gpu) + cpu_param_shape.append(p_shape) + + if convert_gpu: + # buffer convert from cpu to cuda + self.buffer = self.buffer.cuda(self.dev_id) + + self._fill = 0 + + for idx, param in enumerate(trainable_params): + self._convert_buffer(param, cpu_param_shape[idx], + param2align[param.name]) + self._params.append(param) + self._param_ids.append(id(param)) + + @paddle.autograd.no_grad() + def _add_param_as_view(self, param, align, convert_gpu=True): + + assert ( + param.dtype == self.buffer.dtype + ), "Different types for the InternalStorage and the param, cannot proceed: {} - {}".format( + param.dtype, self.buffer.dtype) + + var_end = self._fill + param._numel() + offset = var_end + align + assert offset <= self.buffer._numel() + + p_shape = param.shape + + origin_state = param.stop_gradient + param.stop_gradient = True + param.flatten_() + param.stop_gradient = origin_state + + # Copy the current param value + + with device_guard(self.dev_id, "cpu"): + tmp_var = self.buffer._slice(self._fill, var_end) + if convert_gpu: + param_cpu = param.cpu() + param._clear_data() + tmp_var.set_value(param_cpu) + else: + tmp_var.set_value(param) + del tmp_var + + self._fill = offset + return p_shape + + @paddle.autograd.no_grad() + def _convert_buffer(self, param, p_shape, align): + + var_end = self._fill + np.prod(p_shape).tolist() + offset = var_end + align + assert offset <= self.buffer._numel() + + # Convert the param value + with device_guard(self.dev_id, self._device): + tmp_tensor = self.buffer._slice(self._fill, var_end) + tmp_tensor._share_buffer_to(param) + param.get_tensor()._set_dims(p_shape) + + self._fill = offset + + @paddle.autograd.no_grad() + def _array_params(self): + """ + Given the parameters which have been registered previously, rebuild the whole InternalStorage. + """ + assert len(self._params) > 0 + assert self.param2align is not None + + self._fill = 0 + for p in self._params: + self._convert_buffer(p, p.shape, self.param2align[p.name]) # modify + + +class GradStorage(InternalStorage): + """ + This is a basic class to simplify the handling of gradient InternalStorages + """ + + def __init__(self, + size, + dtype, + device, + destination, + parm2align, + convert_cpu=False): + if isinstance(size, np.int64): + size = size.tolist() + super().__init__(size, dtype, device, convert_cpu) + + self._max_size = size + self._release = False + + self.params_checked_in = 0 + self.destination = destination + self._parm2align = parm2align + self.sent = False + + def reset_checked_in(self): + """ Reset the counter of the parameter grads which have been checked in + """ + self.params_checked_in = 0 + self.sent = False + + @property + def all_checked_in(self): + """ Judge all the expected gradient check-in happened """ + return len(self._params) == self.params_checked_in + + def can_add_grad_view(self, param, align): + """ Is there enough InternalStorage to add this parameter gradient, and whether this param have already checked in. 
+ """ + return self._fill + param._numel() + align <= self._max_size and id( + param) not in self._param_ids + + def to(self, device, dtype=None, keep_alignment=True): + """ + Move the underlying buffer + """ + if self._release: + self.rebuild() + + super().to(device, dtype) + + if keep_alignment: + self._array_grads() + + @paddle.autograd.no_grad() + def add_grad(self, param, align): + """ + Add a new parameter gradient to the InternalStorage. Param.grad becomes a view of this InternalStorage buffer. + """ + + assert id( + param + ) not in self._param_ids, "The same gradients cannot be checked in twice" + + self._add_grad_as_view(param, align) + self._params.append(param) + self._param_ids.append(id(param)) + + @paddle.autograd.no_grad() + def manumal_relase(self): + """ + Release the buffer from InternalStorage. The InternalStorage will need to be rebuilt before use. + """ + if not self._release: + for p in self._params: + if p.grad is not None: + p.clear_gradient(False) + + self.buffer = None + self._fill = 0 + self.params_checked_in = 0 + self._release = True + + @paddle.autograd.no_grad() + def rebuild(self): + """ + Given the parameter gradients which have been registered previously, rebuild the whole InternalStorage. + """ + + if self._release: + self.buffer = paddle.zeros([self._max_size], dtype=self._dtype) + + for p in self._params: + self._add_grad_as_view(p, self._parm2align[p.name]) + + self._release = False + + @paddle.autograd.no_grad() + def _array_grads(self): + """ + Given the parameters gradients which have been registered previously, rebuild the whole InternalStorage. + """ + if len(self._params) > 0: + self._fill = 0 + for p in self._params: + self._add_grad_as_view(p, self._parm2align[p.name]) + + @paddle.autograd.no_grad() + def _add_grad_as_view(self, param, align): + assert param._numel( + ) > 0, "Cannot add a gradient to a released InternalStorage, please rebuild" + assert param.dtype == self.buffer.dtype + + grad_end = self._fill + param._numel() + offset = grad_end + align + assert offset <= self.buffer._numel() + + # Copy the current grad value to InternalStorage + with device_guard(self.dev_id, self._device): + tmp_var = self.buffer._slice(self._fill, grad_end) + tmp_var.get_tensor()._set_dims(param.shape) + param._copy_gradient_from(tmp_var) + del tmp_var + + self._fill = offset diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_utils.py b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_utils.py new file mode 100644 index 00000000000..eae8f87b014 --- /dev/null +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_utils.py @@ -0,0 +1,227 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
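The ParamStorage/GradStorage classes above consolidate many small parameter or gradient tensors into one contiguous buffer, recording an aligned [start, end) offset per tensor so it can later be exposed as a view of that buffer through the private _slice/_share_buffer_to helpers. Below is a minimal, self-contained sketch of only the offset bookkeeping, using public Paddle ops; the names pack_tensors/unpack_tensors are illustrative and, unlike the real storage classes, this sketch copies data instead of sharing buffers.

import numpy as np
import paddle

def pack_tensors(tensors, align_elems=64):
    # Flatten each tensor, pad it to a multiple of align_elems, and record its offsets.
    pieces, offsets, cursor = [], [], 0
    for t in tensors:
        n = int(np.prod(t.shape))
        pad = (-n) % align_elems
        offsets.append((cursor, cursor + n, t.shape))
        pieces.append(paddle.flatten(t))
        if pad:
            pieces.append(paddle.zeros([pad], dtype=t.dtype))
        cursor += n + pad
    return paddle.concat(pieces), offsets

def unpack_tensors(buffer, offsets):
    # Recover each tensor as a reshaped slice of the flat buffer.
    return [paddle.reshape(buffer[start:end], shape) for start, end, shape in offsets]

params = [paddle.rand([3, 4]), paddle.rand([5])]
flat, offsets = pack_tensors(params)
restored = unpack_tensors(flat, offsets)
assert np.allclose(params[0].numpy(), restored[0].numpy())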
+ +import os +import contextlib +from enum import Enum +import numpy as np +from types import MethodType + +import paddle +from paddle import _C_ops +from paddle.fluid import core +from paddle.fluid import layers +from paddle.fluid.dygraph import to_variable +from paddle.fluid.framework import dygraph_only + + +class Taskflow: + """ + Task flows, one way linked list for task acquisition. + """ + + def __init__(self, task, callback): + self.task = task + self.callback = callback + + +class Type(Enum): + """ + Type of trainable parameters + """ + fp16 = paddle.float16 + fp32 = paddle.float32 + + +class GroupShardedClipGrad: + def __init__(self, clip, device, group): + self._clip = clip + self._device = device + self._group = group + + @paddle.autograd.no_grad() + def _dygraph_clip(self, params_grads): + sum_square_fp32, sum_square_fp16 = [], [] + unslice_params_fp32, unslice_params_fp16 = [], [] + + for p, g in params_grads: + p_slice = True # using for slice parameter in sharding stage3 + if g is None or getattr(p, 'need_clip', True) is False: + continue + if hasattr(p, "unslice"): + p_slice = False + + merge_grad = g + if g.type == core.VarDesc.VarType.SELECTED_ROWS: + merge_grad = layers.get_tensor_from_selected_rows( + layers.merge_selected_rows(g)) + square = layers.square(merge_grad) + sum_square = layers.reduce_sum(square) + + if p.dtype == paddle.float16: + if p_slice: sum_square_fp16.append(sum_square) + else: unslice_params_fp16.append(sum_square) + elif p.dtype == paddle.float32: + if p_slice: sum_square_fp32.append(sum_square) + else: unslice_params_fp32.append(sum_square) + + # global norm of non-distributed FP16 params_and_grads + if len(sum_square_fp16) == 0: + global_norm_fp16 = paddle.to_tensor([0.], dtype=paddle.float32) + else: + global_norm_fp16 = layers.concat(sum_square_fp16) + global_norm_fp16 = layers.reduce_sum(global_norm_fp16) + global_norm_fp16 = paddle.cast( + global_norm_fp16, dtype=paddle.float32) + + # global norm of non-distributed FP16 params_and_grads for unslice parameters + if len(unslice_params_fp16) == 0: + global_unslice_fp16 = paddle.to_tensor([0.], dtype=paddle.float32) + else: + global_unslice_fp16 = layers.concat(unslice_params_fp16) + global_unslice_fp16 = layers.reduce_sum(global_unslice_fp16) + global_unslice_fp16 = paddle.cast( + global_unslice_fp16, dtype=paddle.float32) + + # global norm of non-distributed FP32 params_and_grads + global_norm_fp32 = layers.concat(sum_square_fp32) if len( + sum_square_fp32) != 0 else paddle.to_tensor( + [0.], dtype=paddle.float32) + global_norm_fp32 = layers.reduce_sum(global_norm_fp32) + + # global norm of non-distributed FP32 params_and_grads for unslice parameters + global_unslice_fp32 = layers.concat(unslice_params_fp32) if len( + unslice_params_fp32) != 0 else paddle.to_tensor( + [0.], dtype=paddle.float32) + global_unslice_fp32 = layers.reduce_sum(global_unslice_fp32) + global_unslice_var = global_unslice_fp16 + global_unslice_fp32 + + global_norm_var = global_norm_fp16 + global_norm_fp32 + 1.0 / self._group.nranks * global_unslice_var + + # add all reduce to get global norm of distributed params_and_grads + dev_id = int(self._device.split(":")[1]) + if paddle.device.get_device() == "cpu": + global_norm_var = global_norm_var.cuda(dev_id) + + with device_guard(dev_id, "gpu"): + paddle.distributed.all_reduce(global_norm_var, group=self._group) + + global_norm_var = layers.sqrt(global_norm_var) + max_global_norm = layers.fill_constant( + shape=[1], dtype=global_norm_var.dtype, value=self.clip_norm) + + 
clip_var = layers.elementwise_div( + x=max_global_norm, + y=layers.elementwise_max( + x=global_norm_var, y=max_global_norm)) + clip_var_fp16 = paddle.cast(clip_var, paddle.float16) + + for p, g in params_grads: + if getattr(p, 'need_clip', True) is False or g is None: + continue + origin_state = g.stop_gradient + g.stop_gradient = True + if p.dtype == paddle.float16: + g.scale_(clip_var_fp16.item()) + else: + g.scale_(clip_var.item()) + g.stop_gradient = origin_state + # p._reset_grad_inplace_version(True) + + return params_grads + + def __getattr__(self, item): + return getattr(self._clip, item) + + def __call__(self, params_grads): + return self._dygraph_clip(params_grads) + + +@contextlib.contextmanager +def device_guard(dev_id=0, device="cpu"): + origin_device = paddle.device.get_device() + if device == "cpu": + paddle.set_device(device) + elif device == "gpu": + paddle.set_device("gpu:{}".format(dev_id)) + try: + yield + finally: + paddle.set_device(origin_device) + + +@dygraph_only +def GroupShardedScaler(scaler): + def unscale_method(self, optimizer): + if not self._enable: + return + param_grads = [] + param_grads_fp16 = [] + param_grads_fp32 = [] + if hasattr(optimizer, "update_slice"): + optimizer.update_slice() + optimizer.update_scaler = True + + if getattr(optimizer._optim, '_param_groups', None) and isinstance( + optimizer._optim._param_groups[0], dict): + + for group in optimizer._optim._param_groups: + for param in group['params']: + if param.grad is not None: + param_grads.append(param.grad) + if param.grad.dtype in [ + core.VarDesc.VarType.FP16, paddle.float16 + ]: + param_grads_fp16.append(param.grad) + else: + param_grads_fp32.append(param.grad) + else: + for param in optimizer._optim._parameter_list: + if param.grad is not None: + param_grads.append(param.grad) + if param.grad.dtype in [ + core.VarDesc.VarType.FP16, paddle.float16 + ]: + param_grads_fp16.append(param.grad) + else: + param_grads_fp32.append(param.grad) + + temp_found_inf_fp16 = to_variable(np.array([0]).astype(np.bool)) + temp_found_inf_fp32 = to_variable(np.array([0]).astype(np.bool)) + + device = "cpu" if optimizer.offload else "gpu" + dev_id = 0 if device == "cpu" else int(paddle.get_device().split(":")[ + 1]) + + with device_guard(dev_id, device): + if len(param_grads_fp16): + _C_ops.check_finite_and_unscale(param_grads_fp16, self._scale, + param_grads_fp16, + temp_found_inf_fp16) + if len(param_grads_fp32): + _C_ops.check_finite_and_unscale(param_grads_fp32, self._scale, + param_grads_fp32, + temp_found_inf_fp32) + + self._found_inf = 1 if temp_found_inf_fp16 or temp_found_inf_fp32 else 0 + is_found_inf = paddle.to_tensor([self._found_inf], dtype="int32") + + paddle.distributed.all_reduce( + is_found_inf, + op=paddle.distributed.ReduceOp.MAX, + group=optimizer._group) + self._found_inf = is_found_inf.numpy()[0] + + scaler._unscale = MethodType(unscale_method, scaler) + return scaler diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py index c6f05023e61..b09d256d9bb 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py @@ -11,9 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-#Taken and modified for fairscale from: -# https://github.com/facebookresearch/fairscale/blob/main/fairscale/nn/data_parallel/sharded_ddp.py -#Commit: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e + +# The file has been adapted from fairscale file: +# https://github.com/facebookresearch/fairscale/blob/main/fairscale/nn/data_parallel/sharded_ddp.py +# Git commit hash: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e +# We retain the following license from the original files: + +# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. +# +# This source code is licensed under the BSD license found in the +# LICENSE file in the root directory of this source tree. import os import contextlib diff --git a/python/paddle/distributed/fleet/utils/internal_storage.py b/python/paddle/distributed/fleet/utils/internal_storage.py index 469da223667..80d8d8562d4 100644 --- a/python/paddle/distributed/fleet/utils/internal_storage.py +++ b/python/paddle/distributed/fleet/utils/internal_storage.py @@ -11,9 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -#Taken and modified for fairscale from: -# https://github.com/facebookresearch/fairscale/blob/main/fairscale/nn/misc/param_bucket.py -#Commit: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e + +# The file has been adapted from fairscale file: +# https://github.com/facebookresearch/fairscale/blob/main/fairscale/nn/misc/param_bucket.py +# Git commit hash: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e +# We retain the following license from the original files: + +# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. +# +# This source code is licensed under the BSD license found in the +# LICENSE file in the root directory of this source tree. 
import os import time diff --git a/python/paddle/distributed/sharding/group_sharded.py b/python/paddle/distributed/sharding/group_sharded.py index 6fd4caa7b4a..4c22028b230 100644 --- a/python/paddle/distributed/sharding/group_sharded.py +++ b/python/paddle/distributed/sharding/group_sharded.py @@ -20,11 +20,20 @@ import paddle from paddle.optimizer import Optimizer from paddle.distributed.utils import get_logger +from paddle.fluid.framework import in_dygraph_mode + +# Old version from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ShardingOptimizerStage2 from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ShardingStage2 from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage3 import ShardingStage3 from paddle.distributed.fleet.meta_parallel.sharding.sharding_utils import ShardingScaler +# New version +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2 +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage2 import GroupShardedStage2 +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage3 import GroupShardedStage3 +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_utils import GroupShardedScaler + logger_ = get_logger(logging.INFO) @@ -110,30 +119,56 @@ def group_sharded_parallel(model, logger_.info("*" * 30) logger_.info("Sharded level os uses sharded level os_g achieved now.") logger_.info("*" * 30) - optimizer = ShardingOptimizerStage2( - params=model.parameters(), - optim=optimizer, - group=group, - offload=offload) - model = ShardingStage2( - model, - optimizer, - group=group, - sync_buffers=sync_buffers, - buffer_max_size=buffer_max_size) + if in_dygraph_mode(): + optimizer = GroupShardedOptimizerStage2( + params=optimizer._parameter_list, + optim=optimizer, + group=group, + offload=offload) + model = GroupShardedStage2( + model, + optimizer, + group=group, + sync_buffers=sync_buffers, + buffer_max_size=buffer_max_size) + else: + optimizer = ShardingOptimizerStage2( + params=model.parameters(), + optim=optimizer, + group=group, + offload=offload) + model = ShardingStage2( + model, + optimizer, + group=group, + sync_buffers=sync_buffers, + buffer_max_size=buffer_max_size) elif level == 'p_g_os': - model = ShardingStage3( - model, - optimizer=optimizer, - group=group, - sync_buffers=sync_buffers, - segment_size=segment_size, - offload=offload, - sync_comm=sync_comm) + if in_dygraph_mode(): + model = GroupShardedStage3( + model, + optimizer=optimizer, + group=group, + sync_buffers=sync_buffers, + segment_size=segment_size, + offload=offload, + sync_comm=sync_comm) + else: + model = ShardingStage3( + model, + optimizer=optimizer, + group=group, + sync_buffers=sync_buffers, + segment_size=segment_size, + offload=offload, + sync_comm=sync_comm) else: raise ValueError("Please enter the correct level.") if params_fp16 and isinstance(scaler, paddle.amp.GradScaler): - scaler = ShardingScaler(scaler) + if in_dygraph_mode(): + scaler = GroupShardedScaler(scaler) + else: + scaler = ShardingScaler(scaler) logger_.info("*" * 30) logger_.info( "If there is a communication hang using group sharded, please check whether the communication operations of each process are unified." 
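The dispatch above picks the eager-mode GroupSharded* classes when in_dygraph_mode() is true and otherwise falls back to the older Sharding* classes. The unit tests added later in this patch drive the public API roughly as sketched below; the model, data and output path are illustrative, and the script has to be launched with paddle.distributed.launch on at least two GPUs.

import paddle
from paddle.distributed.sharding import group_sharded_parallel, save_group_sharded_model

paddle.distributed.init_parallel_env()

model = paddle.nn.Linear(1000, 10)
optimizer = paddle.optimizer.Momentum(
    parameters=model.parameters(), learning_rate=0.001)
model = paddle.amp.decorate(models=model, level='O2', save_dtype='float32')
scaler = paddle.amp.GradScaler(init_loss_scaling=32768)

# level='os_g' wraps the optimizer/model with the stage-2 classes,
# level='p_g_os' wraps the model with the stage-3 class instead.
model, optimizer, scaler = group_sharded_parallel(
    model=model, optimizer=optimizer, level='os_g', scaler=scaler)

img = paddle.rand([4, 1000])
label = paddle.randint(0, 10, [4, 1])
with paddle.amp.auto_cast(True, level='O2'):
    loss = paddle.nn.functional.cross_entropy(input=model(img), label=label)
avg_loss = paddle.mean(loss.cast('float32'))

scaler.scale(avg_loss).backward()
scaler.step(optimizer)
scaler.update()
optimizer.clear_grad()

save_group_sharded_model(model, output='./sharded_ckpt', optimizer=optimizer)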
@@ -195,9 +230,9 @@ def save_group_sharded_model(model, output, optimizer=None): ), "Saving directory ({}) should be a directory, not a file".format(output) os.makedirs(output, exist_ok=True) output_model = os.path.join(output, "model.pdmodel") - if isinstance(model, ShardingStage2): + if isinstance(model, (ShardingStage2, GroupShardedStage2)): paddle.save(model._layer.state_dict(), output_model) - elif isinstance(model, ShardingStage3): + elif isinstance(model, (ShardingStage3, GroupShardedStage3)): convert2cpu = True if model._offload else False model.get_all_parameters(convert2cpu=convert2cpu) paddle.save(model._layer.state_dict(), output_model) diff --git a/python/paddle/fluid/dygraph/varbase_patch_methods.py b/python/paddle/fluid/dygraph/varbase_patch_methods.py index 9bf245ff388..b2441e90fc9 100644 --- a/python/paddle/fluid/dygraph/varbase_patch_methods.py +++ b/python/paddle/fluid/dygraph/varbase_patch_methods.py @@ -819,6 +819,10 @@ def monkey_patch_varbase(): def _numel(self): return self.get_tensor()._numel() + @framework.dygraph_only + def _clear_data(self): + self.get_tensor()._clear() + @framework.dygraph_only def _uva(self, device_id=0): ''' @@ -934,6 +938,7 @@ def monkey_patch_varbase(): setattr(core.eager.Tensor, "_slice", _slice) setattr(core.eager.Tensor, "_numel", _numel) setattr(core.eager.Tensor, "_uva", _uva) + setattr(core.eager.Tensor, "_clear_data", _clear_data) else: setattr(core.VarBase, "__name__", "Tensor") setattr(core.VarBase, "grad", grad) diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 61f2bede2b3..0ec6bad8629 100755 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -1142,7 +1142,7 @@ if(WITH_DISTRIBUTE AND WITH_GPU AND WITH_NCCL) set_tests_properties(test_parallel_dygraph_sharding_parallel PROPERTIES TIMEOUT 120) set_tests_properties(test_dygraph_sharding_optimizer_stage2 PROPERTIES TIMEOUT 120) set_tests_properties(test_dygraph_sharding_stage2 PROPERTIES TIMEOUT 120) - set_tests_properties(test_dygraph_sharding_stage3 PROPERTIES TIMEOUT 120) + set_tests_properties(test_dygraph_sharding_stage3 PROPERTIES TIMEOUT 200) set_tests_properties(test_dygraph_group_sharded_api PROPERTIES TIMEOUT 120) set_tests_properties(test_auto_parallel_parallelizer PROPERTIES TIMEOUT 120) set_tests_properties(test_parallel_dygraph_mp_layers PROPERTIES TIMEOUT 120) diff --git a/python/paddle/fluid/tests/unittests/dygraph_group_sharded_api.py b/python/paddle/fluid/tests/unittests/dygraph_group_sharded_api.py index d4832782c32..574a222ba18 100644 --- a/python/paddle/fluid/tests/unittests/dygraph_group_sharded_api.py +++ b/python/paddle/fluid/tests/unittests/dygraph_group_sharded_api.py @@ -22,6 +22,7 @@ import paddle.fluid as fluid from paddle.fluid.dygraph.nn import Linear from paddle.distributed import fleet from paddle.fluid.dygraph import nn +from paddle.fluid.framework import _test_eager_guard from paddle.distributed.sharding import group_sharded_parallel, save_group_sharded_model epoch = 10 @@ -144,4 +145,6 @@ def test_sharding_api(): if __name__ == '__main__': + with _test_eager_guard(): + pass test_sharding_api() diff --git a/python/paddle/fluid/tests/unittests/dygraph_group_sharded_api_eager.py b/python/paddle/fluid/tests/unittests/dygraph_group_sharded_api_eager.py new file mode 100644 index 00000000000..85a5446cb64 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/dygraph_group_sharded_api_eager.py @@ -0,0 +1,147 @@ +# 
Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import shutil +import tempfile +import numpy as np + +import paddle +import paddle.fluid as fluid +from paddle.fluid.dygraph.nn import Linear +from paddle.distributed import fleet +from paddle.fluid.dygraph import nn +from paddle.fluid.framework import _test_eager_guard +from paddle.distributed.sharding import group_sharded_parallel, save_group_sharded_model + +epoch = 10 +paddle.seed(2022) +np.random.seed(2022) +base_lr = 0.1 +momentum_rate = 0.9 +l2_decay = 1e-4 +batch_size = 100 + + +class MLP(fluid.Layer): + def __init__(self, linear_size=1000, param_attr=None, bias_attr=None): + super(MLP, self).__init__() + + self._linear1 = Linear(linear_size, linear_size) + self._linear2 = Linear(linear_size, linear_size) + self._linear3 = Linear(linear_size, 10) + + def forward(self, inputs): + y = self._linear1(inputs) + y = self._linear2(y) + y = self._linear3(y) + return y + + +def reader_decorator(linear_size=1000): + def __reader__(): + for _ in range(100): + img = np.random.rand(linear_size).astype('float32') + label = np.ones(1).astype('int64') + yield img, label + + return __reader__ + + +def optimizer_setting(model, use_pure_fp16, opt_group=False): + clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0) + optimizer = paddle.optimizer.Momentum( + parameters=[{ + "params": list(model.parameters()) + }] if opt_group else list(model.parameters()), + learning_rate=0.001, + weight_decay=0.00001, + grad_clip=clip, + multi_precision=use_pure_fp16) + + return optimizer + + +def train_mlp(model, shard_level, use_pure_fp16, output_dir): + optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16) + model = paddle.amp.decorate(models=model, level='O2', save_dtype='float32') + scaler = paddle.amp.GradScaler(init_loss_scaling=32768) + + model, optimizer, scaler = group_sharded_parallel( + model=model, optimizer=optimizer, level=shard_level, scaler=scaler) + + train_reader = paddle.batch( + reader_decorator(), batch_size=batch_size, drop_last=True) + + train_loader = paddle.io.DataLoader.from_generator( + capacity=32, + use_double_buffer=True, + iterable=True, + return_list=True, + use_multiprocess=True) + train_loader.set_sample_list_generator(train_reader) + + for eop in range(epoch): + model.train() + for batch_id, data in enumerate(train_loader()): + img, label = data + label.stop_gradient = True + img.stop_gradient = True + with paddle.amp.auto_cast(True, level='O2'): + out = model(img) + loss = paddle.nn.functional.cross_entropy( + input=out, label=label) + avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32)) + + if not use_pure_fp16: + avg_loss.backward() + optimizer.step() + else: + scaler.scale(avg_loss).backward() + scaler.step(optimizer) + scaler.update() + + optimizer.clear_grad() + + save_group_sharded_model(model, output=output_dir, optimizer=optimizer) + return model.parameters() + + +def test_sharding_api(): + paddle.distributed.init_parallel_env() + 
mlp, mlp1, mlp2 = MLP(), MLP(), MLP() + state_dict = mlp.state_dict() + mlp1.set_state_dict(state_dict) + mlp2.set_state_dict(state_dict) + + output_dir = tempfile.mkdtemp() + + # fp16 + stage2_params = train_mlp( + mlp1, shard_level="os_g", use_pure_fp16=True, output_dir=output_dir) + stage3_params = train_mlp( + mlp2, shard_level="p_g_os", use_pure_fp16=True, output_dir=output_dir) + + for i in range(len(stage3_params)): + np.testing.assert_allclose( + stage2_params[i].numpy(), + stage3_params[i].numpy(), + rtol=1e-4, + atol=1e-3) + shutil.rmtree(output_dir) + + +if __name__ == '__main__': + with _test_eager_guard(): + test_sharding_api() diff --git a/python/paddle/fluid/tests/unittests/dygraph_group_sharded_stage2.py b/python/paddle/fluid/tests/unittests/dygraph_group_sharded_stage2.py new file mode 100644 index 00000000000..8c07734d513 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/dygraph_group_sharded_stage2.py @@ -0,0 +1,229 @@ +# -*- coding: UTF-8 -*- + +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import shutil +import numpy as np +import argparse +import tempfile +import ast +import time +import paddle +import paddle.fluid as fluid +from paddle.fluid.dygraph.nn import Linear +from paddle.distributed import fleet +from paddle.fluid.dygraph import nn +from paddle.fluid.framework import _test_eager_guard + +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2 +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage2 import GroupShardedStage2 + +seed = 2022 +epoch = 2 +linear_size = 1000 + +np.random.seed(seed) +paddle.seed(seed) + + +class MLP(fluid.Layer): + def __init__(self, linear_size=1000, param_attr=None, bias_attr=None): + super(MLP, self).__init__() + + self._linear1 = Linear(linear_size, linear_size) + self._linear2 = Linear(linear_size, linear_size) + self._linear3 = Linear(linear_size, 10) + + def forward(self, inputs): + y = self._linear1(inputs) + y = self._linear2(y) + y = self._linear3(y) + return y + + +def reader_decorator(linear_size=1000): + def __reader__(): + for _ in range(100): + img = np.random.rand(linear_size).astype('float32') + label = np.ones(1).astype('int64') + yield img, label + + return __reader__ + + +def optimizer_setting(model, use_pure_fp16, opt_group=False): + clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0) + optimizer = paddle.optimizer.AdamW( + parameters=[{ + "params": model.parameters(), + }] if opt_group else model.parameters(), + learning_rate=0.001, + weight_decay=0.00001, + grad_clip=clip, + multi_precision=use_pure_fp16) + + return optimizer + + +def train_mlp(model, + sharding_stage, + batch_size=100, + use_pure_fp16=False, + accumulate_grad=False, + opt_group=False, + save_model=False, + test_minimize=False): + if sharding_stage != "dp": + group = paddle.distributed.new_group([0, 1], backend="nccl") + if opt_group: + optimizer = optimizer_setting( + model=model, 
use_pure_fp16=use_pure_fp16, opt_group=opt_group) + else: + optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16) + + if sharding_stage == 2: + optimizer = GroupShardedOptimizerStage2( + params=optimizer._parameter_list, optim=optimizer, group=group) + + model = GroupShardedStage2( + model, optimizer, group=group, buffer_max_size=2**21) + else: + model = paddle.DataParallel(model) + + # check optimizer.minimize() error + if test_minimize: + try: + optimizer.minimize() + except: + print( + "====== Find sharding_stage2_optimizer.minimize() error ======") + return + + train_reader = paddle.batch( + reader_decorator(), batch_size=batch_size, drop_last=True) + + train_loader = paddle.io.DataLoader.from_generator( + capacity=32, + use_double_buffer=True, + iterable=True, + return_list=True, + use_multiprocess=True) + train_loader.set_sample_list_generator(train_reader) + + if sharding_stage == 2: + model.to(device="gpu") + + for eop in range(epoch): + model.train() + + for batch_id, data in enumerate(train_loader()): + img, label = data + label.stop_gradient = True + img.stop_gradient = True + + out = model(img) + loss = paddle.nn.functional.cross_entropy(input=out, label=label) + + avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32)) + if batch_size == 20: + avg_loss = avg_loss / 5 + avg_loss.backward() + + if not accumulate_grad: + optimizer.step() + optimizer.clear_grad() + + if accumulate_grad: + optimizer.step() + optimizer.clear_grad() + + if save_model: + return model, optimizer + return model.parameters() + + +def test_dp_stage2(): + paddle.distributed.init_parallel_env() + mlp = MLP() + state_dict = mlp.state_dict() + mlp1 = MLP() + mlp2 = MLP() + mlp3 = MLP() + mlp4 = MLP() + mlp5 = MLP() + mlp6 = MLP() + mlp7 = MLP() + mlp1.set_state_dict(state_dict) + mlp2.set_state_dict(state_dict) + mlp3.set_state_dict(state_dict) + mlp4.set_state_dict(state_dict) + mlp5.set_state_dict(state_dict) + mlp6.set_state_dict(state_dict) + mlp7.set_state_dict(state_dict) + + # DP VS stage2 + dp_params = train_mlp( + mlp1, sharding_stage="dp", use_pure_fp16=False, opt_group=False) + stage2_params = train_mlp( + mlp2, sharding_stage=2, use_pure_fp16=False, opt_group=False) + for i in range(len(dp_params)): + np.testing.assert_allclose( + dp_params[i].numpy(), stage2_params[i].numpy(), rtol=1e-6) + + # stage2 accumulate grad + stage2_params = train_mlp(mlp3, sharding_stage=2, accumulate_grad=True) + stage2_accumulate_grad = train_mlp( + mlp4, sharding_stage=2, batch_size=20, accumulate_grad=True) + for i in range(len(stage2_params)): + np.testing.assert_allclose( + stage2_params[i].numpy(), + stage2_accumulate_grad[i].numpy(), + rtol=1e-5, + atol=1e-5) + + # stage2 param list VS param group + stage2_params = train_mlp( + mlp5, sharding_stage=2, use_pure_fp16=False, opt_group=True) + for i in range(len(dp_params)): + np.testing.assert_allclose( + dp_params[i].numpy(), stage2_params[i].numpy(), rtol=1e-6) + + # save/load model + output_dir = tempfile.mkdtemp() + model_file = os.path.join(output_dir, "model.pdmodel") + optimizer_file = os.path.join(output_dir, "model.pdopt") + model_stage2, optimizer_stage2 = train_mlp( + mlp6, + sharding_stage=2, + use_pure_fp16=False, + opt_group=False, + save_model=True) + paddle.save(model_stage2.state_dict(), model_file) + paddle.save(optimizer_stage2.state_dict(), optimizer_file) + m_state_dict = paddle.load(model_file) + opt_state_dict = paddle.load(optimizer_file) + model_stage2.set_state_dict(m_state_dict) + 
optimizer_stage2.set_state_dict(opt_state_dict) + shutil.rmtree(output_dir) + + # check optimizer.minimize() error + train_mlp(mlp7, sharding_stage=2, test_minimize=True) + return + + +if __name__ == '__main__': + with _test_eager_guard(): + test_dp_stage2() diff --git a/python/paddle/fluid/tests/unittests/dygraph_group_sharded_stage2_offload.py b/python/paddle/fluid/tests/unittests/dygraph_group_sharded_stage2_offload.py new file mode 100644 index 00000000000..b09314ae9e3 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/dygraph_group_sharded_stage2_offload.py @@ -0,0 +1,112 @@ +# -*- coding: UTF-8 -*- + +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import argparse +import ast +import time +import paddle +import paddle.fluid as fluid +from paddle.fluid.dygraph.nn import Linear +from paddle.distributed import fleet +from paddle.fluid.dygraph import nn +from paddle.fluid.framework import _test_eager_guard + +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2 +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage2 import GroupShardedStage2 +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_utils import GroupShardedScaler + +from dygraph_group_sharded_stage2 import MLP, reader_decorator, optimizer_setting + +seed = 2021 +epoch = 2 +batch_size = 32 +linear_size = 1000 + +np.random.seed(seed) +paddle.seed(seed) + + +def train_mlp(model, offload=False): + optimizer = optimizer_setting(model=model, use_pure_fp16=True) + + model = paddle.amp.decorate(models=model, level='O2', save_dtype='float32') + scaler = paddle.amp.GradScaler(init_loss_scaling=1024) + scaler = GroupShardedScaler(scaler) + + optimizer = GroupShardedOptimizerStage2( + params=optimizer._parameter_list, optim=optimizer, offload=offload) + model = GroupShardedStage2(model, optimizer, buffer_max_size=2**21) + + train_reader = paddle.batch( + reader_decorator(linear_size), batch_size=batch_size, drop_last=True) + + train_loader = paddle.io.DataLoader.from_generator( + capacity=32, + use_double_buffer=True, + iterable=True, + return_list=True, + use_multiprocess=True) + train_loader.set_sample_list_generator(train_reader) + + for eop in range(epoch): + model.train() + + for batch_id, data in enumerate(train_loader()): + img, label = data + label.stop_gradient = True + img.stop_gradient = True + + with paddle.amp.auto_cast(True, level='O2'): + out = model(img) + loss = paddle.nn.functional.cross_entropy( + input=out, label=label) + + avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32)) + scaler.scale(avg_loss).backward() + + scaler.step(optimizer) + scaler.update() + optimizer.clear_grad() + + for dtype in optimizer.param_storages: + for dst_rank, param_storage in optimizer.param_storages[dtype].items(): + param_storage.to(device="gpu", dtype=dtype) + + return model.parameters() + + +def test_sharding_stage2_offload(): + 
paddle.distributed.init_parallel_env() + mlp = MLP(linear_size) + mlp_offload = MLP(linear_size) + mlp_offload.set_state_dict(mlp.state_dict()) + + mlp_params = train_mlp(mlp, offload=False) + mlp_offload_params = train_mlp(mlp_offload, offload=True) + + for i in range(len(mlp_params)): + np.testing.assert_allclose( + mlp_params[i].numpy(), + mlp_offload_params[i].numpy(), + rtol=5e-3, + atol=5e-3) + return + + +if __name__ == '__main__': + with _test_eager_guard(): + test_sharding_stage2_offload() diff --git a/python/paddle/fluid/tests/unittests/dygraph_group_sharded_stage3.py b/python/paddle/fluid/tests/unittests/dygraph_group_sharded_stage3.py new file mode 100644 index 00000000000..6c350e63f44 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/dygraph_group_sharded_stage3.py @@ -0,0 +1,283 @@ +# -*- coding: UTF-8 -*- + +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import shutil +import tempfile +import numpy as np +import argparse +import ast +import time +import paddle +import paddle.fluid as fluid +from paddle.fluid.dygraph.nn import Linear +from paddle.distributed import fleet +from paddle.fluid.dygraph import nn +from paddle.fluid.framework import _test_eager_guard + +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2 +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage2 import GroupShardedStage2 +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage3 import GroupShardedStage3 +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_utils import GroupShardedScaler + +epoch = 10 +paddle.seed(2022) +np.random.seed(2022) +base_lr = 0.1 +momentum_rate = 0.9 +l2_decay = 1e-4 + + +class MLP(fluid.Layer): + def __init__(self, linear_size=1000, param_attr=None, bias_attr=None): + super(MLP, self).__init__() + + self._linear1 = Linear(linear_size, linear_size) + self._linear2 = Linear(linear_size, linear_size) + self._linear3 = Linear(linear_size, 10) + + def forward(self, inputs): + y = self._linear1(inputs) + y = self._linear2(y) + y = self._linear3(y) + return y + + +def reader_decorator(linear_size=1000): + def __reader__(): + for _ in range(100): + img = np.random.rand(linear_size).astype('float32') + label = np.ones(1).astype('int64') + yield img, label + + return __reader__ + + +def optimizer_setting(model, use_pure_fp16, opt_group=False): + clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0) + optimizer = paddle.optimizer.Momentum( + parameters=[{ + "params": list(model.parameters()) + }] if opt_group else list(model.parameters()), + learning_rate=0.001, + weight_decay=0.00001, + grad_clip=clip, + multi_precision=use_pure_fp16) + + return optimizer + + +def train_mlp(model, + sharding_stage, + use_pure_fp16=False, + accumulate_grad=False, + batch_size=100, + opt_group=False, + sync_comm=False, + test_minimize=False, + save_model=False): + group = paddle.distributed.new_group([0, 1]) + 
if opt_group: + optimizer = optimizer_setting( + model=model, use_pure_fp16=use_pure_fp16, opt_group=opt_group) + else: + optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16) + + if use_pure_fp16: + model = paddle.amp.decorate( + models=model, level='O2', save_dtype='float32') + scaler = paddle.amp.GradScaler(init_loss_scaling=32768) + scaler = GroupShardedScaler(scaler) + if sharding_stage == 2: + optimizer = GroupShardedOptimizerStage2( + params=optimizer._parameter_list, optim=optimizer, group=group) + model = GroupShardedStage2( + model, optimizer, group=group, buffer_max_size=2**21) + elif sharding_stage == 3: + model = GroupShardedStage3( + model, + optimizer=optimizer, + group=group, + sync_comm=sync_comm, + segment_size=2**15) + + # check optimizer.minimize() error + if test_minimize: + try: + optimizer.minimize() + except: + print( + "====== Find sharding_stage3_optimizer.minimize() error ======") + return + + train_reader = paddle.batch( + reader_decorator(), batch_size=batch_size, drop_last=True) + + train_loader = paddle.io.DataLoader.from_generator( + capacity=32, + use_double_buffer=True, + iterable=True, + return_list=True, + use_multiprocess=True) + train_loader.set_sample_list_generator(train_reader) + + for eop in range(epoch): + model.train() + for batch_id, data in enumerate(train_loader()): + img, label = data + label.stop_gradient = True + img.stop_gradient = True + with paddle.amp.auto_cast(True, level='O2'): + out = model(img) + loss = paddle.nn.functional.cross_entropy( + input=out, label=label) + avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32)) + + if batch_size == 20: + avg_loss = avg_loss / 5 + + if not use_pure_fp16: + avg_loss.backward() + else: + scaler.scale(avg_loss).backward() + + if not accumulate_grad: + if not use_pure_fp16: + optimizer.step() + else: + scaler.step(optimizer) + scaler.update() + optimizer.clear_grad() + if accumulate_grad: + if not use_pure_fp16: + optimizer.step() + else: + scaler.step(optimizer) + scaler.update() + optimizer.clear_grad() + if sharding_stage == 3: + model.get_all_parameters() + + if save_model: + return model, optimizer + return model.parameters() + + +def test_stage2_stage3(): + paddle.distributed.init_parallel_env() + mlp, mlp1, mlp2, mlp3, mlp4, mlp5, mlp6, mlp7, mlp8, mlp9, mlp10 = MLP( + ), MLP(), MLP(), MLP(), MLP(), MLP(), MLP(), MLP(), MLP(), MLP(), MLP() + state_dict = mlp.state_dict() + mlp1.set_state_dict(state_dict) + mlp2.set_state_dict(state_dict) + mlp3.set_state_dict(state_dict) + mlp4.set_state_dict(state_dict) + mlp5.set_state_dict(state_dict) + mlp6.set_state_dict(state_dict) + mlp7.set_state_dict(state_dict) + mlp8.set_state_dict(state_dict) + mlp9.set_state_dict(state_dict) + mlp10.set_state_dict(state_dict) + + # fp32 + stage2_params = train_mlp( + mlp1, sharding_stage=2, use_pure_fp16=False, opt_group=False) + stage3_params = train_mlp( + mlp2, sharding_stage=3, use_pure_fp16=False, opt_group=False) + + for i in range(len(stage2_params)): + np.testing.assert_allclose( + stage2_params[i].numpy(), + stage3_params[i].numpy(), + rtol=1e-6, + atol=1e-6) + + # fp32 accumulate grad + stage3_params = train_mlp( + mlp3, + sharding_stage=3, + use_pure_fp16=False, + accumulate_grad=True, + opt_group=True) + stage3_params_add = train_mlp( + mlp4, + sharding_stage=3, + use_pure_fp16=False, + accumulate_grad=True, + batch_size=20, + opt_group=True) + for i in range(len(stage3_params)): + np.testing.assert_allclose( + stage3_params[i].numpy(), + stage3_params_add[i].numpy(), + 
rtol=1e-6, + atol=1e-4) + + # fp16 + stage2_params = train_mlp( + mlp5, sharding_stage=2, use_pure_fp16=True, opt_group=False) + stage3_params = train_mlp( + mlp6, sharding_stage=3, use_pure_fp16=True, opt_group=False) + for i in range(len(stage2_params)): + np.testing.assert_allclose( + stage2_params[i].numpy(), + stage3_params[i].numpy(), + rtol=1e-4, + atol=1e-3) + + # fp16 sync_comm + stage3_params = train_mlp( + mlp7, sharding_stage=3, use_pure_fp16=True, opt_group=False) + stage3_params_re = train_mlp( + mlp8, + sharding_stage=3, + use_pure_fp16=True, + opt_group=False, + sync_comm=True) + for i in range(len(stage3_params)): + np.testing.assert_allclose( + stage3_params[i].numpy(), stage3_params_re[i].numpy(), rtol=1e-6) + + # save/load model + output_dir = tempfile.mkdtemp() + model_file = os.path.join(output_dir, "model.pdmodel") + optimizer_file = os.path.join(output_dir, "model.pdopt") + model_stage3, optimizer_stage3 = train_mlp( + mlp9, + sharding_stage=3, + use_pure_fp16=False, + opt_group=False, + save_model=True) + paddle.save(model_stage3.state_dict(), model_file) + paddle.save(optimizer_stage3.state_dict(), optimizer_file) + m_state_dict = paddle.load(model_file) + opt_state_dict = paddle.load(optimizer_file) + model_stage3.set_state_dict(m_state_dict) + optimizer_stage3.set_state_dict(opt_state_dict) + shutil.rmtree(output_dir) + + # check optimizer.minimize() error + train_mlp( + mlp10, + sharding_stage=3, + use_pure_fp16=False, + opt_group=False, + test_minimize=True) + + +if __name__ == '__main__': + with _test_eager_guard(): + test_stage2_stage3() diff --git a/python/paddle/fluid/tests/unittests/dygraph_group_sharded_stage3_offload.py b/python/paddle/fluid/tests/unittests/dygraph_group_sharded_stage3_offload.py new file mode 100644 index 00000000000..5f9ec5c6e70 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/dygraph_group_sharded_stage3_offload.py @@ -0,0 +1,205 @@ +# -*- coding: UTF-8 -*- + +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
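GroupShardedStage3, exercised by the stage-2/stage-3 comparison test above, keeps only one rank-local slice of each flattened and padded parameter between steps and reassembles the full tensor by all-gathering the slices just before they are needed. The pure-numpy sketch below shows only that partition/reassembly bookkeeping; the names shard_param/gather_param are illustrative stand-ins for the real _param2buffer/_allgather_buffer machinery.

import numpy as np

def shard_param(flat, world_size):
    # Pad the flattened parameter so it splits evenly, then give each rank one slice.
    pad = (-flat.size) % world_size
    padded = np.concatenate([flat, np.zeros(pad, dtype=flat.dtype)])
    per_rank = padded.size // world_size
    return [padded[r * per_rank:(r + 1) * per_rank] for r in range(world_size)]

def gather_param(shards, numel, shape):
    # The analogue of the all-gather: concatenate the slices and drop the padding.
    return np.concatenate(shards)[:numel].reshape(shape)

weight = np.random.rand(3, 5).astype('float32')
shards = shard_param(weight.ravel(), world_size=2)  # each rank would store one entry
assert np.allclose(gather_param(shards, weight.size, weight.shape), weight)

When offload=True, the same rank-local slices are additionally kept on CPU and moved back to the device only around the optimizer step, which is what the _device2cpu/_cpu2device helpers and the device_guard wrapper earlier in this patch take care of.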
+ +import numpy as np +import argparse +import ast +import time +import paddle +import paddle.fluid as fluid +from paddle.fluid.dygraph.nn import Linear +from paddle.distributed import fleet +from paddle.fluid.dygraph import nn +from paddle.fluid.framework import _test_eager_guard + +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage3 import GroupShardedStage3 +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_utils import GroupShardedScaler + +epoch = 10 +paddle.seed(2022) +np.random.seed(2022) +base_lr = 0.1 +momentum_rate = 0.9 +l2_decay = 1e-4 + + +class MLP(fluid.Layer): + def __init__(self, linear_size=1000, param_attr=None, bias_attr=None): + super(MLP, self).__init__() + + self._linear1 = Linear(linear_size, linear_size) + self._linear2 = Linear(linear_size, linear_size) + self._linear3 = Linear(linear_size, 10) + + def forward(self, inputs): + y = self._linear1(inputs) + y = self._linear2(y) + y = self._linear3(y) + return y + + +def reader_decorator(linear_size=1000): + def __reader__(): + for _ in range(100): + img = np.random.rand(linear_size).astype('float32') + label = np.ones(1).astype('int64') + yield img, label + + return __reader__ + + +def optimizer_setting(model, use_pure_fp16, opt_group=False): + clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0) + optimizer = paddle.optimizer.AdamW( + parameters=[{ + "params": model.parameters() + }] if opt_group else model.parameters(), + learning_rate=0.001, + weight_decay=0.00001, + grad_clip=clip, + multi_precision=use_pure_fp16) + + return optimizer + + +def train_mlp(model, + use_pure_fp16=False, + accumulate_grad=False, + offload=False, + batch_size=100, + convert2cpu=False): + group = paddle.distributed.new_group([0, 1]) + optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16) + + if use_pure_fp16: + model = paddle.amp.decorate( + models=model, level='O2', save_dtype='float32') + scaler = paddle.amp.GradScaler(init_loss_scaling=32768) + scaler = GroupShardedScaler(scaler) + + model = GroupShardedStage3( + model, + optimizer=optimizer, + group=group, + offload=offload, + segment_size=2**15) + + train_reader = paddle.batch( + reader_decorator(), batch_size=batch_size, drop_last=True) + + train_loader = paddle.io.DataLoader.from_generator( + capacity=32, + use_double_buffer=True, + iterable=True, + return_list=True, + use_multiprocess=True) + train_loader.set_sample_list_generator(train_reader) + + for eop in range(epoch): + model.train() + for batch_id, data in enumerate(train_loader()): + img, label = data + label.stop_gradient = True + img.stop_gradient = True + with paddle.amp.auto_cast(True, level='O2'): + out = model(img) + loss = paddle.nn.functional.cross_entropy( + input=out, label=label) + avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32)) + + if accumulate_grad: + avg_loss = avg_loss / 5 + + if not use_pure_fp16: + avg_loss.backward() + else: + scaler.scale(avg_loss).backward() + + if not accumulate_grad: + if not use_pure_fp16: + optimizer.step() + else: + scaler.step(optimizer) + scaler.update() + optimizer.clear_grad() + if accumulate_grad: + if not use_pure_fp16: + optimizer.step() + else: + scaler.step(optimizer) + scaler.update() + optimizer.clear_grad() + if not convert2cpu: + model.get_all_parameters() + else: + model.get_all_parameters(convert2cpu) + return model.parameters() + + +def test_stage3_offload(): + paddle.distributed.init_parallel_env() + mlp, mlp1, mlp2, mlp3, mlp4, mlp5, mlp6 = MLP(), MLP(), MLP(), MLP(), MLP( + ), MLP(), MLP() + 
+    state_dict = mlp.state_dict()
+    mlp1.set_state_dict(state_dict)
+    mlp2.set_state_dict(state_dict)
+    mlp3.set_state_dict(state_dict)
+    mlp4.set_state_dict(state_dict)
+    mlp5.set_state_dict(state_dict)
+    mlp6.set_state_dict(state_dict)
+
+    # fp32 offload
+    stage3_params = train_mlp(mlp1, use_pure_fp16=False)
+    stage3_params_offload = train_mlp(mlp2, use_pure_fp16=False, offload=True)
+    for i in range(len(stage3_params)):
+        np.testing.assert_allclose(
+            stage3_params[i].numpy(),
+            stage3_params_offload[i].numpy(),
+            rtol=1e-6,
+            atol=1e-8)
+
+    # fp16 offload
+    stage3_params = train_mlp(mlp3, use_pure_fp16=True)
+    stage3_params_offload = train_mlp(mlp4, use_pure_fp16=True, offload=True)
+    for i in range(len(stage3_params)):
+        np.testing.assert_allclose(
+            stage3_params[i].numpy(),
+            stage3_params_offload[i].numpy(),
+            rtol=1e-2,
+            atol=1e-2)
+
+    # fp32 accumulate grad offload
+    stage3_params = train_mlp(
+        mlp5, use_pure_fp16=False, batch_size=20, accumulate_grad=True)
+    stage3_params_offload = train_mlp(
+        mlp6,
+        use_pure_fp16=False,
+        accumulate_grad=True,
+        offload=True,
+        batch_size=20,
+        convert2cpu=True)
+    for i in range(len(stage3_params)):
+        np.testing.assert_allclose(
+            stage3_params[i].numpy(),
+            stage3_params_offload[i].numpy(),
+            rtol=1e-6,
+            atol=1e-8)
+    return
+
+
+if __name__ == '__main__':
+    with _test_eager_guard():
+        test_stage3_offload()
diff --git a/python/paddle/fluid/tests/unittests/dygraph_sharding_optimizer_stage2.py b/python/paddle/fluid/tests/unittests/dygraph_sharding_optimizer_stage2.py
index 705831d50f1..0ed9b681fdc 100644
--- a/python/paddle/fluid/tests/unittests/dygraph_sharding_optimizer_stage2.py
+++ b/python/paddle/fluid/tests/unittests/dygraph_sharding_optimizer_stage2.py
@@ -23,6 +23,7 @@ import paddle
 import paddle.fluid as fluid
 from paddle.fluid.dygraph.nn import Linear
 from paddle.distributed import fleet
+from paddle.fluid.framework import _test_eager_guard
 
 from paddle.distributed.fleet.utils.internal_storage import GradStorage
 from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ShardingOptimizerStage2
@@ -138,4 +139,6 @@ def train_mlp():
 
 
 if __name__ == '__main__':
+    with _test_eager_guard():
+        pass
     train_mlp()
diff --git a/python/paddle/fluid/tests/unittests/dygraph_sharding_stage2.py b/python/paddle/fluid/tests/unittests/dygraph_sharding_stage2.py
index fb01fd46c0d..82edd1c17a5 100644
--- a/python/paddle/fluid/tests/unittests/dygraph_sharding_stage2.py
+++ b/python/paddle/fluid/tests/unittests/dygraph_sharding_stage2.py
@@ -26,6 +26,7 @@ import paddle.fluid as fluid
 from paddle.fluid.dygraph.nn import Linear
 from paddle.distributed import fleet
 from paddle.fluid.dygraph import nn
+from paddle.fluid.framework import _test_eager_guard
 
 from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ShardingOptimizerStage2
 from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ShardingStage2
@@ -222,4 +223,6 @@ def test_dp_stage2():
 
 
 if __name__ == '__main__':
+    with _test_eager_guard():
+        pass
     test_dp_stage2()
diff --git a/python/paddle/fluid/tests/unittests/dygraph_sharding_stage2_offload.py b/python/paddle/fluid/tests/unittests/dygraph_sharding_stage2_offload.py
index 39ba44815d9..a7b16bbb759 100644
--- a/python/paddle/fluid/tests/unittests/dygraph_sharding_stage2_offload.py
+++ b/python/paddle/fluid/tests/unittests/dygraph_sharding_stage2_offload.py
@@ -23,6 +23,7 @@ import paddle.fluid as fluid
 from paddle.fluid.dygraph.nn import Linear
 from paddle.distributed import fleet
 from paddle.fluid.dygraph import nn
+from paddle.fluid.framework import _test_eager_guard
 
 from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ShardingOptimizerStage2
 from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ShardingStage2
@@ -106,4 +107,6 @@ def test_sharding_stage2_offload():
 
 
 if __name__ == '__main__':
+    with _test_eager_guard():
+        pass
     test_sharding_stage2_offload()
diff --git a/python/paddle/fluid/tests/unittests/dygraph_sharding_stage3.py b/python/paddle/fluid/tests/unittests/dygraph_sharding_stage3.py
index 82821cd7ee6..cdb1de020f5 100644
--- a/python/paddle/fluid/tests/unittests/dygraph_sharding_stage3.py
+++ b/python/paddle/fluid/tests/unittests/dygraph_sharding_stage3.py
@@ -26,6 +26,7 @@ import paddle.fluid as fluid
 from paddle.fluid.dygraph.nn import Linear
 from paddle.distributed import fleet
 from paddle.fluid.dygraph import nn
+from paddle.fluid.framework import _test_eager_guard
 
 from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ShardingOptimizerStage2
 from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ShardingStage2
@@ -274,4 +275,6 @@ def test_stage2_stage3():
 
 
 if __name__ == '__main__':
+    with _test_eager_guard():
+        pass
     test_stage2_stage3()
diff --git a/python/paddle/fluid/tests/unittests/dygraph_sharding_stage3_offload.py b/python/paddle/fluid/tests/unittests/dygraph_sharding_stage3_offload.py
index df7ba78d345..2cb327a29a3 100644
--- a/python/paddle/fluid/tests/unittests/dygraph_sharding_stage3_offload.py
+++ b/python/paddle/fluid/tests/unittests/dygraph_sharding_stage3_offload.py
@@ -23,6 +23,7 @@ import paddle.fluid as fluid
 from paddle.fluid.dygraph.nn import Linear
 from paddle.distributed import fleet
 from paddle.fluid.dygraph import nn
+from paddle.fluid.framework import _test_eager_guard
 
 from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage3 import ShardingStage3
 from paddle.distributed.fleet.meta_parallel.sharding.sharding_utils import ShardingScaler
@@ -196,4 +197,6 @@ def test_stage3_offload():
 
 
 if __name__ == '__main__':
+    with _test_eager_guard():
+        pass
     test_stage3_offload()
diff --git a/python/paddle/fluid/tests/unittests/test_dygraph_group_sharded_api.py b/python/paddle/fluid/tests/unittests/test_dygraph_group_sharded_api.py
index 7c296c7e40e..e664face048 100644
--- a/python/paddle/fluid/tests/unittests/test_dygraph_group_sharded_api.py
+++ b/python/paddle/fluid/tests/unittests/test_dygraph_group_sharded_api.py
@@ -25,6 +25,7 @@ class TestDygraphGroupSharded(TestMultipleGpus):
     # check group sharded logic as well as the accuracy with single mode
     def test_dygraph_group_sharded(self):
         self.run_mnist_2gpu('dygraph_group_sharded_api.py')
+        self.run_mnist_2gpu('dygraph_group_sharded_api_eager.py')
 
 
 if __name__ == "__main__":
diff --git a/python/paddle/fluid/tests/unittests/test_dygraph_sharding_stage2.py b/python/paddle/fluid/tests/unittests/test_dygraph_sharding_stage2.py
index 669ab7d8f7f..b7a5f9c9701 100644
--- a/python/paddle/fluid/tests/unittests/test_dygraph_sharding_stage2.py
+++ b/python/paddle/fluid/tests/unittests/test_dygraph_sharding_stage2.py
@@ -24,9 +24,11 @@ class TestDygraphShardingStage2(TestMultipleGpus):
 
     # check sharding logic as well as the accuracy with single mode
     def test_dygraph_sharding_stage2(self):
+        self.run_mnist_2gpu('dygraph_group_sharded_stage2.py')
         self.run_mnist_2gpu('dygraph_sharding_stage2.py')
 
     def test_dygraph_sharding_stage2_offload(self):
+        self.run_mnist_2gpu('dygraph_group_sharded_stage2_offload.py')
         self.run_mnist_2gpu('dygraph_sharding_stage2_offload.py')
 
 
diff --git a/python/paddle/fluid/tests/unittests/test_dygraph_sharding_stage3.py b/python/paddle/fluid/tests/unittests/test_dygraph_sharding_stage3.py
index c7da5d1e941..f69b52cae52 100644
--- a/python/paddle/fluid/tests/unittests/test_dygraph_sharding_stage3.py
+++ b/python/paddle/fluid/tests/unittests/test_dygraph_sharding_stage3.py
@@ -24,9 +24,11 @@ class TestDygraphShardingStage3(TestMultipleGpus):
 
     # check sharding logic as well as the accuracy with single mode
     def test_dygraph_sharding_stage3(self):
+        self.run_mnist_2gpu('dygraph_group_sharded_stage3.py')
         self.run_mnist_2gpu('dygraph_sharding_stage3.py')
 
     def test_dygraph_sharding_stage3_offload(self):
+        self.run_mnist_2gpu('dygraph_group_sharded_stage3_offload.py')
         self.run_mnist_2gpu('dygraph_sharding_stage3_offload.py')
 
 
diff --git a/python/paddle/fluid/tests/unittests/test_egr_python_api.py b/python/paddle/fluid/tests/unittests/test_egr_python_api.py
index a203a447902..600a49b2332 100644
--- a/python/paddle/fluid/tests/unittests/test_egr_python_api.py
+++ b/python/paddle/fluid/tests/unittests/test_egr_python_api.py
@@ -677,7 +677,7 @@ class EagerVariablePropertiesAndMethodsTestCase(unittest.TestCase):
         tensor2 = None
         tensor = paddle.to_tensor(arr, core.VarDesc.VarType.FP32,
                                   core.CPUPlace())
-        tensor3 = core.eager.Tensor()
+        tensor3 = core.eager.Tensor(value=tensor, place=core.CPUPlace())
         if core.is_compiled_with_cuda():
             tensor2 = paddle.to_tensor(arr2, core.VarDesc.VarType.FP32,
                                        core.CUDAPlace(0))
--
GitLab
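
Usage note (a minimal sketch, not part of the patch): the script below condenses the flow of the new dygraph_group_sharded_stage3_offload.py test into a standalone example of the eager-mode stage-3 path. It assumes a two-GPU launch such as `python -m paddle.distributed.launch --gpus "0,1" demo.py`; the script name demo.py, the two-layer model, and the random batch are illustrative placeholders, while the classes and arguments (GroupShardedStage3, GroupShardedScaler, offload, segment_size) are the ones introduced above.

# Minimal sketch, assuming a 2-GPU paddle.distributed.launch run; it mirrors the
# pure-fp16 + offload path exercised by the new stage-3 offload unit test.
import numpy as np
import paddle
from paddle.fluid.framework import _test_eager_guard
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage3 import GroupShardedStage3
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_utils import GroupShardedScaler


def demo():
    paddle.distributed.init_parallel_env()
    group = paddle.distributed.new_group([0, 1])

    # Placeholder model/optimizer; the unit tests use a three-layer MLP.
    model = paddle.nn.Sequential(
        paddle.nn.Linear(1000, 1000), paddle.nn.Linear(1000, 10))
    optimizer = paddle.optimizer.AdamW(
        parameters=model.parameters(),
        learning_rate=0.001,
        multi_precision=True)

    # Pure fp16 (O2) decoration plus the sharding-aware scaler, as in the tests.
    model = paddle.amp.decorate(models=model, level='O2', save_dtype='float32')
    scaler = GroupShardedScaler(paddle.amp.GradScaler(init_loss_scaling=32768))

    # Stage-3 sharding: parameters are partitioned across the group; offload=True
    # enables the CPU offload path and segment_size sets the fusion granularity.
    model = GroupShardedStage3(
        model,
        optimizer=optimizer,
        group=group,
        offload=True,
        segment_size=2**15)

    for step in range(10):
        img = paddle.to_tensor(np.random.rand(4, 1000).astype('float32'))
        label = paddle.to_tensor(
            np.random.randint(0, 10, (4, 1)).astype('int64'))
        with paddle.amp.auto_cast(True, level='O2'):
            loss = paddle.nn.functional.cross_entropy(
                input=model(img), label=label)
        avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32))
        scaler.scale(avg_loss).backward()
        scaler.step(optimizer)
        scaler.update()
        optimizer.clear_grad()

    # Re-gather the full parameters on every rank, as the tests do before comparing.
    model.get_all_parameters()


if __name__ == '__main__':
    with _test_eager_guard():
        demo()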