diff --git a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py deleted file mode 100644 index 32d8aa9b2279eb3c895dd845f9d41191ad76c13e..0000000000000000000000000000000000000000 --- a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py +++ /dev/null @@ -1,468 +0,0 @@ -# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# The file has been adapted from fairscale file: -# https://github.com/facebookresearch/fairscale/blob/main/fairscale/optim/oss.py -# Git commit hash: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e -# We retain the following license from the original files: - -# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. -# -# This source code is licensed under the BSD license found in the -# LICENSE file in the root directory of this source tree. - -import logging -from collections import OrderedDict - -import numpy as np - -import paddle -import paddle.distributed as dist -from paddle.distributed.collective import _get_global_group, new_group -from paddle.fluid.clip import ClipGradByGlobalNorm -from paddle.framework import core -from paddle.optimizer import Optimizer - -from ...meta_parallel.sharding.sharding_utils import ( - ShardingClipGrad, - Type, - device_guard, -) -from ...utils.internal_storage import GradStorage, ParamStorage - -# CUDA alignment 256 bytes, cpu alignment 4096 bytes -alignment = {"gpu": 256, "cpu": 4096} -align = { - Type.fp16.value: 2, - Type.fp32.value: 4, -} - - -class ShardingOptimizerStage2(Optimizer): - """ - A wrapper for Sharding Stage2 Optimizer in Dygraph. - - .. warning: ShardingOptimizer encapsulates the optimization strategy and integrates it into the optimizer. - - .. ZeRO: 1.https://arxiv.org/pdf/1910.02054.pdf 2.https://arxiv.org/pdf/1910.02054.pdf. - - """ - - # TODO (Baibaifan) - # Feature Notes: - # 1. Unified memory for parameters and parameters.grad to InternalStorage. - # 2. Support the segmentation of optimizer parameters and partial updating of parameters. - # 3. Dynamically adjust training parameters and models. - # 4. Support offload function. - # 5. Support the establishment of independent communication groups. - # 6. Broadcast_fp16 is not supported now. 
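For orientation, feature note 2 above refers to the greedy segmentation implemented by the removed `_segment_params` method further down in this file: each parameter is assigned to the rank whose accumulated trainable element count is currently the smallest. The following is a minimal, Paddle-free sketch of that idea only; the `Param` tuple is a hypothetical stand-in for a real parameter and is not part of the removed code.

from collections import namedtuple

import numpy as np

# Hypothetical stand-in for a parameter; only .shape and .trainable matter here.
Param = namedtuple("Param", ["name", "shape", "trainable"])


def segment_params(params, world_size):
    """Greedy size balancing: give each parameter to the rank whose
    accumulated trainable numel is currently the smallest."""
    per_rank = [[] for _ in range(world_size)]
    sizes = [0] * world_size
    for p in params:
        rank = sizes.index(min(sizes))  # rank with the least work so far
        per_rank[rank].append(p)
        if p.trainable:
            sizes[rank] += int(np.prod(p.shape))
    return per_rank


params = [
    Param("w1", [1024, 1024], True),
    Param("w2", [512, 512], True),
    Param("b1", [1024], True),
    Param("frozen", [2048, 2048], False),
]
print([[p.name for p in group] for group in segment_params(params, world_size=2)])
# [['w1'], ['w2', 'b1', 'frozen']]

The removed implementation performed the same bookkeeping over `self._local_params` and reused the result to build the `param2rank` mapping.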
- def __init__( - self, - params, - optim, - group=None, - offload=False, - device="gpu", - pertrain_sync_models=True, - **kw - ): - - super().__init__(optim._learning_rate, params, kw) - - # Segmentation information - self._dtype_rank_params = ( - OrderedDict() - ) # {dtype:[param1,param2]} device, rank, params - self._param2rank = {} - self.__segment_params = [] - self._rank_buffer_size = {} # {dtype: {rank: numel+alignment}} - self._param2align = {} # {param.name: align} - - # Default information - self._optim_defaults = kw - self._optim = optim - - assert hasattr( - self._optim, "_master_weights" - ), "Must use optimizer with _master_weights attribute" - self._local_params = params - self._default_device = device - self._pfp16 = ( - len( - list( - filter( - lambda x: x.trainable and x.dtype == Type.fp16.value, - self._local_params, - ) - ) - ) - > 0 - ) - - self.group = ( - new_group(_get_global_group().ranks) if group is None else group - ) - - self.world_size = self.group.nranks - self.rank = self.group.rank - self._global_root_rank = self.group.ranks[0] - - # Synchronous all ranks models - if pertrain_sync_models: - self._sync_params_and_buffers() - - self.param_storages = {} # {dtype: {rank: InternalStorage}} - - if isinstance(self._optim._grad_clip, ClipGradByGlobalNorm): - logging.warning( - "While using ClipGradByGlobalNorm in ShardingOptimizer, the grad clip of original optimizer will be changed." - ) - self._optim._grad_clip = ShardingClipGrad( - self._optim._grad_clip, paddle.get_device(), self.group - ) - if self._optim._parameter_list and isinstance( - self._optim._parameter_list[0], dict - ): - for item in self._optim._param_groups: - if "grad_clip" in item.keys(): - item["grad_clip"] = ShardingClipGrad( - self._optim._grad_clip, - paddle.get_device(), - self.group, - ) - - if offload: - assert ( - self._pfp16 - ), "Only support offload strategy while using \'Adam\', \'AdamW\' and \'Momentum\' optimizer with AMP/Pure FP16" - - self.offload = offload # Using for offload - self.offload_device = "cpu" - self.offload_buffer_size = 0 - self.offload_param2align = {} - self.offload_params = None - self.offload_grads = None - - self._master_params = {} - - # Update optimizer parameters and adjust parameter storage and use according to rank. - self._update_opt_status() - - @paddle.autograd.no_grad() - def _sync_params_and_buffers(self): - """ - Sync all model states for all ranks - """ - - for p in self._local_params: - dist.broadcast( - p, src=self._global_root_rank, group=self.group, sync_op=True - ) - - # Multi stream operation will be supported later - dist.wait(tensor=p, group=self.group, use_calc_stream=True) - - def _generate_master_params(self, trainable_params): - if self.offload: - for param in trainable_params: - if param.name not in self._master_params.keys(): - self._master_params[param.name] = core.VarBase( - name=param.name, - value=param.cast(dtype=Type.fp32.value).numpy(), - place=core.CPUPlace(), - stop_gradient=param.stop_gradient, - ) - else: - for param in trainable_params: - if param.dtype == Type.fp16.value: - self._optim._master_weights[param.name] = paddle.cast( - param, Type.fp32.value - ) - - def _update_opt_status(self): - """Update optimizer status and parameter storage information, and special functions to be developed.""" - # func 1 - self._integration_params() - - # fun 2 TODO - - # Segement helpers - - def _segment_params(self): - """ - Divide all optimizer parameters equally into rank. 
- """ - if len(self.__segment_params) == 0: - self.__segment_params, param_lists = [ - [] for _ in range(self.world_size) - ], [[] for _ in range(self.world_size)] - sizes = [0] * self.world_size - for param in self._local_params: - # Add this param to rank with smallest size. - rank = sizes.index(min(sizes)) - param_lists[rank].append(param) - - # Statistical real numels - sizes[rank] += np.prod(param.shape) if param.trainable else 0 - - for rank, params in enumerate(param_lists): - self.__segment_params[rank].extend(params) - return self.__segment_params - - @property - def local_params(self): - return self._local_params - - @property - def param2rank(self): - """Map the params to the rank which owns them""" - if len(self._param2rank) == 0: - for rank, params in enumerate(self._segment_params()): - for param in params: - self._param2rank[param.name] = rank - return self._param2rank - - @property - def dtype_rank_params(self): - """ - Divide the parameters into groups according to rank and dtype. - """ - if len(self._dtype_rank_params) == 0: - # Assign the parameters of each rank according to the type - for param in self._local_params: - if param.dtype not in self._dtype_rank_params.keys(): - self._dtype_rank_params[param.dtype] = [ - [] for _ in range(self.world_size) - ] - self._dtype_rank_params[param.dtype][ - self.param2rank[param.name] - ].append(param) - - # Sort per rank params by size - for dtype in self._dtype_rank_params.keys(): - for rank_params in self._dtype_rank_params[dtype]: - rank_params.sort(key=lambda x: np.prod(x.shape)) - - return self._dtype_rank_params - - @property - def rank_buffer_size(self): - """ - Count the memory size of the parameters corresponding to rank under the corresponding dtype. - """ - # CUDA alignment 256 bytes - if len(self._rank_buffer_size) == 0: - for dtype in self.dtype_rank_params.keys(): - if dtype not in self._rank_buffer_size.keys(): - self._rank_buffer_size[dtype] = {} - for dst_rank, per_rank_params in enumerate( - self.dtype_rank_params[dtype] - ): - if dst_rank not in self._rank_buffer_size[dtype].keys(): - self._rank_buffer_size[dtype][dst_rank] = 0 - for param in per_rank_params: - if not param.trainable: - continue - size = np.prod(param.shape) * align[dtype] - remaining = size % alignment[self._default_device] - ali = ( - 0 - if remaining == 0 - else alignment[self._default_device] - remaining - ) - align_ = ali // align[dtype] - self._rank_buffer_size[dtype][dst_rank] += ( - np.prod(param.shape) + align_ - ) - self._param2align[param.name] = align_ - - return self._rank_buffer_size - - def _integration_params(self): - """ - Integrate the parameters into a continuous memory according to rank, and support the update of training parameters. 
- """ - - for dtype, per_rank_params in self.dtype_rank_params.items(): - if dtype not in self.param_storages.keys(): - self.param_storages[dtype] = {} - - for dst_rank, params in enumerate(per_rank_params): - if len(params) > 0: - - # Merge all the trainable params in a single InternalStorage - trainable_params = list( - filter(lambda x: x.trainable, params) - ) - if self._pfp16 and dst_rank == self.rank: - self._generate_master_params(trainable_params) - if trainable_params: - param_storage = ParamStorage( - size=self.rank_buffer_size[dtype][dst_rank], - dtype=dtype, - device=self._default_device, - ) - - param_storage.add_rank_params( - trainable_params, self._param2align - ) - self.param_storages[dtype][dst_rank] = param_storage - - # Clear the InternalStorage keys which are not in use anymore - dtype_in_use = list(self.dtype_rank_params.keys()) - dtype_to_pop = list( - filter(lambda x: x not in dtype_in_use, self.param_storages.keys()) - ) - for d in dtype_to_pop: - self.param_storages.pop(d) - - if self.offload: - self._optim._master_weights = self._master_params - cpu_master_params = [p for p in self._master_params.values()] - for param in cpu_master_params: - size = np.prod(param.shape) * align[Type.fp32.value] - remaining = size % alignment[self.offload_device] - ali = ( - 0 - if remaining == 0 - else alignment[self.offload_device] - remaining - ) - align_ = ali // align[Type.fp32.value] - self.offload_buffer_size += np.prod(param.shape) + align_ - self.offload_param2align[param.name] = align_ - - if cpu_master_params: - with device_guard(self.rank, self.offload_device): - self.offload_params = ParamStorage( - size=self.offload_buffer_size, - dtype=Type.fp32.value, - device=self.offload_device, - ) - self.offload_params.add_rank_params( - cpu_master_params, self.offload_param2align, False - ) - self.offload_params.buffer.stop_gradient = False - - self.offload_grads = GradStorage( - size=self.offload_buffer_size, - dtype=Type.fp32.value, - device=self.offload_device, - destination=self.rank, - parm2align=self.offload_param2align, - convert_cpu=True, - ) - for p in cpu_master_params: - self.offload_grads.add_grad( - p, self.offload_param2align[p.name] - ) - - self._optim._master_weights[ - self.offload_params.buffer.name - ] = self.offload_params.buffer - - def _offload_acc_grad(self, param_name, grad_fp32_cpu): - """accumulate grads with offload strategy""" - with device_guard(self.rank, self.offload_device): - if param_name in self._master_params.keys(): - if self._master_params[param_name].grad is None: - self._master_params[param_name]._copy_gradient_from( - grad_fp32_cpu - ) - else: - self._master_params[param_name].grad.add_(grad_fp32_cpu) - - self.offload_params.buffer._copy_gradient_from( - self.offload_grads.buffer - ) - - def _offload_scale_grad(self, scale_size): - """scale grads with offload strategy""" - with device_guard(self.rank, self.offload_device): - self.offload_grads.buffer.scale_(scale=scale_size) - - def _offload_clear_grad(self): - """clear grads with offload strategy""" - with device_guard(self.rank, self.offload_device): - self.offload_grads.buffer.zero_() - - def step(self): - """ - A wrapper for Optimizer's step function to finish the update operation of the optimizer. 
- """ - - if self.offload: - params_list = [self.offload_params.buffer] - - # TODO(Baibaifan): Offload will support param_groups later - if not isinstance(self._optim._param_groups[0], dict): - self._optim._parameter_list = params_list - self._optim._param_groups = params_list - - # Run the optimizer of the current rank step - if self.offload: - with device_guard(device=self.offload_device): - self._optim.step() - - dev_id = int(paddle.get_device().split(":")[1]) - for param in self._local_params: - if param.name in self._master_params.keys(): - param.set_value( - self._master_params[param.name] - .cuda(dev_id) - .cast(dtype=param.dtype) - ) - else: - self._optim.step() - - # Synchronize all the updated shards in between the ranks - self._broadcast_params() - - def minimize(self): - raise RuntimeError( - "optimizer.minimize() not support now, please use optimizer.step()" - ) - - def set_state_dict(self, state_dict): - self._optim.set_state_dict(state_dict) - - def state_dict(self): - return self._optim.state_dict() - - def _clear_cache(self): - self.__segment_params.clear() - self._dtype_rank_params.clear() - self._param2rank.clear() - - @paddle.autograd.no_grad() - def _broadcast_params(self): - """Broadcast the parameters of the current rank to each rank""" - - assert self._default_device == "gpu", "Only supported gpu" - - # Exchange all the shards with the other ranks - for dtype_per_rank in self.param_storages.values(): - for dst_rank, internal_storage in dtype_per_rank.items(): - dist.broadcast( - tensor=internal_storage.buffer, - src=self.group.ranks[dst_rank], - group=self.group, - sync_op=True, - ) - - # Multi stream operation will be supported later - dist.wait( - tensor=internal_storage.buffer, - group=self.group, - use_calc_stream=True, - ) diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/__init__.py b/python/paddle/distributed/fleet/meta_parallel/sharding/__init__.py index 1d6bd8e1ee4e063b21cc5c1fab24d021db8f6f43..6f0ea85344b7e0c679730356928c8749cf71cd66 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/__init__.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/__init__.py @@ -11,5 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from .sharding_utils import Type, device_guard diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py deleted file mode 100644 index 5dd3ae96580d97064d37c227cb1b2c06ecb97e87..0000000000000000000000000000000000000000 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py +++ /dev/null @@ -1,619 +0,0 @@ -# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -# The file has been adapted from fairscale file: -# https://github.com/facebookresearch/fairscale/blob/main/fairscale/nn/data_parallel/sharded_ddp.py -# Git commit hash: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e -# We retain the following license from the original files: - -# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. -# -# This source code is licensed under the BSD license found in the -# LICENSE file in the root directory of this source tree. - -import logging -from collections import deque -from functools import reduce -from itertools import chain -from types import MethodType - -import numpy as np - -import paddle -import paddle.distributed as dist -from paddle import nn -from paddle.distributed import collective as collective -from paddle.distributed.collective import _get_global_group - -from ...meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ( - ShardingOptimizerStage2, -) -from ...utils.internal_storage import GradStorage -from .sharding_utils import Taskflow, Type - - -def _trainable(param): - return param.trainable - - -class ShardingStage2(nn.Layer): - """ - A wrapper for Sharding Stage2 Layer in Dygraph. - .. warning: ShardingStage2 encapsulates the layer strategy and integrates it into the nn.Layer. - .. ZeRO: https://arxiv.org/pdf/1910.02054.pdf. - """ - - # TODO (Baibaifan) - # Feature Notes:: - # 1. Unified memory for param and param.grad to InternalStorage. - # 2. Divide param.grad according to rank to centrally apply for and release GPU memory. - # 3. Dynamically adjust training parameters and models. - # 4. Support offload function. - # 5. Support the establishment of independent communication groups. - - def __init__( - self, - layer, - sharding_optimizer, - group=None, - sync_buffers=False, - buffer_max_size=2**23, # 8MB - auto_refresh_trainable=True, - device="gpu", - ): - super().__init__() - - # training options - self._layer = layer - self._sharding_optimizers = ( - [sharding_optimizer] - if not isinstance(sharding_optimizer, list) - else sharding_optimizer - ) - assert all( - list( - map( - lambda opt: isinstance(opt, ShardingOptimizerStage2), - self._sharding_optimizers, - ) - ) - ), "Please use ShardingOptimizerStage2 optimizer" - self._sync_buffers = sync_buffers - self._auto_refresh_trainable = auto_refresh_trainable - - # Communication related attributes - self._group = ( - collective.new_group(_get_global_group().ranks) - if group is None - else group - ) - self._world_size_scaling = 1.0 / self._group.nranks - assert ( - self._group.nranks > 1 - ), "Training must be distributed, ranks must be greater than 1" - self._rank = self._group.rank - self._global_root_rank = self._group.ranks[ - 0 - ] # picking rank 0 as the reference - self._default_device = device - - # Global statistical parameters - self._all_params = list( - chain(*[optim.local_params for optim in self._sharding_optimizers]) - ) - self._trainable_params = [] - self._grad_reduced = [] - self._trainable_param2rank = {} - self._trainable_param2align = {} - self._trainable_mask = list(map(_trainable, self._all_params)) - self._param_grads = [] - - # Set grad storage size & Display param sizes and model sizes - model_size = sum( - [np.prod(p.shape) for p in self._layer.parameters()] - ).item() - assert buffer_max_size >= 0, "buffer_max_size must be GE than 0." 
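The `buffer_max_size` checked just above caps the per-rank GradStorage buckets sized a few lines below via `_rank_buffer_size`. The padding arithmetic shared by these removed files (GPU buffers aligned to 256 bytes, CPU to 4096, with 2 bytes per fp16 and 4 per fp32 element) can be restated in isolation as the sketch below; it is illustrative only and assumes plain NumPy rather than the removed InternalStorage classes.

import numpy as np

ALIGNMENT = {"gpu": 256, "cpu": 4096}          # bytes, as in the removed files
BYTES_PER_ELEM = {"float16": 2, "float32": 4}


def padded_numel(shape, dtype="float16", device="gpu"):
    """Elements a parameter occupies inside an aligned storage buffer:
    its own numel plus the padding needed to reach the next alignment
    boundary, expressed in elements of the same dtype."""
    numel = int(np.prod(shape))
    size_bytes = numel * BYTES_PER_ELEM[dtype]
    remaining = size_bytes % ALIGNMENT[device]
    pad_bytes = 0 if remaining == 0 else ALIGNMENT[device] - remaining
    return numel + pad_bytes // BYTES_PER_ELEM[dtype]


# A [1000, 3] fp16 tensor is 6000 bytes; padding to the next 256-byte
# boundary (6144 bytes) yields 3072 fp16 elements in the buffer.
print(padded_numel([1000, 3]))  # 3072

Summing this padded numel over the parameters a rank owns gives the per-dtype buffer size that `rank_buffer_size` computed in the optimizer above.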
- self._buffer_max_size = self._rank_buffer_size( - buffer_max_size, model_size - ) - self._use_grad_storage = buffer_max_size > 0 - self._grad_storages = {} # {dtype: {rank: GradStorage}} - self._has_grad_storage = [] - self._grad_storage_list = [] - - # Offload - # TODO(haohongxiang): Now it's not be supported for multi-optimizers using Offload strategy - self._offload_optims = list( - filter(lambda optim: optim.offload, self._sharding_optimizers) - ) - if len(self._offload_optims) > 0: - assert ( - len(self._sharding_optimizers) == 1 - ), "Only support offload strategy for single optimizer" - - self._offload = self._sharding_optimizers[0].offload - self._offload_device = "cpu" - - # Set backward pass hooks - self._bw_hooks = [] - - # Set tasks flow - self._tasks_flow = deque() - - # Define optimizer step and clear_grad - self._redefine_opt_step() - self._redefine_opt_clear() - - def forward(self, *inputs, **kwargs): - """ - A wrapper for Sharding Stage2 layer. - - Fresh trainable params or rebuild grad storage - - Sync layer's buffer params - - Clear all flags states - - Forward for origin layers - """ - - # Whether to need to reset trainable parameters - needs_fresh = len(self._bw_hooks) == 0 and self.training - - if self._auto_refresh_trainable: - needs_fresh |= self._detect_train_change() - - # Front hook - self._init_internal_storage(needs_fresh) - - # Sync layer's buffers state - if self._sync_buffers: - self.__sync_buffers() - - # Normal FW on the base model - fw = self._layer(*inputs, **kwargs) - - return fw - - def set_state_dict(self, state_dict, use_structured_name=True): - self._layer.set_state_dict( - state_dict, use_structured_name=use_structured_name - ) - - def state_dict( - self, - destination=None, - include_sublayers=True, - structured_name_prefix="", - ): - return self._layer.state_dict( - destination=None, include_sublayers=True, structured_name_prefix="" - ) - - def _clear_gradients(self): - """ - Set zero to the gradient of the optimizer's current rank trainable parameters. - """ - # Release grad storages - for dtype in self._grad_storages.keys(): - if ( - not self._offload - and self._rank in self._grad_storages[dtype].keys() - ): - self._grad_storages[dtype][self._rank].buffer.zero_() - - # Release grads of params - for param in self._trainable_params: - if param.name in self._param_grads and param.grad is not None: - param.clear_gradient() - - # Release grads of master params with offload strategy - if self._offload: - self._sharding_optimizers[0]._offload_clear_grad() - - def _grad_scale(self): - """ - Before the gradient accumulation, scale the gradient. - """ - # Scale grad storages - for dtype in self._grad_storages.keys(): - if ( - not self._offload - and self._rank in self._grad_storages[dtype].keys() - ): - self._grad_storages[dtype][self._rank].buffer.scale_( - scale=self._world_size_scaling - ) - - # Scale grads of params - for param in self._trainable_params: - if param.name in self._param_grads and param.grad is not None: - param.grad.scale_(scale=self._world_size_scaling) - param._reset_grad_inplace_version(True) - - # Scale grads of master params with offload strategy - if self._offload: - self._sharding_optimizers[0]._offload_scale_grad( - self._world_size_scaling - ) - - def _init_internal_storage(self, needs_fresh): - """ - Judge Fresh trainable params or rebuild grad storage. 
- """ - if needs_fresh: - self._fresh_trainable() - else: - self._build_grad_storages() - - # Clear all flags state - self._clear_counters() - - def to(self, device=None, dtype=None, blocking=True): - """ - Synchronously or asynchronously convert the data type of the layer, the device is not supported now. - """ - assert isinstance(device, str), "Device must be type str" - assert ( - device == self._default_device - ), "New devices are not supported, because of the optimizer state is not sync" - - self._layer.to(device=device, dtype=dtype, blocking=blocking) - - # Re-build the buckets, hooks, etc.. - self._fresh_trainable() - - def _fresh_trainable(self): - """Whether to update training parameters.""" - - # Make sure that this is not done while gradients are waiting to be reduced (if no_sync context for instance) - if reduce(lambda x, y: x or y, self._grad_reduced, False): - logging.warning("Grads waiting to be reduced.") - - self._trainable_params = list( - filter(lambda x: x.trainable, self._all_params) - ) - self._trainable_params.sort(key=lambda x: np.prod(x.shape)) - - self._trainable_param2rank = {} - for optim in self._sharding_optimizers: - # Need to be wrappered for Sharding Stage2 Optimizer - if len(optim.param_storages.keys()) == 0: - optim.update_opt_status() - - # Get the parameters split by the optimizer according to rank - for ( - per_rank_params - ) in ( - optim.dtype_rank_params.values() - ): # all the params from all ranks - for params in per_rank_params: - for param in filter(lambda x: x.trainable, params): - self._trainable_param2rank[ - param.name - ] = optim.param2rank[param.name] - self._trainable_param2align[ - param.name - ] = optim._param2align[param.name] - - self._setup_use_grad_storage() - - # wait next func hook support - self._setup_backward_hooks() - - @paddle.autograd.no_grad() - def __sync_buffers(self): - """ - Sync all the param buffers from all ranks (exp: batch norm statistics). - """ - - for buffer in self._layer.buffers(include_sublayers=True): - dist.broadcast( - buffer, self._global_root_rank, self._group, sync_op=True - ) - # Multi stream operation will be supported later - dist.wait(tensor=buffer, group=self._group, use_calc_stream=True) - - def __getattr__(self, name): - """Forward missing attributes to wrapped layer.""" - try: - return super().__getattr__(name) - except AttributeError: - return getattr(self._layer, name) - - @paddle.autograd.no_grad() - def _clear_counters(self): - """Reset all the grad reduce and call counters.""" - if self.training: - self._grad_reduced = [True for _ in self._trainable_params] - - if self._use_grad_storage: - for grad_storage in self._grad_storage_list: - grad_storage.reset_checked_in() - - def _get_reduce_fn(self, index, param, dst_rank): - """ - There are two ways to reduce gradient. - - 1. Do not use self._use_grad_storage or exceeded buffer_max_size will be reduced separately. - - 2. Use grad_storage Reduce the storage to get the full gradient from different ranks. 
- """ - - if not self._use_grad_storage or not self._has_grad_storage[index]: - # Direct reduction - @paddle.autograd.no_grad() - def reduce(*_): - # Skip gradient reduction, do not change status information - if self._grad_reduced[index]: - assert ( - param.grad is not None - ), "Parameter gradient cannot be None" - - # Change reduce information - self._grad_reduced[index] = False - - # Clear the gradient that does not belong to the current rank through the callback function - def cleanup(): - if dst_rank != self._rank: - param.clear_gradient(False) - elif self._offload: - self._sharding_optimizers[0]._offload_acc_grad( - param.name, - param.grad.cast(dtype=Type.fp32.value).cpu(), - ) - param.clear_gradient(False) - - # Synchronize the reduce parameter gradient - self._tasks_flow.append( - Taskflow( - task=dist.reduce( - tensor=param.grad, - dst=self._group.ranks[dst_rank], - group=self._group, - sync_op=True, - ), - callback=cleanup, - ) - ) - - # Multi stream operation will be supported later - dist.wait( - tensor=param.grad, - group=self._group, - use_calc_stream=True, - ) - - # Clear the task flow and trigger callback to clear the redundant gradient - self._clear_task_flow() - - else: - # Buffer reduction - @paddle.autograd.no_grad() - def reduce(*_): - # Skip gradient reduction, do not change status information - if self._grad_reduced[index]: - assert ( - param.grad is not None - ), "Parameter gradient cannot be None" - - # Change reduce information - self._grad_reduced[index] = False - grad_storage = self._grad_storages[param.dtype][dst_rank] - grad_storage.params_checked_in += 1 - - if grad_storage.all_checked_in: - assert grad_storage.buffer is not None - - # Clearing up the grad_storage buffer - def cleanup(): - if dst_rank != self._rank: - for p in grad_storage._params: - p.clear_gradient(False) - p._gradient_set_empty(False) - - grad_storage.buffer.value().get_tensor()._clear() - elif self._offload: - grad_storage.to(device=self._offload_device) - for p in grad_storage._params: - self._sharding_optimizers[ - 0 - ]._offload_acc_grad( - p.name, - p.grad.cast(dtype=Type.fp32.value), - ) - p.clear_gradient(False) - p._gradient_set_empty(False) - grad_storage._device = self._default_device - grad_storage.buffer.value().get_tensor()._clear() - - # Reduce the bucket - grad_storage.sent = True - self._tasks_flow.append( - Taskflow( - task=dist.reduce( - tensor=grad_storage.buffer, - dst=self._group.ranks[ - grad_storage.destination - ], - group=self._group, - sync_op=True, - ), - callback=cleanup, - ) - ) - - # Multi stream operation will be supported later - dist.wait( - tensor=grad_storage.buffer, - group=self._group, - use_calc_stream=True, - ) - - # Clear the task flow and trigger callback to clear the redundant gradient - self._clear_task_flow() - - return reduce - - def _setup_backward_hooks(self): - """ - Set the backward hook to synchronize the gradients of all rank by reduce group ranks. 
- """ - - # Remove previous backward hooks - while len(self._bw_hooks) > 0: - self._bw_hooks.pop().remove() - - # Go through the parameters, attach the hook - if not self.training: - return - - for index, param in enumerate(self._trainable_params): - dst_rank = self._trainable_param2rank[param.name] - - reduce_function = self._get_reduce_fn(index, param, dst_rank) - - self._bw_hooks.append( - param._register_backward_hook(reduce_function) - ) - - def _setup_use_grad_storage(self): - """ - Integrate the parameters gradient into a continuous memory according to rank, and support the update of training parameters. - """ - - # According to parameters's numel sort, allocate memory of parameter gradient to continuous memory according to rank - self._grad_storages = {} - self._has_grad_storage = [False for _ in self._trainable_params] - - for index, param in enumerate(self._trainable_params): - dst_rank = self._trainable_param2rank[param.name] - - if param.dtype not in self._grad_storages.keys(): - self._grad_storages[param.dtype] = {} - - if dst_rank not in self._grad_storages[param.dtype].keys(): - self._grad_storages[param.dtype][dst_rank] = GradStorage( - self._buffer_max_size[param.dtype], - dtype=param.dtype, - device=self._default_device, - destination=dst_rank, - parm2align=self._trainable_param2align, - ) - - # Criteria to decide whether this parameter is to be put in GradStorage - if self._grad_storages[param.dtype][dst_rank].can_add_grad_view( - param, self._trainable_param2align[param.name] - ): - self._grad_storages[param.dtype][dst_rank].add_grad( - param, self._trainable_param2align[param.name] - ) - self._has_grad_storage[index] = True - else: - self._param_grads.append(param.name) - print( - "Can not add param: {}, param's shape: {}, param align: {}, grad_storages fill: {}, ".format( - param.name, - param.shape, - self._trainable_param2align[param.name], - self._grad_storages[param.dtype][dst_rank]._fill, - ) - ) - - self._grad_storage_list = list( - chain( - *[ - self._grad_storages[dtype].values() - for dtype in self._grad_storages.keys() - ] - ) - ) - - def _clear_task_flow(self): - """Try to consume the previous tasks.""" - while len(self._tasks_flow) > 0: - task = self._tasks_flow.popleft() - if task.callback is not None: - task.callback() - - def _detect_train_change(self): - # Current trainable parameters - trainable_mask = list(map(_trainable, self._all_params)) - - # Whether parameters trainability changed - trainability_changed = trainable_mask != self._trainable_mask - - if trainability_changed: - logging.warning( - "Trainable params changed, because of eval/train mode or parameter freezing/unfreeze." - ) - self._trainable_mask = trainable_mask - - return trainability_changed - - def _build_grad_storages(self): - """ - Rebuild grad storages. - """ - # Rebuild fp16/fp32 grad storages - for dtype in self._grad_storages.keys(): - for dst_rank, grad_storage in self._grad_storages[dtype].items(): - if self._offload or dst_rank != self._rank: - grad_storage.manumal_relase() - grad_storage.rebuild() - - def _rank_buffer_size(self, buffer_max_size, model_size): - """ - Generate the minimum buffer size for each rank & Display param sizes and model sizes. 
- """ - - # Initialize buffer size - rank_buffer_size = {} - for shard_opt in self._sharding_optimizers: - if shard_opt.rank_buffer_size: - for dtype in shard_opt.rank_buffer_size.keys(): - sizes = max(shard_opt.rank_buffer_size[dtype].values()) - rank_buffer_size[dtype] = min(sizes, buffer_max_size) - - if Type.fp16.value in rank_buffer_size.keys(): - # FP16 GradStorage and model size - print( - "====== FP16 GradStorage size: {:.2f}M parameters, Model size {:.2f}M parameters ======".format( - rank_buffer_size[Type.fp16.value] / 2**19, - model_size / 2**19, - ) - ) - if Type.fp32.value in rank_buffer_size.keys(): - # FP32 GradStorage and model size - print( - "====== FP32 GradStorage size: {:.2f}M parameters, Model size {:.2f}M parameters ======".format( - rank_buffer_size[Type.fp32.value] / 2**18, - model_size / 2**18, - ) - ) - return rank_buffer_size - - def _redefine_opt_step(self): - grad_func = self._grad_scale - for opt in self._sharding_optimizers: - opt_step = opt.step - - def _opt_step(self): - grad_func() - opt_step() - - opt.step = MethodType(_opt_step, opt) - - def _redefine_opt_clear(self): - clear_func = self._clear_gradients - - def _opt_clear(self): - clear_func() - - for opt in self._sharding_optimizers: - opt.clear_grad = MethodType(_opt_clear, opt) diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage3.py b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage3.py deleted file mode 100644 index 28ab704fb19873906e0c57cd25c1398cbf4abf3c..0000000000000000000000000000000000000000 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage3.py +++ /dev/null @@ -1,1055 +0,0 @@ -# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -from collections import OrderedDict -from types import MethodType - -import numpy as np - -import paddle -import paddle.distributed as dist -import paddle.fluid.core as core -from paddle import nn -from paddle.autograd import PyLayer -from paddle.distributed import collective -from paddle.distributed.collective import _get_global_group -from paddle.fluid.clip import ClipGradByGlobalNorm -from paddle.fluid.framework import ParamBase - -from ...utils.internal_storage import GradStorage -from ..pp_utils.utils import _all_gather -from .sharding_utils import ShardingClipGrad, Type, device_guard - -# CUDA alignment 256 bytes -alignment = { - "gpu": 256, -} -align = { - Type.fp16.value: 2, - Type.fp32.value: 4, -} - -global CHECK_LAYER -CHECK_LAYER = dict() # Help to check layer's id -> layer's name - - -class ShardingStage3(nn.Layer): - """ - A wrapper for Sharding Stage3 Layer in Dygraph. - - .. warning: ShardingStage3 encapsulates the layer strategy and integrates it into the nn.Layer. - - .. ZeRO: https://arxiv.org/pdf/1910.02054.pdf. - """ - - # TODO (Baibaifan) - # Feature Notes:: - # 1. The model supports the segmentation of parameters by global ranks in layers. - # 2. Support communication flow and computing flow. 
- # 3. Support offload function. - # 4. Support the establishment of independent communication groups. - - def __init__( - self, - layer, - optimizer, - group=None, - sync_buffers=False, - device="gpu", - segment_size=2**15, - pertrain_sync_models=True, - offload=False, - sync_comm=False, - ): - super().__init__() - - # Default configs - assert core.is_compiled_with_cuda(), "Only support CUDA." - self._layer = layer - self._default_device = device - self.__sync_buffers = sync_buffers - self._offload = offload - self._sync_comm = sync_comm - # segmentation size - assert segment_size >= 0, "segment_size must be GE than 0." - self._segment_size = segment_size - - global DEV - DEV = ( - "cpu" - if paddle.get_device() == "cpu" - else paddle.get_device().split(":")[0] - ) - global DEV_ID - DEV_ID = ( - 0 - if paddle.get_device() == "cpu" - else int(paddle.get_device().split(":")[1]) - ) - global param2dtype - param2dtype = dict() - - # Communication group establishment - self._group = ( - collective.new_group(_get_global_group().ranks) - if group is None - else group - ) - self._world_size_scaling = 1.0 / self._group.nranks - assert ( - self._group.nranks > 1 - ), "Training must be distributed, ranks must be greater than 1." - self._rank = self._group.rank - self._global_root_rank = self._group.ranks[ - 0 - ] # picking rank 0 as the reference - self._global_ranks = self._group.ranks - - # Parameter segmentation for global ranks - # After flatten -> self._param2buffer_size, self._param2buffer, self._trainable_params - self._param2buffer_size = dict() # {param.name: size} - self._param2buffer = ( - dict() - ) # {param.name: [(start0, end0),(start1, end1), ...]} - self._trainable_params = dict() # {id(layer): [trainable_params]} - self._unslice_params = set() # param's numel <= segment_size - self._unslice_params2align = dict() # {param.name: param's align} - self._grad_storages = dict() # {param.dtype: GradStorage} - - assert not isinstance( - optimizer, list - ), "Multiple optimizers are not supported now." - self._optim = _OptimizerWrapper( - optimizer, self._offload, self._group, self._update_params_slice - ) - self._ori_parameter_list = self._optim._parameter_list - self._ori_param_groups = self._optim._param_groups - - # Replace optimizer's _grad_clip - if isinstance(self._optim._grad_clip, ClipGradByGlobalNorm): - logging.warning( - "While using ClipGradByGlobalNorm in ShardingStage3, the grad clip of original optimizer will be changed." 
- ) - self._optim._grad_clip = ShardingClipGrad( - self._optim._grad_clip, paddle.get_device(), self._group - ) - - # Synchronous all ranks models - if pertrain_sync_models: - self._sync_params_and_buffers() - - self._segment_rank_params(self._layer) - - # Add unslice params to master_weight in fp16 - self._handle_unslice_params() - - # In the first step, record the execution order of the layer - self._order_tracer = OrderedDict() - self._order_tracer["order"] = 0 - self._order_tracer["layer"] = list() - - # Register task flow - self._task_flow = TaskFlow() - - # Register forward hooks - self._register_forward_hooks(self._layer) - - # Register backward parameter hooks - self._register_backward_hooks() - - # Redefine optimizer step and clear function - self._redefine_opt_step() - self._redefine_opt_clear() - - @paddle.autograd.no_grad() - def _sync_params_and_buffers(self): - """ - Sync all model states for all ranks - """ - - for p in self._layer.parameters(): - dist.broadcast( - p, src=self._global_root_rank, group=self._group, sync_op=True - ) - - # Multi stream operation will be supported later - dist.wait(tensor=p, group=self._group, use_calc_stream=True) - - def _clear_gradients(self): - assert len(self._trainable_params.keys()) > 0 - current_layer_params = self._layer.parameters(include_sublayers=True) - # 1.Handle param's slice - trainable_params = list( - filter( - lambda p: p.trainable and p not in self._unslice_params, - current_layer_params, - ) - ) - for param in trainable_params: - assert hasattr( - param, "fw_storage" - ), "Find {} don't have fw_storage attribute.".format(param.name) - - param.fw_storage.clear_gradient(False) - param.fw_storage._gradient_set_empty(False) - param.bw_storage._clear() - param.bw_storage = None - # 2.Handle unslice param - if not self._offload: - for grad_storage in self._grad_storages.values(): - grad_storage.buffer.zero_() - else: - for param in list(self._unslice_params): - param.clear_gradient(False) - param._gradient_set_empty(False) - tmp_var = param.cuda(DEV_ID) - - if ( - tmp_var.dtype == Type.fp32.value - and param2dtype[param.name] == Type.fp16.value - ): - tmp_var = paddle.cast(tmp_var, Type.fp16.value) - tmp_var._share_buffer_to(param) - tmp_var._clear() - for grad_storage in self._grad_storages.values(): - grad_storage.manumal_relase() - grad_storage.rebuild() - - # Update param memery slice - def _update_params_slice(self): - update_list = self._update_params() - - if not isinstance(self._optim._param_groups[0], dict): - slice_params = [param.fw_storage for param in update_list] - self._optim._parameter_list = slice_params + list( - self._unslice_params - ) - self._optim._param_groups = slice_params + list( - self._unslice_params - ) - else: - for param_group in self._optim._param_groups: - p_group = [] - for p in param_group['params']: - if hasattr(p, "fw_storage"): - p_group.append(p.fw_storage) - else: - p_group.append(p) - - param_group['params'] = p_group - - def forward(self, *inputs, **kwargs): - """ - A wrapper for Sharding Stage3 layer. 
- """ - # 1.Sync layer's buffers state - if self.__sync_buffers: - self._sync_buffers() - - # 2.Normal FW on the base model - fw = self._layer(*inputs, **kwargs) - - return fw - - def set_state_dict(self, state_dict, use_structured_name=True): - self._layer.set_state_dict( - state_dict, use_structured_name=use_structured_name - ) - - def state_dict( - self, - destination=None, - include_sublayers=True, - structured_name_prefix="", - ): - return self._layer.state_dict( - destination=None, include_sublayers=True, structured_name_prefix="" - ) - - def _handle_unslice_params(self): - buffer_size = dict() - buffer_size[Type.fp32.value] = 0 - buffer_size[Type.fp16.value] = 0 - for param in self._unslice_params: - # Updata optimizer master weights - if param.dtype == Type.fp16.value and not self._offload: - self._optim._master_weights[param.name] = paddle.cast( - param, Type.fp32.value - ) - if self._offload: - param.master_weight = paddle.cast(param, Type.fp32.value).cpu() - param2dtype[param.name] = param.dtype - p_align = self._param2align(param) - self._unslice_params2align[param.name] = p_align - buffer_size[param.dtype] += param._numel() + p_align - - # Create unslice_params'grad - for param in sorted(list(self._unslice_params), key=lambda p: p.name): - if param.dtype not in self._grad_storages.keys(): - self._grad_storages[param.dtype] = GradStorage( - buffer_size[param.dtype], - dtype=param.dtype, - device=self._default_device, - destination=self._rank, - parm2align=self._unslice_params2align, - ) - self._grad_storages[param.dtype].add_grad( - param, self._unslice_params2align[param.name] - ) - - def _segment_rank_params(self, layer, name="last_layer"): - """ - Flatten parameters according to layer. - """ - current_layer_params = _current_layer_params(layer) - if current_layer_params: - CHECK_LAYER[id(layer)] = name - self._flatten_layer_params(layer, current_layer_params) - - for name, sub_layer in layer.named_children(): - self._segment_rank_params(sub_layer, name) - - def _flatten_layer_params(self, layer, current_layer_params): - """ - Parameter segmentation and memory integration. - """ - - def _add_manage_info(trainable_param): - return _PartitionParam(trainable_param) - - current_params = list() - for p in current_layer_params: - if p.trainable and p._numel() > self._segment_size: - current_params.append(_add_manage_info(p)) - elif p.trainable: - self._unslice_params.add(_UnsliceParam(p)) - - assert id(layer) not in self._trainable_params.keys() - self._trainable_params[id(layer)] = current_params - - for param in self._trainable_params[id(layer)]: - if param.name in self._param2buffer.keys(): - continue - self._param2buffer[param.name] = [] - # 1.Params alignment - align_ = self._param2align(param) - - offset = align_ + param._numel() - buffer_size = ( - offset - if offset % self._group.nranks == 0 - else offset + self._group.nranks - (offset % self._group.nranks) - ) - self._param2buffer_size[param.name] = buffer_size - - # 2.Combination param buffer - assert buffer_size % self._group.nranks == 0 - pre_buffer = buffer_size // self._group.nranks - - for rank_ in range(self._group.nranks): - self._param2buffer[param.name].append( - (rank_ * pre_buffer, (rank_ + 1) * pre_buffer) - ) - - # Record param's dtype - param2dtype[param.name] = param.dtype - - # 3.Flatten layer params and release other rank buffer - self._param_storage(param, buffer_size) - - def _param_storage(self, param, buffer_size): - """ - This is a function to simplify the handling of parameter InternalStorages. 
- """ - assert isinstance(buffer_size, int) - value = ( - np.zeros(buffer_size, dtype=np.float16) - if Type.fp16.value == param.dtype - else np.zeros(buffer_size, dtype=np.float32) - ) - buffer = core.VarBase(value=value, place=core.CPUPlace()) - - param_shape = param.shape - origin_state = param.stop_gradient - param.stop_gradient = True - param.flatten_() - param.stop_gradient = origin_state - start, end = self._param2buffer[param.name][self._rank] - - # Copy the current param value - tmp_var = core.VarBase( - tensor=buffer._slice(0, param._numel()), place=core.CPUPlace() - ) - param_cpu = param.cpu() - tmp_var.value().get_tensor().set( - param_cpu.value().get_tensor(), core.CPUPlace() - ) - param.value().get_tensor()._set_dims(param_shape) - - # Current rank param_storage - if self._offload: - param.fw_storage = core.VarBase( - buffer._slice(start, end), - core.CPUPlace(), - "slice@" + param.name, - ) - with device_guard(device="cpu"): - param.master_weight = paddle.cast( - param.fw_storage, Type.fp32.value - ) - else: - param.fw_storage = core.VarBase( - buffer._slice(start, end), "slice@" + param.name - ) - param.status = "part" - - # Updata optimizer master weights - if param.dtype == Type.fp16.value and not self._offload: - self._optim._master_weights[param.fw_storage.name] = paddle.cast( - param.fw_storage, Type.fp32.value - ) - param._clear() - - def _register_forward_hooks(self, layer): - """ - Register pylayer to manage memory slices. - There are four stages: - FW - 1. Before the forward layers, synchronize the full parameters. - 2. After the forward layers, release the full parameter and keep the parameter slice. - BW - 3. Before the backward layers, synchronize the full parameters and create param's grad. - 4. After the gradient accumulation, release the full parameter and keep the parameter slice. - """ - current_layer_params = _current_layer_params(layer) - if current_layer_params: - self._register_forward_all_hooks(layer, self._task_flow) - - for _, sub_layer in layer.named_children(): - self._register_forward_hooks(sub_layer) - - def _register_forward_all_hooks(self, sub_layer, task_flow): - def _forward_pre_hook(layer, inputs): - return ForwardPreHooks( - layer, - self._order_tracer, - self._trainable_params, - self._param2buffer, - self._rank, - self._group, - self._sync_comm, - self._offload, - task_flow, - ) - - def _forward_post_hook(layer, inputs, outputs): - return ForwardPostHooks.apply( - outputs, - layer, - self._order_tracer, - self._trainable_params, - self._param2buffer, - self._param2buffer_size, - self._rank, - self._group, - self._sync_comm, - self._offload, - task_flow, - ) - - # register previous forward hooks - sub_layer.register_forward_pre_hook(_forward_pre_hook) - - # register post forward hooks - sub_layer.register_forward_post_hook(_forward_post_hook) - - @paddle.autograd.no_grad() - def _sync_buffers(self): - """ - Sync all the param buffers from all ranks (exp: batch norm statistics). - """ - - for buffer in self._layer.buffers(include_sublayers=True): - dist.broadcast( - buffer, self._global_root_rank, self._group, sync_op=True - ) - # Multi stream operation will be supported later - dist.wait(tensor=buffer, group=self._group, use_calc_stream=True) - - def __getattr__(self, name): - """Forward missing attributes to wrapped layer.""" - try: - return super().__getattr__(name) - except AttributeError: - return getattr(self._layer, name) - - def _update_params(self): - """ - Update parameters to optimizer memory slice. 
- """ - update_list = [] - assert len(self._trainable_params.keys()) > 0 - current_layer_params = self._layer.parameters(include_sublayers=True) - trainable_params = list( - filter( - lambda p: p.trainable and p not in self._unslice_params, - current_layer_params, - ) - ) - # 1.Handle param's slice - for param in trainable_params: - assert hasattr( - param, "fw_storage" - ), "Find {} don't have fw_storage attribute".format(param.name) - # Gradient average - if self._offload: - with device_guard(device="cpu"): - param.bw_storage.scale_(scale=self._world_size_scaling) - else: - param.bw_storage.scale_(scale=self._world_size_scaling) - param.fw_storage = _VarBaseWrapper(param) - assert param.fw_storage.grad is None - param.fw_storage._copy_gradient_from(param.bw_storage) - update_list.append(param) - - # 2.Handle unslice param - for grad_storage in self._grad_storages.values(): - grad_storage.buffer.scale_(scale=self._world_size_scaling) - dist.all_reduce( - tensor=grad_storage.buffer, group=self._group, sync_op=True - ) - dist.wait( - tensor=grad_storage.buffer, - group=self._group, - use_calc_stream=True, - ) - - if self._offload: - for param in list(self._unslice_params): - param._clear() - param.master_weight._share_buffer_to(param) - - for grad_storage in self._grad_storages.values(): - for p in grad_storage._params: - tmp_g = _device2cpu(p.grad, convert_dtype=True) - p.clear_gradient(False) - p._gradient_set_empty(False) - p._copy_gradient_from(tmp_g) - tmp_g._clear() - grad_storage.buffer._clear() - - return update_list - - def get_all_parameters(self, convert2cpu=False): - """ - Get the full parameters and return the corresponding task flows. - """ - assert len(self._trainable_params.keys()) > 0 - current_layer_params = self._layer.parameters(include_sublayers=True) - trainable_params = list( - filter( - lambda p: p.trainable and p not in self._unslice_params, - current_layer_params, - ) - ) - t_flow = _allgather_buffer( - trainable_params, - self._group, - use_calc_stream=True, - task_flow=TaskFlow(), - sync_wait=True, - offload=self._offload, - convert2cpu=convert2cpu, - ) - if convert2cpu: - for param in trainable_params: - t_flow.full_param[param.name]._share_buffer_to(param) - - self._optim._parameter_list = self._ori_parameter_list - self._optim._param_groups = self._ori_param_groups - - def _register_backward_hooks(self): - current_layer_params = self._layer.parameters(include_sublayers=True) - trainable_params = list( - filter( - lambda p: p.trainable and p not in self._unslice_params, - current_layer_params, - ) - ) - - for param in trainable_params: - allreduce_function = self._get_allreduce_fn(param) - param._register_backward_hook(allreduce_function) - - def _get_allreduce_fn(self, param): - @paddle.autograd.no_grad() - def allreduce_(*_): - if param.name in self._task_flow.full_grad.keys(): - full_grad = self._task_flow.full_grad[param.name] - # Only support sync allreduce current rank's layer now - dist.all_reduce( - tensor=full_grad, group=self._group, sync_op=True - ) - dist.wait( - tensor=full_grad, group=self._group, use_calc_stream=True - ) - - start, end = self._param2buffer[param.name][self._rank] - if param.bw_storage is None: - param.bw_storage = ( - core.VarBase(full_grad._slice(start, end)) - .detach() - .clone() - ) - if self._offload: - param.bw_storage = _device2cpu(param.bw_storage, True) - else: - if self._offload: - cpu_grad = _device2cpu( - core.VarBase(full_grad._slice(start, end)) - .detach() - .clone(), - True, - ) - with device_guard(device="cpu"): 
- param.bw_storage = paddle.add( - param.bw_storage, cpu_grad - ) - else: - # param.bw_storage.add_( - # core.VarBase(full_grad._slice(start, end)) - # .detach().clone()) - param.bw_storage = paddle.add( - param.bw_storage, - core.VarBase(full_grad._slice(start, end)) - .detach() - .clone(), - ) - param.clear_gradient(False) - param._gradient_set_empty(False) - tmp_var = self._task_flow.full_grad.pop(param.name) - tmp_var._clear() - - if param.name in self._task_flow.full_param.keys(): - if param.status == "all": - param.use_count = 0 - param._clear() - start, end = self._param2buffer[param.name][self._rank] - param.fw_storage = ( - core.VarBase( - self._task_flow.full_param[param.name]._slice( - start, end - ), - param.name + "@slice", - ) - .detach() - .clone() - ) - param.status = "part" - tmp_var = self._task_flow.full_param.pop(param.name) - tmp_var._clear() - - if self._offload: - param.fw_storage._clear() - param.master_weight._share_buffer_to(param.fw_storage) - - return allreduce_ - - def _param2align(self, param): - # CUDA alignment 256 bytes - size = param._numel() * align[param.dtype] - remaining = size % alignment[self._default_device] - ali = ( - 0 if remaining == 0 else alignment[self._default_device] - remaining - ) - align_ = ali // align[param.dtype] - return align_ - - def _redefine_opt_step(self): - params_slice_func = self._update_params_slice - opt_step = self._optim.step - - def _opt_step(self): - if not self.update_scaler: - params_slice_func() - if self.offload: - with device_guard(device="cpu"): - opt_step() - else: - opt_step() - - def _opt_minimize(self): - raise RuntimeError( - "optimizer.minimize() not support now, please use optimizer.step()" - ) - - self._optim.step = MethodType(_opt_step, self._optim) - self._optim.minimize = MethodType(_opt_minimize, self._optim) - - def _redefine_opt_clear(self): - clear_func = self._clear_gradients - - def _opt_clear(self): - clear_func() - - self._optim.clear_grad = MethodType(_opt_clear, self._optim) - - -def ForwardPreHooks( - layer, - order_tracer, - trainable_params, - param2buffer, - rank, - group, - sync_comm, - offload, - task_flow, -): - - # Record layer's id - layer_id = id(layer) - use_calc, sync_wait = False, False - - if layer_id not in order_tracer.keys() or sync_comm: - use_calc, sync_wait = True, True - - # Whether to use calc stream - task_flow.use_calc[layer_id] = use_calc - else: - # Whether to use calc stream - task_flow.use_calc[layer_id] = use_calc - # wait current layer params - _wait_layer( - trainable_params[layer_id], task_flow, group, use_calc, offload - ) - - if layer_id == order_tracer["layer"][-1]: - return - order_ = order_tracer[layer_id] - layer_id = order_tracer["layer"][order_ + 1] - - _allgather_buffer( - trainable_params[layer_id], - group, - use_calc_stream=use_calc, - task_flow=task_flow, - sync_wait=sync_wait, - offload=offload, - ) - - return - - -class ForwardPostHooks(PyLayer): - @staticmethod - def forward( - ctx, - inputs, - layer, - order_tracer, - trainable_params, - param2buffer, - param2buffer_size, - rank, - group, - sync_comm, - offload, - task_flow, - ): - - layer_id = id(layer) - # release current layer full params - _release_param( - trainable_params[layer_id], param2buffer, rank, task_flow, offload - ) - - if layer_id not in order_tracer.keys(): - order_ = order_tracer["order"] - order_tracer[layer_id] = order_ - order_tracer["order"] += 1 - order_tracer["layer"].append(layer_id) - - # Record bw info - ctx.order_tracer = order_tracer - ctx.task_flow = task_flow - 
ctx.group = group - ctx.layer = layer - ctx.sync_comm = sync_comm - ctx.trainable_params = trainable_params - ctx.param2buffer_size = param2buffer_size - ctx.offload = offload - - return inputs - - @staticmethod - def backward(ctx, *args): - # Load context value - order_tracer = ctx.order_tracer - task_flow = ctx.task_flow - group = ctx.group - layer = ctx.layer - trainable_params = ctx.trainable_params - param2buffer_size = ctx.param2buffer_size - sync_comm = ctx.sync_comm - offload = ctx.offload - layer_id = id(layer) - use_calc, sync_wait = False, False - - # Allgather params synchronization - if sync_comm: - use_calc, sync_wait = True, True - _allgather_buffer( - trainable_params[layer_id], - group, - use_calc_stream=use_calc, - task_flow=task_flow, - sync_wait=sync_wait, - offload=offload, - ) - else: - _wait_layer( - trainable_params[layer_id], task_flow, group, use_calc, offload - ) - - # Create params's grad - _create_params_grad( - trainable_params[layer_id], param2buffer_size, task_flow - ) - - # Whether to use calc stream - task_flow.use_calc[layer_id] = use_calc - if layer_id != order_tracer["layer"][0] and not sync_comm: - layer_next_id = order_tracer["layer"][order_tracer[layer_id] - 1] - _allgather_buffer( - trainable_params[layer_next_id], - group, - use_calc_stream=use_calc, - task_flow=task_flow, - sync_wait=sync_wait, - offload=offload, - ) - - return args - - -class TaskFlow: - """ - Task flows, one way linked list for task acquisition. - """ - - def __init__( - self, - full_param=dict(), - full_grad=dict(), - use_calc=dict(), - callback=None, - ): - self.full_param = full_param - self.full_grad = full_grad - self.use_calc = use_calc - self.callback = callback - - -def _release_param( - trainable_params, param2buffer, rank, task_flow, offload=False -): - for param in trainable_params: - # async communicate share weight not clear - param.use_count -= 1 - if param.use_count == 0: - param._clear() - if param.name in task_flow.full_param.keys(): - start, end = param2buffer[param.name][rank] - with paddle.amp.auto_cast(enable=False): - param.fw_storage = ( - core.VarBase( - task_flow.full_param[param.name]._slice(start, end), - param.name + "@slice", - ) - .detach() - .clone() - ) - param.status = "part" - tmp_var = task_flow.full_param.pop(param.name) - tmp_var._clear() - - if offload: - param.fw_storage = _device2cpu(param.fw_storage) - return - - -def _wait_layer( - trainable_params, task_flow, group, use_calc_stream, offload=False -): - paddle.device.cuda.synchronize() - for param in trainable_params: - if param.status == "all": - param.use_count += 1 - continue - if param.name in task_flow.full_param.keys(): - full_param = task_flow.full_param[param.name] - core.VarBase(full_param._slice(0, param._numel()))._share_buffer_to( - param - ) - param.fw_storage._clear() - param.fw_storage = None - param.status = "all" - param.use_count += 1 - else: - _allgather_buffer( - trainable_params, - group, - use_calc_stream=True, - task_flow=task_flow, - sync_wait=True, - offload=offload, - ) - break - return task_flow - - -def _allgather_buffer( - trainable_params, - group, - use_calc_stream, - task_flow, - sync_wait=False, - offload=False, - convert2cpu=False, -): - - for param in trainable_params: - if param.status == "all": - param.use_count += 1 - continue - - if offload: - param.fw_storage = _cpu2device(param) - - with paddle.amp.auto_cast(enable=False): - full_param = _all_gather( - param.fw_storage, group, use_calc_stream=use_calc_stream - ) - - # Allgather current layer in 
the 1st step synchronously - if sync_wait: - with paddle.amp.auto_cast(enable=False): - dist.wait( - tensor=full_param, - group=group, - use_calc_stream=use_calc_stream, - ) - core.VarBase(full_param._slice(0, param._numel()))._share_buffer_to( - param - ) - param.fw_storage._clear() - param.fw_storage = None - param.status = "all" - param.use_count += 1 - task_flow.full_param[param.name] = full_param - - # parameter converts to cpu - if convert2cpu: - p_name = param.name - param = _device2cpu(param) - tmp_var = task_flow.full_param.pop(p_name) - tmp_var._clear() - task_flow.full_param[p_name] = param - - return task_flow - - -@paddle.autograd.no_grad() -def _create_params_grad(trainable_params, param2buffer_size, task_flow): - for param in trainable_params: - if param.name in task_flow.full_grad.keys(): - continue - assert isinstance(param2buffer_size[param.name], int) - temp_grad = paddle.zeros( - [param2buffer_size[param.name]], dtype=param.dtype - ) - param._copy_gradient_from( - core.VarBase(temp_grad._slice(0, param._numel())) - ) - task_flow.full_grad[param.name] = temp_grad - return task_flow - - -def _PartitionParam(param): - if not hasattr(param, "fw_storage"): - setattr(param, "fw_storage", None) - setattr(param, "bw_storage", None) - setattr(param, "master_weight", None) - setattr(param, "status", "all") - setattr(param, "use_count", 0) - return param - - -def _UnsliceParam(param): - if not hasattr(param, "unslice"): - setattr(param, "unslice", True) - setattr(param, "master_weight", None) - return param - - -def _VarBaseWrapper(param): - varbase = param.fw_storage - tmp_param = ParamBase( - shape=varbase.shape, dtype=varbase.dtype, name="slice@" + param.name - ) - varbase._share_buffer_to(tmp_param) - tmp_param.regularizer = param.regularizer - tmp_param.optimize_attr['learning_rate'] = param.optimize_attr[ - 'learning_rate' - ] - varbase._clear() - return tmp_param - - -def _OptimizerWrapper(optimizer, offload, group, update_params_slice): - if not hasattr(optimizer, "_optim"): - setattr(optimizer, "_optim", optimizer) - setattr(optimizer, "offload", offload) - setattr(optimizer, "group", group) - setattr(optimizer, "update_scaler", None) - setattr(optimizer, "update_slice", update_params_slice) - return optimizer - - -def _device2cpu(trans_param, convert_dtype=False): - if convert_dtype: - trans_param = paddle.cast(trans_param, Type.fp32.value) - tmp_p = trans_param.cpu() - trans_param._clear() - return tmp_p - - -def _cpu2device(param): - tmp_p = param.fw_storage.cuda(DEV_ID) - if ( - tmp_p.dtype == Type.fp32.value - and param2dtype[param.name] == Type.fp16.value - ): - tmp_p = paddle.cast(tmp_p, Type.fp16.value) - return tmp_p - - -def _current_layer_params(layer): - return ( - layer.parameters(include_sublayers=False) + list(layer.extra_parameters) - if hasattr(layer, "extra_parameters") - else layer.parameters(include_sublayers=False) - ) diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_utils.py b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_utils.py deleted file mode 100644 index 22f2eb8f1b8eab9c91ee856ea63059aa9d3e08a4..0000000000000000000000000000000000000000 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_utils.py +++ /dev/null @@ -1,252 +0,0 @@ -# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import contextlib -from enum import Enum -from types import MethodType - -import numpy as np - -import paddle -from paddle import _legacy_C_ops -from paddle.fluid import core, layers -from paddle.fluid.dygraph import base as imperative_base -from paddle.fluid.dygraph import to_variable -from paddle.fluid.framework import dygraph_only - - -class Taskflow: - """ - Task flows, one way linked list for task acquisition. - """ - - def __init__(self, task, callback): - self.task = task - self.callback = callback - - -class Type(Enum): - """ - Type of trainable parameters - """ - - fp16 = paddle.float16 - bf16 = paddle.bfloat16 - fp32 = paddle.float32 - - -class ShardingClipGrad: - def __init__(self, clip, device, group): - self._clip = clip - self._device = device - self._group = group - - @imperative_base.no_grad - def _dygraph_clip(self, params_grads): - sum_square_fp32, sum_square_fp16 = [], [] - unslice_params_fp32, unslice_params_fp16 = [], [] - - for p, g in params_grads: - p_slice = True # using for slice parameter in sharding stage3 - if g is None or getattr(p, 'need_clip', True) is False: - continue - if hasattr(p, "unslice"): - p_slice = False - - merge_grad = g - if g.type == core.VarDesc.VarType.SELECTED_ROWS: - merge_grad = layers.get_tensor_from_selected_rows( - layers.merge_selected_rows(g) - ) - square = paddle.square(merge_grad) - sum_square = paddle.sum(square) - if p.dtype == paddle.float16: - if p_slice: - sum_square_fp16.append(sum_square) - else: - unslice_params_fp16.append(sum_square) - elif p.dtype == paddle.float32: - if p_slice: - sum_square_fp32.append(sum_square) - else: - unslice_params_fp32.append(sum_square) - - # global norm of non-distributed FP16 params_and_grads - if len(sum_square_fp16) == 0: - global_norm_fp16 = paddle.to_tensor([0.0], dtype=paddle.float32) - else: - global_norm_fp16 = layers.concat(sum_square_fp16) - global_norm_fp16 = paddle.sum(global_norm_fp16) - global_norm_fp16 = paddle.cast( - global_norm_fp16, dtype=paddle.float32 - ) - - # global norm of non-distributed FP16 params_and_grads for unslice parameter - if len(unslice_params_fp16) == 0: - global_unslice_fp16 = paddle.to_tensor([0.0], dtype=paddle.float32) - else: - global_unslice_fp16 = layers.concat(unslice_params_fp16) - global_unslice_fp16 = paddle.sum(global_unslice_fp16) - global_unslice_fp16 = paddle.cast( - global_unslice_fp16, dtype=paddle.float32 - ) - - # global norm of non-distributed FP32 params_and_grads - global_norm_fp32 = ( - layers.concat(sum_square_fp32) - if len(sum_square_fp32) != 0 - else paddle.to_tensor([0.0], dtype=paddle.float32) - ) - global_norm_fp32 = paddle.sum(global_norm_fp32) - - # global norm of non-distributed FP32 params_and_grads for unslice parameter - global_unslice_fp32 = ( - layers.concat(unslice_params_fp32) - if len(unslice_params_fp32) != 0 - else paddle.to_tensor([0.0], dtype=paddle.float32) - ) - global_unslice_fp32 = paddle.sum(global_unslice_fp32) - global_unslice_var = global_unslice_fp16 + global_unslice_fp32 - - global_norm_var = ( - global_norm_fp16 - + global_norm_fp32 - + 1.0 / self._group.nranks * global_unslice_var - ) - - 
# add all reduce to get global norm of distributed params_and_grads - dev_id = int(self._device.split(":")[1]) - with device_guard(dev_id, "gpu"): - paddle.distributed.all_reduce(global_norm_var, group=self._group) - - global_norm_var = paddle.sqrt(global_norm_var) - max_global_norm = layers.fill_constant( - shape=[1], dtype=global_norm_var.dtype, value=self.clip_norm - ) - - clip_var = paddle.divide( - x=max_global_norm, - y=paddle.maximum(x=global_norm_var, y=max_global_norm), - ) - clip_var_fp16 = paddle.cast(clip_var, paddle.float16) - - for p, g in params_grads: - if getattr(p, 'need_clip', True) is False or g is None: - continue - origin_state = g.stop_gradient - g.stop_gradient = True - if p.dtype == paddle.float16: - g.scale_(clip_var_fp16) - else: - g.scale_(clip_var) - g.stop_gradient = origin_state - p._reset_grad_inplace_version(True) - - return params_grads - - def __getattr__(self, item): - return getattr(self._clip, item) - - def __call__(self, params_grads): - return self._dygraph_clip(params_grads) - - -@contextlib.contextmanager -def device_guard(dev_id=0, device="cpu"): - origin_device = paddle.device.get_device() - if device == "cpu": - paddle.set_device(device) - elif device == "gpu": - paddle.set_device("gpu:{}".format(dev_id)) - try: - yield - finally: - paddle.set_device(origin_device) - - -@dygraph_only -def ShardingScaler(scaler): - def unscale_method(self, optimizer): - if not self._enable: - return - param_grads = [] - param_grads_fp16 = [] - param_grads_fp32 = [] - if hasattr(optimizer, "update_slice"): - optimizer.update_slice() - optimizer.update_scaler = True - - if getattr(optimizer._optim, '_param_groups', None) and isinstance( - optimizer._optim._param_groups[0], dict - ): - - for group in optimizer._optim._param_groups: - for param in group['params']: - if param._grad_ivar() is not None: - param_grads.append(param._grad_ivar()) - if param._grad_ivar().dtype in [ - core.VarDesc.VarType.FP16, - paddle.float16, - ]: - param_grads_fp16.append(param._grad_ivar()) - else: - param_grads_fp32.append(param._grad_ivar()) - else: - for param in optimizer._optim._parameter_list: - if param.grad is not None: - param_grads.append(param.grad) - if param.grad.dtype in [ - core.VarDesc.VarType.FP16, - paddle.float16, - ]: - param_grads_fp16.append(param.grad) - else: - param_grads_fp32.append(param.grad) - - temp_found_inf_fp16 = to_variable(np.array([0]).astype(np.bool_)) - temp_found_inf_fp32 = to_variable(np.array([0]).astype(np.bool_)) - - device = "cpu" if optimizer.offload else "gpu" - dev_id = ( - 0 if device == "cpu" else int(paddle.get_device().split(":")[1]) - ) - - with device_guard(dev_id, device): - if len(param_grads_fp16): - _legacy_C_ops.check_finite_and_unscale( - param_grads_fp16, - self._scale, - param_grads_fp16, - temp_found_inf_fp16, - ) - if len(param_grads_fp32): - _legacy_C_ops.check_finite_and_unscale( - param_grads_fp32, - self._scale, - param_grads_fp32, - temp_found_inf_fp32, - ) - - self._found_inf = 1 if temp_found_inf_fp16 or temp_found_inf_fp32 else 0 - is_found_inf = paddle.to_tensor([self._found_inf], dtype="int32") - - paddle.distributed.all_reduce( - is_found_inf, - op=paddle.distributed.ReduceOp.MAX, - group=optimizer.group, - ) - self._found_inf = is_found_inf.numpy()[0] - - scaler._unscale = MethodType(unscale_method, scaler) - return scaler diff --git a/python/paddle/distributed/fleet/utils/internal_storage.py b/python/paddle/distributed/fleet/utils/internal_storage.py deleted file mode 100644 index 
2b27d6a0dcd771ace32e06f8cff1a70300d2178b..0000000000000000000000000000000000000000 --- a/python/paddle/distributed/fleet/utils/internal_storage.py +++ /dev/null @@ -1,348 +0,0 @@ -# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# The file has been adapted from fairscale file: -# https://github.com/facebookresearch/fairscale/blob/main/fairscale/nn/misc/param_bucket.py -# Git commit hash: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e -# We retain the following license from the original files: - -# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. -# -# This source code is licensed under the BSD license found in the -# LICENSE file in the root directory of this source tree. - -import numpy as np - -import paddle -from paddle import framework - -# (TODO: GhostScreaming) It will be removed later. -from paddle.fluid import core - -from ..meta_parallel.sharding.sharding_utils import Type, device_guard - - -class InternalStorage: - """ - This is a basic class, which is responsible for consolidating the basic storage tensor. - - """ - - # Support integration parameter tensor - def __init__(self, size, dtype, device, convert_cpu=False): - self._params = [] - self._param_ids = [] - self._fill = 0 - self._device = device - self._dtype = dtype - - # The actual flat tensor - size = [size] if isinstance(size, int) else size - if convert_cpu: - value = ( - np.zeros(size, dtype=np.float16) - if Type.fp16.value == dtype - else np.zeros(size, dtype=np.float32) - ) - self.buffer = core.VarBase(value=value, place=core.CPUPlace()) - else: - self.buffer = paddle.zeros(size, dtype=dtype) - - def to(self, device, dtype=None, keep_alignment=True): - """ - Move the underlying buffer - """ - assert ( - self.buffer is not None - ), "Cannot move a collapsed bucket, please rebuild it" - assert ( - dtype == Type.fp32.value or Type.fp16.value - ), "Conversion type is not supported now" - - dev_id = ( - 0 - if paddle.get_device() == "cpu" - else int(paddle.get_device().split(":")[1]) - ) - - if self._device != device: - tmp_buffer = ( - self.buffer.cuda(dev_id) - if device == "gpu" - else self.buffer.cpu() - ) - for param in self._params: - param.clear_gradient(False) - param._gradient_set_empty(False) - self.buffer.value().get_tensor()._clear() - self.buffer = tmp_buffer - self._device = device - - if dtype is not None: - self.buffer = self.buffer.cast(dtype=dtype) - self._dtype = dtype - - -class ParamStorage(InternalStorage): - """ - This is a basic class to simplify the handling of parameter InternalStorages. 
- """ - - def __init__(self, size, dtype, device): - super().__init__(size, dtype, device, convert_cpu=True) - self.param2align = None - - def to(self, device, dtype=None, keep_alignment=True): - """ - Move the underlying buffer - """ - - super().to(device, dtype) - - if keep_alignment: - self._array_params() - - @framework.no_grad() - def add_rank_params(self, trainable_params, param2align, convert_gpu=True): - """ - Add new parameters to the InternalStorage. Params becomes a view of this InternalStorage buffer. - """ - - assert all( - [id(param) not in self._param_ids for param in trainable_params] - ), "The same param cannot be checked in twice" - assert self.buffer is not None - - self.param2align = param2align - - cpu_param_shape = list() - for param in trainable_params: - p_shape = self._add_param_as_view( - param, param2align[param.name], convert_gpu - ) - cpu_param_shape.append(p_shape) - - if convert_gpu: - # buffer convert from cpu to cuda - dev_id = int(paddle.get_device().split(":")[1]) - self.buffer = self.buffer.cuda(dev_id) - - self._fill = 0 - - for idx, param in enumerate(trainable_params): - self._convert_buffer( - param, cpu_param_shape[idx], param2align[param.name] - ) - self._params.append(param) - self._param_ids.append(id(param)) - - @framework.no_grad() - def _add_param_as_view(self, param, align, convert_gpu=True): - - assert ( - param.dtype == self.buffer.dtype - ), "Different types for the InternalStorage and the param, cannot proceed: {} - {}".format( - param.dtype, self.buffer.dtype - ) - - var_end = self._fill + np.prod(param.shape) - offset = var_end + align - assert offset <= np.prod(self.buffer.shape) - - p_shape = param.shape - - origin_state = param.stop_gradient - param.stop_gradient = True - param.flatten_() - param.stop_gradient = origin_state - - # Copy the current param value - dev_id = ( - 0 - if paddle.get_device() == "cpu" - else int(paddle.get_device().split(":")[1]) - ) - with device_guard(dev_id, "cpu"): - tmp_var = core.VarBase( - tensor=self.buffer._slice(self._fill, var_end) - ) - if convert_gpu: - param_cpu = param.cpu() - param.value().get_tensor()._clear() - tmp_var.set_value(param_cpu) - else: - tmp_var.set_value(param) - - self._fill = offset - return p_shape - - @framework.no_grad() - def _convert_buffer(self, param, p_shape, align): - - var_end = self._fill + np.prod(p_shape) - offset = var_end + align - assert offset <= np.prod(self.buffer.shape) - - # Convert the param value - tmp_tensor = self.buffer._slice(self._fill, var_end) - param.value().get_tensor()._share_data_with(tmp_tensor) - param.value().get_tensor()._set_dims(p_shape) - - self._fill = offset - - @framework.no_grad() - def _array_params(self): - """ - Given the parameters which have been registered previously, rebuild the whole InternalStorage. 
- """ - assert len(self._params) > 0 - assert self.param2align is not None - - self._fill = 0 - for p in self._params: - self._convert_buffer(p, p.shape, self.param2align[p.name]) # modify - - -class GradStorage(InternalStorage): - """ - This is a basic class to simplify the handling of gradient InternalStorages - """ - - def __init__( - self, size, dtype, device, destination, parm2align, convert_cpu=False - ): - if isinstance(size, np.int64): - size = size.tolist() - super().__init__(size, dtype, device, convert_cpu) - - self._max_size = size - self._release = False - - self.params_checked_in = 0 - self.destination = destination - self._parm2align = parm2align - self.sent = False - - def reset_checked_in(self): - """Reset the counter of the parameter grads which have been checked in""" - self.params_checked_in = 0 - self.sent = False - - @property - def all_checked_in(self): - """Judge all the expected gradient check-in happened""" - return len(self._params) == self.params_checked_in - - def can_add_grad_view(self, param, align): - """Is there enough InternalStorage to add this parameter gradient, and whether this param have already checked in.""" - return ( - self._fill + np.prod(param.shape) + align <= self._max_size - and id(param) not in self._param_ids - ) - - def to(self, device, dtype=None, keep_alignment=True): - """ - Move the underlying buffer - """ - if self._release: - self.rebuild() - - super().to(device, dtype) - - if keep_alignment: - self._array_grads() - - @framework.no_grad() - def add_grad(self, param, align): - """ - Add a new parameter gradient to the InternalStorage. Param.grad becomes a view of this InternalStorage buffer. - """ - - assert ( - id(param) not in self._param_ids - ), "The same gradients cannot be checked in twice" - - self._add_grad_as_view(param, align) - self._params.append(param) - self._param_ids.append(id(param)) - - @framework.no_grad() - def manumal_relase(self): - """ - Release the buffer from InternalStorage. The InternalStorage will need to be rebuilt before use. - """ - if not self._release: - for p in self._params: - if p.grad is not None: - p.clear_gradient(False) - p._gradient_set_empty(False) - - self.buffer = None - self._fill = 0 - self.params_checked_in = 0 - self._release = True - - @framework.no_grad() - def rebuild(self): - """ - Given the parameter gradients which have been registered previously, rebuild the whole InternalStorage. - """ - - if self._release: - self.buffer = paddle.zeros([self._max_size], dtype=self._dtype) - - for p in self._params: - self._add_grad_as_view(p, self._parm2align[p.name]) - - self._release = False - - @framework.no_grad() - def _array_grads(self): - """ - Given the parameters gradients which have been registered previously, rebuild the whole InternalStorage. 
- """ - if len(self._params) > 0: - self._fill = 0 - for p in self._params: - self._add_grad_as_view(p, self._parm2align[p.name]) - - @framework.no_grad() - def _add_grad_as_view(self, param, align): - assert ( - np.prod(self.buffer.shape) > 0 - ), "Cannot add a gradient to a released InternalStorage, please rebuild" - assert param.dtype == self.buffer.dtype - - grad_end = self._fill + np.prod(param.shape) - offset = grad_end + align - assert offset <= np.prod(self.buffer.shape) - - # Copy the current grad value to InternalStorage - dev_id = ( - 0 - if paddle.get_device() == "cpu" - else int(paddle.get_device().split(":")[1]) - ) - if self._device == "cpu": - with device_guard(dev_id, self._device): - tmp_var = core.VarBase(self.buffer._slice(self._fill, grad_end)) - param._copy_gradient_from(tmp_var) - tmp_var.value().get_tensor()._clear() - - elif self._device == "gpu": - tmp_var = core.VarBase(self.buffer._slice(self._fill, grad_end)) - param._copy_gradient_from(tmp_var) - tmp_var.value().get_tensor()._clear() - - self._fill = offset diff --git a/python/paddle/distributed/sharding/group_sharded.py b/python/paddle/distributed/sharding/group_sharded.py index 703e14f4a3adb146682e61a43f6c0bdad8d85fe0..e8e6c2ebc9ca399461d42f366518f76a06d9051e 100644 --- a/python/paddle/distributed/sharding/group_sharded.py +++ b/python/paddle/distributed/sharding/group_sharded.py @@ -16,13 +16,6 @@ import logging import os import paddle - -# Old version -from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ( - ShardingOptimizerStage2, -) - -# New version from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_optimizer_stage2 import ( GroupShardedOptimizerStage2, ) @@ -35,17 +28,7 @@ from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage3 import from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_utils import ( GroupShardedScaler, ) -from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ( - ShardingStage2, -) -from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage3 import ( - ShardingStage3, -) -from paddle.distributed.fleet.meta_parallel.sharding.sharding_utils import ( - ShardingScaler, -) from paddle.distributed.utils.log_utils import get_logger -from paddle.fluid.framework import in_dygraph_mode from paddle.optimizer import Optimizer logger_ = get_logger(logging.WARNING) @@ -148,71 +131,39 @@ def group_sharded_parallel( logger_.info("*" * 30) logger_.info("Sharded level os uses sharded level os_g achieved now.") logger_.info("*" * 30) - if in_dygraph_mode(): - optimizer = GroupShardedOptimizerStage2( - params=optimizer._parameter_list, - optim=optimizer, - group=group, - offload=offload, - dp_group=dp_group, - device=device, - ) - model = GroupShardedStage2( - model, - optimizer, - group=group, - sync_buffers=sync_buffers, - buffer_max_size=buffer_max_size, - dp_group=dp_group, - device=device, - ) - else: - optimizer = ShardingOptimizerStage2( - params=model.parameters(), - optim=optimizer, - group=group, - offload=offload, - device=device, - ) - model = ShardingStage2( - model, - optimizer, - group=group, - sync_buffers=sync_buffers, - buffer_max_size=buffer_max_size, - device=device, - ) + optimizer = GroupShardedOptimizerStage2( + params=optimizer._parameter_list, + optim=optimizer, + group=group, + offload=offload, + dp_group=dp_group, + device=device, + ) + model = GroupShardedStage2( + model, + optimizer, + group=group, + sync_buffers=sync_buffers, + 
buffer_max_size=buffer_max_size, + dp_group=dp_group, + device=device, + ) elif level == 'p_g_os': - if in_dygraph_mode(): - model = GroupShardedStage3( - model, - optimizer=optimizer, - group=group, - sync_buffers=sync_buffers, - segment_size=segment_size, - offload=offload, - sync_comm=sync_comm, - dp_group=dp_group, - device=device, - ) - else: - model = ShardingStage3( - model, - optimizer=optimizer, - group=group, - sync_buffers=sync_buffers, - segment_size=segment_size, - offload=offload, - sync_comm=sync_comm, - device=device, - ) + model = GroupShardedStage3( + model, + optimizer=optimizer, + group=group, + sync_buffers=sync_buffers, + segment_size=segment_size, + offload=offload, + sync_comm=sync_comm, + dp_group=dp_group, + device=device, + ) else: raise ValueError("Please enter the correct level.") if isinstance(scaler, paddle.amp.GradScaler): - if in_dygraph_mode(): - scaler = GroupShardedScaler(scaler) - else: - scaler = ShardingScaler(scaler) + scaler = GroupShardedScaler(scaler) logger_.info("*" * 30) logger_.info( "If there is a communication hang using group sharded, please check whether the communication operations of each process are unified." @@ -275,9 +226,9 @@ def save_group_sharded_model(model, output, optimizer=None): ), "Saving directory ({}) should be a directory, not a file".format(output) os.makedirs(output, exist_ok=True) output_model = os.path.join(output, "model.pdmodel") - if isinstance(model, (ShardingStage2, GroupShardedStage2)): + if isinstance(model, GroupShardedStage2): paddle.save(model._layer.state_dict(), output_model) - elif isinstance(model, (ShardingStage3, GroupShardedStage3)): + elif isinstance(model, GroupShardedStage3): convert2cpu = True if model._offload else False model.get_all_parameters(convert2cpu=convert2cpu) paddle.save(model._layer.state_dict(), output_model) diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_sharding_optimizer_stage2.py b/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_sharding_optimizer_stage2.py deleted file mode 100644 index 0c6f115d94a769f0f2f54d3b46d053ac481648bd..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_sharding_optimizer_stage2.py +++ /dev/null @@ -1,144 +0,0 @@ -# -*- coding: UTF-8 -*- - -# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
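Context for the group_sharded.py hunk that ends just above: with the legacy ShardingOptimizerStage2 / ShardingStage2 / ShardingStage3 / ShardingScaler classes deleted, group_sharded_parallel and save_group_sharded_model now always take the GroupSharded* code path. The sketch below shows one plausible way to call the retained public API; it is not taken from this patch. The toy model, hyper-parameters, and checkpoint directory are illustrative placeholders, the level names and keyword arguments follow the code retained in group_sharded.py, and the script is assumed to run under paddle.distributed.launch on at least two GPUs.

# Minimal usage sketch of the retained API (assumptions noted above).
import paddle
from paddle.distributed import fleet
from paddle.distributed.sharding import (
    group_sharded_parallel,
    save_group_sharded_model,
)

fleet.init(is_collective=True)

# Placeholder model and optimizer; any paddle.nn.Layer works here.
model = paddle.nn.Sequential(
    paddle.nn.Linear(1000, 1000), paddle.nn.Linear(1000, 10)
)
optimizer = paddle.optimizer.AdamW(
    parameters=model.parameters(), learning_rate=0.001
)
scaler = paddle.amp.GradScaler(init_loss_scaling=1024)

# level="os_g" shards optimizer states and gradients (stage 2);
# level="p_g_os" additionally shards the parameters (stage 3).
model, optimizer, scaler = group_sharded_parallel(
    model, optimizer, level="os_g", scaler=scaler
)

# ... training loop ...

# Gather sharded state and save the inner layer / optimizer state dicts.
save_group_sharded_model(
    model, output="./group_sharded_ckpt", optimizer=optimizer
)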
- -import numpy as np - -import paddle -import paddle.fluid as fluid -from paddle.distributed import fleet -from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ( - ShardingOptimizerStage2, -) -from paddle.distributed.fleet.utils.internal_storage import GradStorage -from paddle.nn import Linear - -base_lr = 0.1 -momentum_rate = 0.9 -l2_decay = 1e-4 - -epoch = 100 -batch_size = 32 -class_dim = 102 - - -class MLP(fluid.Layer): - def __init__(self, param_attr=None, bias_attr=None): - super().__init__() - - self._linear1 = Linear(10, 10) - self._linear2 = Linear(10, 10) - - def forward(self, inputs): - y = self._linear1(inputs) - y = self._linear2(y) - return y - - -def reader_decorator(): - def __reader__(): - for _ in range(100): - img = np.random.rand(10).astype('float32') - label = np.ones(1).astype('int64') - yield img, label - - return __reader__ - - -def optimizer_setting(parameter_list=None): - optimizer = paddle.optimizer.Momentum( - learning_rate=base_lr, - momentum=momentum_rate, - weight_decay=paddle.regularizer.L2Decay(l2_decay), - parameters=parameter_list, - ) - return optimizer - - -def train_mlp(): - fleet.init(is_collective=True) - group = paddle.distributed.new_group([0, 1]) - - mlp = MLP() - - optimizer = optimizer_setting(parameter_list=mlp.parameters()) - oss_optimizer = ShardingOptimizerStage2( - params=mlp.parameters(), optim=optimizer, group=group - ) - # cover grad_storage code - trainable_param2align = dict() - for p in mlp.parameters(): - trainable_param2align[p.name] = 0 - grad_storage = GradStorage( - 10000, - dtype=paddle.float32, - device="gpu", - destination=0, - parm2align=trainable_param2align, - ) - for p in mlp.parameters(): - grad_storage.can_add_grad_view(p, trainable_param2align[p.name]) - grad_storage.add_grad(p, trainable_param2align[p.name]) - grad_storage.manumal_relase() - grad_storage.rebuild() - grad_storage.reset_checked_in() - - train_reader = paddle.batch( - reader_decorator(), batch_size=batch_size, drop_last=True - ) - - train_loader = paddle.io.DataLoader.from_generator( - capacity=32, - use_double_buffer=True, - iterable=True, - return_list=True, - use_multiprocess=True, - ) - train_loader.set_sample_list_generator(train_reader) - - for eop in range(epoch): - mlp.train() - - for batch_id, data in enumerate(train_loader()): - img, label = data - label.stop_gradient = True - img.stop_gradient = True - - out = mlp(img) - loss = paddle.nn.functional.cross_entropy(input=out, label=label) - avg_loss = paddle.mean(x=loss) - acc_top1 = paddle.metric.accuracy(input=out, label=label, k=1) - acc_top5 = paddle.metric.accuracy(input=out, label=label, k=5) - - dy_out = avg_loss.numpy() - - avg_loss.backward() - oss_optimizer.step() - - # oss_optimizer clear cache - oss_optimizer._clear_cache() - - # check optimizer.minimize() error - try: - oss_optimizer.minimize() - except: - print( - "====== Find sharding_stage2_optimizer.minimize() error ======" - ) - return - - -if __name__ == '__main__': - train_mlp() diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_sharding_stage2.py b/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_sharding_stage2.py deleted file mode 100644 index a3502f6000d00d313e0c1516fa06d7a2307a7ad4..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_sharding_stage2.py +++ /dev/null @@ -1,242 +0,0 @@ -# -*- coding: UTF-8 -*- - -# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import shutil -import tempfile - -import numpy as np - -import paddle -import paddle.fluid as fluid -from paddle.distributed import fleet -from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ( - ShardingOptimizerStage2, -) -from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ( - ShardingStage2, -) -from paddle.nn import Linear - -seed = 2022 -epoch = 2 -linear_size = 1000 - -strategy = fleet.DistributedStrategy() -strategy.hybrid_configs = { - "dp_degree": 2, - "mp_degree": 1, - "pp_degree": 1, - "sharding_degree": 1, -} - -np.random.seed(seed) -paddle.seed(seed) - - -class MLP(fluid.Layer): - def __init__(self, linear_size=1000, param_attr=None, bias_attr=None): - super().__init__() - - self._linear1 = Linear(linear_size, linear_size) - self._linear2 = Linear(linear_size, linear_size) - self._linear3 = Linear(linear_size, 10) - - def forward(self, inputs): - y = self._linear1(inputs) - y = self._linear2(y) - y = self._linear3(y) - return y - - -def reader_decorator(linear_size=1000): - def __reader__(): - for _ in range(100): - img = np.random.rand(linear_size).astype('float32') - label = np.ones(1).astype('int64') - yield img, label - - return __reader__ - - -def optimizer_setting(model, use_pure_fp16, opt_group=False): - clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0) - optimizer = paddle.optimizer.AdamW( - parameters=[{"params": model.parameters()}] - if opt_group - else model.parameters(), - learning_rate=0.001, - weight_decay=0.00001, - grad_clip=clip, - multi_precision=use_pure_fp16, - ) - - return optimizer - - -def train_mlp( - model, - sharding_stage, - batch_size=100, - use_pure_fp16=False, - accumulate_grad=False, - opt_group=False, - save_model=False, -): - if sharding_stage == "dp": - hcg = fleet.get_hybrid_communicate_group() - group = hcg.get_check_parallel_group() - else: - group = paddle.distributed.new_group([0, 1]) - if opt_group: - optimizer = optimizer_setting( - model=model, use_pure_fp16=use_pure_fp16, opt_group=opt_group - ) - else: - optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16) - - if sharding_stage == 2: - optimizer = ShardingOptimizerStage2( - params=model.parameters(), optim=optimizer, group=group - ) - - model = ShardingStage2( - model, optimizer, group=group, buffer_max_size=2**21 - ) - else: - optimizer = fleet.distributed_optimizer(optimizer) - model = fleet.distributed_model(model) - - train_reader = paddle.batch( - reader_decorator(), batch_size=batch_size, drop_last=True - ) - - train_loader = paddle.io.DataLoader.from_generator( - capacity=32, - use_double_buffer=True, - iterable=True, - return_list=True, - use_multiprocess=True, - ) - train_loader.set_sample_list_generator(train_reader) - - if sharding_stage == 2: - model.to(device="gpu") - - for eop in range(epoch): - model.train() - - for batch_id, data in enumerate(train_loader()): - img, label = data - label.stop_gradient = True - 
img.stop_gradient = True - - out = model(img) - loss = paddle.nn.functional.cross_entropy(input=out, label=label) - - avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32)) - if batch_size == 20: - avg_loss = avg_loss / 5 - avg_loss.backward() - - if not accumulate_grad: - optimizer.step() - optimizer.clear_grad() - - if accumulate_grad: - optimizer.step() - optimizer.clear_grad() - - if save_model: - return model, optimizer - return model.parameters() - - -def test_dp_stage2(): - mlp = MLP() - state_dict = mlp.state_dict() - mlp1 = MLP() - mlp2 = MLP() - mlp3 = MLP() - mlp4 = MLP() - mlp5 = MLP() - mlp6 = MLP() - mlp1.set_state_dict(state_dict) - mlp2.set_state_dict(state_dict) - mlp3.set_state_dict(state_dict) - mlp4.set_state_dict(state_dict) - mlp5.set_state_dict(state_dict) - mlp6.set_state_dict(state_dict) - - # DP VS stage2 - dp_params = train_mlp( - mlp1, sharding_stage="dp", use_pure_fp16=False, opt_group=False - ) - stage2_params = train_mlp( - mlp2, sharding_stage=2, use_pure_fp16=False, opt_group=False - ) - for i in range(len(dp_params)): - np.testing.assert_allclose( - dp_params[i].numpy(), stage2_params[i].numpy(), rtol=1e-6 - ) - - # stage2 accumulate grad - stage2_params = train_mlp(mlp3, sharding_stage=2, accumulate_grad=True) - stage2_accumulate_grad = train_mlp( - mlp4, sharding_stage=2, batch_size=20, accumulate_grad=True - ) - for i in range(len(stage2_params)): - np.testing.assert_allclose( - stage2_params[i].numpy(), - stage2_accumulate_grad[i].numpy(), - rtol=1e-5, - atol=1e-5, - ) - - # stage2 param list VS param group - stage2_params = train_mlp( - mlp5, sharding_stage=2, use_pure_fp16=False, opt_group=True - ) - for i in range(len(dp_params)): - np.testing.assert_allclose( - dp_params[i].numpy(), stage2_params[i].numpy(), rtol=1e-6 - ) - - # save/load model - output_dir = tempfile.mkdtemp() - model_file = os.path.join(output_dir, "model.pdmodel") - optimizer_file = os.path.join(output_dir, "model.pdopt") - model_stage2, optimizer_stage2 = train_mlp( - mlp6, - sharding_stage=2, - use_pure_fp16=False, - opt_group=False, - save_model=True, - ) - paddle.save(model_stage2.state_dict(), model_file) - paddle.save(optimizer_stage2.state_dict(), optimizer_file) - m_state_dict = paddle.load(model_file) - opt_state_dict = paddle.load(optimizer_file) - model_stage2.set_state_dict(m_state_dict) - optimizer_stage2.set_state_dict(opt_state_dict) - shutil.rmtree(output_dir) - - return - - -if __name__ == '__main__': - fleet.init(is_collective=True, strategy=strategy) - test_dp_stage2() diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_sharding_stage2_offload.py b/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_sharding_stage2_offload.py deleted file mode 100644 index f4d15143f91c0a89e5c5af23db96ce997ae6f750..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_sharding_stage2_offload.py +++ /dev/null @@ -1,122 +0,0 @@ -# -*- coding: UTF-8 -*- - -# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -import numpy as np -from dygraph_sharding_stage2 import MLP, optimizer_setting, reader_decorator - -import paddle -from paddle.distributed import fleet -from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ( - ShardingOptimizerStage2, -) -from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ( - ShardingStage2, -) -from paddle.distributed.fleet.meta_parallel.sharding.sharding_utils import ( - ShardingScaler, -) - -seed = 2021 -epoch = 2 -batch_size = 32 -linear_size = 1000 - -strategy = fleet.DistributedStrategy() -strategy.hybrid_configs = { - "dp_degree": 2, - "mp_degree": 1, - "pp_degree": 1, - "sharding_degree": 1, -} - -np.random.seed(seed) -paddle.seed(seed) - - -def train_mlp(model, offload=False): - optimizer = optimizer_setting(model=model, use_pure_fp16=True) - - model = paddle.amp.decorate(models=model, level='O2', save_dtype='float32') - scaler = paddle.amp.GradScaler(init_loss_scaling=1024) - scaler = ShardingScaler(scaler) - - optimizer = ShardingOptimizerStage2( - params=model.parameters(), optim=optimizer, offload=offload - ) - model = ShardingStage2(model, optimizer, buffer_max_size=2**21) - - train_reader = paddle.batch( - reader_decorator(linear_size), batch_size=batch_size, drop_last=True - ) - - train_loader = paddle.io.DataLoader.from_generator( - capacity=32, - use_double_buffer=True, - iterable=True, - return_list=True, - use_multiprocess=True, - ) - train_loader.set_sample_list_generator(train_reader) - - for eop in range(epoch): - model.train() - - for batch_id, data in enumerate(train_loader()): - img, label = data - label.stop_gradient = True - img.stop_gradient = True - - with paddle.amp.auto_cast(True, level='O2'): - out = model(img) - loss = paddle.nn.functional.cross_entropy( - input=out, label=label - ) - - avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32)) - scaler.scale(avg_loss).backward() - - scaler.step(optimizer) - scaler.update() - optimizer.clear_grad() - - for dtype in optimizer.param_storages: - for dst_rank, param_storage in optimizer.param_storages[dtype].items(): - param_storage.to(device="gpu", dtype=dtype) - - return model.parameters() - - -def test_sharding_stage2_offload(): - mlp = MLP(linear_size) - mlp_offload = MLP(linear_size) - mlp_offload.set_state_dict(mlp.state_dict()) - - mlp_params = train_mlp(mlp, offload=False) - mlp_offload_params = train_mlp(mlp_offload, offload=True) - - for i in range(len(mlp_params)): - np.testing.assert_allclose( - mlp_params[i].numpy(), - mlp_offload_params[i].numpy(), - rtol=5e-3, - atol=5e-3, - ) - return - - -if __name__ == '__main__': - fleet.init(is_collective=True, strategy=strategy) - test_sharding_stage2_offload() diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_sharding_stage3.py b/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_sharding_stage3.py deleted file mode 100644 index 076cb810da3e0926e6600afe8fc5ea99623e8920..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_sharding_stage3.py +++ /dev/null @@ -1,319 +0,0 @@ -# -*- coding: UTF-8 -*- - -# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import shutil -import tempfile - -import numpy as np - -import paddle -import paddle.fluid as fluid -from paddle.distributed import fleet -from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ( - ShardingOptimizerStage2, -) -from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ( - ShardingStage2, -) -from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage3 import ( - ShardingStage3, -) -from paddle.distributed.fleet.meta_parallel.sharding.sharding_utils import ( - ShardingScaler, -) -from paddle.nn import Linear - -epoch = 10 -paddle.seed(2021) -np.random.seed(2021) -base_lr = 0.1 -momentum_rate = 0.9 -l2_decay = 1e-4 - - -class MLP(fluid.Layer): - def __init__(self, linear_size=1000, param_attr=None, bias_attr=None): - super().__init__() - - self._linear1 = Linear(linear_size, linear_size) - self._linear2 = Linear(linear_size, linear_size) - self._linear3 = Linear(linear_size, 10) - - def forward(self, inputs): - y = self._linear1(inputs) - y = self._linear2(y) - y = self._linear3(y) - return y - - -def reader_decorator(linear_size=1000): - def __reader__(): - for _ in range(100): - img = np.random.rand(linear_size).astype('float32') - label = np.ones(1).astype('int64') - yield img, label - - return __reader__ - - -def optimizer_setting(model, use_pure_fp16, opt_group=False): - clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0) - optimizer = paddle.optimizer.Momentum( - parameters=[{"params": list(model.parameters())}] - if opt_group - else list(model.parameters()), - learning_rate=0.001, - weight_decay=0.00001, - grad_clip=clip, - multi_precision=use_pure_fp16, - ) - - return optimizer - - -def train_mlp( - model, - sharding_stage, - use_pure_fp16=False, - accumulate_grad=False, - batch_size=100, - opt_group=False, - sync_comm=False, - test_minimize=False, - save_model=False, -): - group = paddle.distributed.new_group([0, 1]) - if opt_group: - optimizer = optimizer_setting( - model=model, use_pure_fp16=use_pure_fp16, opt_group=opt_group - ) - else: - optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16) - - if use_pure_fp16: - model = paddle.amp.decorate( - models=model, level='O2', save_dtype='float32' - ) - scaler = paddle.amp.GradScaler(init_loss_scaling=32768) - scaler = ShardingScaler(scaler) - if sharding_stage == 2: - optimizer = ShardingOptimizerStage2( - params=model.parameters(), optim=optimizer, group=group - ) - model = ShardingStage2( - model, optimizer, group=group, buffer_max_size=2**21 - ) - elif sharding_stage == 3: - model = ShardingStage3( - model, optimizer=optimizer, group=group, sync_comm=sync_comm - ) - - # check optimizer.minimize() error - if test_minimize: - try: - optimizer.minimize() - except: - print( - "====== Find sharding_stage3_optimizer.minimize() error ======" - ) - return - - train_reader = paddle.batch( - reader_decorator(), batch_size=batch_size, drop_last=True - ) - - train_loader = paddle.io.DataLoader.from_generator( - capacity=32, - use_double_buffer=True, - iterable=True, - return_list=True, - use_multiprocess=True, - ) - 
train_loader.set_sample_list_generator(train_reader) - - for eop in range(epoch): - model.train() - for batch_id, data in enumerate(train_loader()): - img, label = data - label.stop_gradient = True - img.stop_gradient = True - with paddle.amp.auto_cast(True, level='O2'): - out = model(img) - loss = paddle.nn.functional.cross_entropy( - input=out, label=label - ) - avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32)) - - if batch_size == 20: - avg_loss = avg_loss / 5 - - if not use_pure_fp16: - avg_loss.backward() - else: - scaler.scale(avg_loss).backward() - - if not accumulate_grad: - if not use_pure_fp16: - optimizer.step() - else: - scaler.step(optimizer) - scaler.update() - optimizer.clear_grad() - if accumulate_grad: - if not use_pure_fp16: - optimizer.step() - else: - scaler.step(optimizer) - scaler.update() - optimizer.clear_grad() - if sharding_stage == 3: - model.get_all_parameters() - - if save_model: - return model, optimizer - return model.parameters() - - -def test_stage2_stage3(): - mlp, mlp1, mlp2, mlp3, mlp4, mlp5, mlp6, mlp7, mlp8, mlp9, mlp10 = ( - MLP(), - MLP(), - MLP(), - MLP(), - MLP(), - MLP(), - MLP(), - MLP(), - MLP(), - MLP(), - MLP(), - ) - state_dict = mlp.state_dict() - mlp1.set_state_dict(state_dict) - mlp2.set_state_dict(state_dict) - mlp3.set_state_dict(state_dict) - mlp4.set_state_dict(state_dict) - mlp5.set_state_dict(state_dict) - mlp6.set_state_dict(state_dict) - mlp7.set_state_dict(state_dict) - mlp8.set_state_dict(state_dict) - mlp9.set_state_dict(state_dict) - mlp10.set_state_dict(state_dict) - - # fp32 - stage2_params = train_mlp( - mlp1, sharding_stage=2, use_pure_fp16=False, opt_group=False - ) - stage3_params = train_mlp( - mlp2, sharding_stage=3, use_pure_fp16=False, opt_group=False - ) - - for i in range(len(stage2_params)): - np.testing.assert_allclose( - stage2_params[i].numpy(), - stage3_params[i].numpy(), - rtol=1e-6, - atol=1e-6, - ) - - # fp32 accumulate grad - stage3_params = train_mlp( - mlp3, - sharding_stage=3, - use_pure_fp16=False, - accumulate_grad=True, - opt_group=True, - ) - stage3_params_add = train_mlp( - mlp4, - sharding_stage=3, - use_pure_fp16=False, - accumulate_grad=True, - batch_size=20, - opt_group=True, - ) - for i in range(len(stage3_params)): - np.testing.assert_allclose( - stage3_params[i].numpy(), - stage3_params_add[i].numpy(), - rtol=1e-6, - atol=1e-4, - ) - - # fp16 - stage2_params = train_mlp( - mlp5, sharding_stage=2, use_pure_fp16=True, opt_group=False - ) - stage3_params = train_mlp( - mlp6, sharding_stage=3, use_pure_fp16=True, opt_group=False - ) - for i in range(len(stage2_params)): - np.testing.assert_allclose( - stage2_params[i].numpy(), - stage3_params[i].numpy(), - rtol=1e-4, - atol=1e-3, - ) - - # fp16 sync_comm - stage3_params = train_mlp( - mlp7, sharding_stage=3, use_pure_fp16=True, opt_group=False - ) - stage3_params_re = train_mlp( - mlp8, - sharding_stage=3, - use_pure_fp16=True, - opt_group=False, - sync_comm=True, - ) - for i in range(len(stage3_params)): - np.testing.assert_allclose( - stage3_params[i].numpy(), stage3_params_re[i].numpy(), rtol=1e-6 - ) - - # save/load model - output_dir = tempfile.mkdtemp() - model_file = os.path.join(output_dir, "model.pdmodel") - optimizer_file = os.path.join(output_dir, "model.pdopt") - model_stage3, optimizer_stage3 = train_mlp( - mlp9, - sharding_stage=3, - use_pure_fp16=False, - opt_group=False, - save_model=True, - ) - paddle.save(model_stage3.state_dict(), model_file) - paddle.save(optimizer_stage3.state_dict(), optimizer_file) - m_state_dict = 
paddle.load(model_file) - opt_state_dict = paddle.load(optimizer_file) - model_stage3.set_state_dict(m_state_dict) - optimizer_stage3.set_state_dict(opt_state_dict) - shutil.rmtree(output_dir) - - # check optimizer.minimize() error - train_mlp( - mlp10, - sharding_stage=3, - use_pure_fp16=False, - opt_group=False, - test_minimize=True, - ) - - -if __name__ == '__main__': - fleet.init(is_collective=True) - test_stage2_stage3() diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_sharding_stage3_offload.py b/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_sharding_stage3_offload.py deleted file mode 100644 index 082160e4e3f04463da5b0514c4a0dcc666855792..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_sharding_stage3_offload.py +++ /dev/null @@ -1,219 +0,0 @@ -# -*- coding: UTF-8 -*- - -# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import numpy as np - -import paddle -import paddle.fluid as fluid -from paddle.distributed import fleet -from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage3 import ( - ShardingStage3, -) -from paddle.distributed.fleet.meta_parallel.sharding.sharding_utils import ( - ShardingScaler, -) -from paddle.nn import Linear - -epoch = 10 -paddle.seed(2022) -np.random.seed(2022) -base_lr = 0.1 -momentum_rate = 0.9 -l2_decay = 1e-4 - - -class MLP(fluid.Layer): - def __init__(self, linear_size=1000, param_attr=None, bias_attr=None): - super().__init__() - - self._linear1 = Linear(linear_size, linear_size) - self._linear2 = Linear(linear_size, linear_size) - self._linear3 = Linear(linear_size, 10) - - def forward(self, inputs): - y = self._linear1(inputs) - y = self._linear2(y) - y = self._linear3(y) - return y - - -def reader_decorator(linear_size=1000): - def __reader__(): - for _ in range(100): - img = np.random.rand(linear_size).astype('float32') - label = np.ones(1).astype('int64') - yield img, label - - return __reader__ - - -def optimizer_setting(model, use_pure_fp16, opt_group=False): - clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0) - optimizer = paddle.optimizer.AdamW( - parameters=[{"params": model.parameters()}] - if opt_group - else model.parameters(), - learning_rate=0.001, - weight_decay=0.00001, - grad_clip=clip, - multi_precision=use_pure_fp16, - ) - - return optimizer - - -def train_mlp( - model, - use_pure_fp16=False, - accumulate_grad=False, - offload=False, - batch_size=100, - convert2cpu=False, -): - group = paddle.distributed.new_group([0, 1]) - optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16) - - if use_pure_fp16: - model = paddle.amp.decorate( - models=model, level='O2', save_dtype='float32' - ) - scaler = paddle.amp.GradScaler(init_loss_scaling=32768) - scaler = ShardingScaler(scaler) - - model = ShardingStage3( - model, optimizer=optimizer, group=group, offload=offload - ) - - train_reader = paddle.batch( - reader_decorator(), batch_size=batch_size, 
drop_last=True - ) - - train_loader = paddle.io.DataLoader.from_generator( - capacity=32, - use_double_buffer=True, - iterable=True, - return_list=True, - use_multiprocess=True, - ) - train_loader.set_sample_list_generator(train_reader) - - for eop in range(epoch): - model.train() - for batch_id, data in enumerate(train_loader()): - img, label = data - label.stop_gradient = True - img.stop_gradient = True - with paddle.amp.auto_cast(True, level='O2'): - out = model(img) - loss = paddle.nn.functional.cross_entropy( - input=out, label=label - ) - avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32)) - - if accumulate_grad: - avg_loss = avg_loss / 5 - - if not use_pure_fp16: - avg_loss.backward() - else: - scaler.scale(avg_loss).backward() - - if not accumulate_grad: - if not use_pure_fp16: - optimizer.step() - else: - scaler.step(optimizer) - scaler.update() - optimizer.clear_grad() - if accumulate_grad: - if not use_pure_fp16: - optimizer.step() - else: - scaler.step(optimizer) - scaler.update() - optimizer.clear_grad() - if not convert2cpu: - model.get_all_parameters() - else: - model.get_all_parameters(convert2cpu) - return model.parameters() - - -def test_stage3_offload(): - mlp, mlp1, mlp2, mlp3, mlp4, mlp5, mlp6 = ( - MLP(), - MLP(), - MLP(), - MLP(), - MLP(), - MLP(), - MLP(), - ) - state_dict = mlp.state_dict() - mlp1.set_state_dict(state_dict) - mlp2.set_state_dict(state_dict) - mlp3.set_state_dict(state_dict) - mlp4.set_state_dict(state_dict) - mlp5.set_state_dict(state_dict) - mlp6.set_state_dict(state_dict) - - # fp32 offload - stage3_params = train_mlp(mlp1, use_pure_fp16=False) - stage3_params_offload = train_mlp(mlp2, use_pure_fp16=False, offload=True) - for i in range(len(stage3_params)): - np.testing.assert_allclose( - stage3_params[i].numpy(), - stage3_params_offload[i].numpy(), - rtol=1e-6, - atol=1e-8, - ) - - # fp16 offload - stage3_params = train_mlp(mlp3, use_pure_fp16=True) - stage3_params_offload = train_mlp(mlp4, use_pure_fp16=True, offload=True) - for i in range(len(stage3_params)): - np.testing.assert_allclose( - stage3_params[i].numpy(), - stage3_params_offload[i].numpy(), - rtol=1e-2, - atol=1e-2, - ) - - # fp32 accumulate grad offload - stage3_params = train_mlp( - mlp5, use_pure_fp16=False, batch_size=20, accumulate_grad=True - ) - stage3_params_offload = train_mlp( - mlp6, - use_pure_fp16=False, - accumulate_grad=True, - offload=True, - batch_size=20, - convert2cpu=True, - ) - for i in range(len(stage3_params)): - np.testing.assert_allclose( - stage3_params[i].numpy(), - stage3_params_offload[i].numpy(), - rtol=1e-6, - atol=1e-8, - ) - return - - -if __name__ == '__main__': - fleet.init(is_collective=True) - test_stage3_offload() diff --git a/python/paddle/fluid/tests/unittests/collective/multinode/mn_dygraph_sharding_stage2.py b/python/paddle/fluid/tests/unittests/collective/multinode/mn_dygraph_sharding_stage2.py deleted file mode 100644 index 651b0adf9e28429c48db367c27ee619b62b2d2fd..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/unittests/collective/multinode/mn_dygraph_sharding_stage2.py +++ /dev/null @@ -1,251 +0,0 @@ -# -*- coding: UTF-8 -*- - -# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import shutil -import tempfile - -import numpy as np - -import paddle -import paddle.fluid as fluid -from paddle.distributed import fleet -from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ( - ShardingOptimizerStage2, -) -from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ( - ShardingStage2, -) -from paddle.nn import Linear - -seed = 2022 -epoch = 2 -linear_size = 1000 - -strategy = fleet.DistributedStrategy() -strategy.hybrid_configs = { - "dp_degree": 16, - "mp_degree": 1, - "pp_degree": 1, - "sharding_degree": 1, -} - -np.random.seed(seed) -paddle.seed(seed) - - -class MLP(fluid.Layer): - def __init__(self, linear_size=1000, param_attr=None, bias_attr=None): - super().__init__() - - self._linear1 = Linear(linear_size, linear_size) - self._linear2 = Linear(linear_size, linear_size) - self._linear3 = Linear(linear_size, linear_size) - self._linear4 = Linear(linear_size, linear_size) - self._linear5 = Linear(linear_size, 10) - - def forward(self, inputs): - y = self._linear1(inputs) - y = self._linear2(y) - y = self._linear3(y) - y = self._linear4(y) - y = self._linear5(y) - return y - - -def reader_decorator(linear_size=1000): - def __reader__(): - for _ in range(100): - img = np.random.rand(linear_size).astype('float32') - label = np.ones(1).astype('int64') - yield img, label - - return __reader__ - - -def optimizer_setting(model, use_pure_fp16, opt_group=False): - clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0) - optimizer = paddle.optimizer.AdamW( - parameters=[{"params": model.parameters()}] - if opt_group - else model.parameters(), - learning_rate=0.001, - weight_decay=0.00001, - grad_clip=clip, - multi_precision=use_pure_fp16, - ) - - return optimizer - - -def train_mlp( - model, - sharding_stage, - batch_size=100, - use_pure_fp16=False, - accumulate_grad=False, - opt_group=False, - save_model=False, -): - if sharding_stage == "dp": - hcg = fleet.get_hybrid_communicate_group() - group = hcg.get_check_parallel_group() - else: - group = paddle.distributed.new_group( - [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] - ) - if opt_group: - optimizer = optimizer_setting( - model=model, use_pure_fp16=use_pure_fp16, opt_group=opt_group - ) - else: - optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16) - - if sharding_stage == 2: - optimizer = ShardingOptimizerStage2( - params=model.parameters(), optim=optimizer, group=group - ) - - model = ShardingStage2( - model, optimizer, group=group, buffer_max_size=2**21 - ) - else: - optimizer = fleet.distributed_optimizer(optimizer) - model = fleet.distributed_model(model) - - train_reader = paddle.batch( - reader_decorator(), batch_size=batch_size, drop_last=True - ) - - train_loader = paddle.io.DataLoader.from_generator( - capacity=32, - use_double_buffer=True, - iterable=True, - return_list=True, - use_multiprocess=True, - ) - train_loader.set_sample_list_generator(train_reader) - - if sharding_stage == 2: - model.to(device="gpu") - - for eop in range(epoch): - model.train() - - for batch_id, data in 
enumerate(train_loader()): - img, label = data - label.stop_gradient = True - img.stop_gradient = True - - out = model(img) - loss = paddle.nn.functional.cross_entropy(input=out, label=label) - - avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32)) - if batch_size == 20: - avg_loss = avg_loss / 5 - avg_loss.backward() - - if not accumulate_grad: - optimizer.step() - optimizer.clear_grad() - - if accumulate_grad: - optimizer.step() - optimizer.clear_grad() - - if save_model: - return model, optimizer - return model.parameters() - - -def test_dp_stage2(): - mlp = MLP() - state_dict = mlp.state_dict() - mlp1 = MLP() - mlp2 = MLP() - mlp3 = MLP() - mlp4 = MLP() - mlp5 = MLP() - mlp6 = MLP() - mlp1.set_state_dict(state_dict) - mlp2.set_state_dict(state_dict) - mlp3.set_state_dict(state_dict) - mlp4.set_state_dict(state_dict) - mlp5.set_state_dict(state_dict) - mlp6.set_state_dict(state_dict) - - # DP VS stage2 - dp_params = train_mlp( - mlp1, sharding_stage="dp", use_pure_fp16=False, opt_group=False - ) - stage2_params = train_mlp( - mlp2, sharding_stage=2, use_pure_fp16=False, opt_group=False - ) - for i in range(len(dp_params)): - np.testing.assert_allclose( - dp_params[i].numpy(), stage2_params[i].numpy(), rtol=1e-6, atol=5e-4 - ) - - # stage2 accumulate grad - stage2_params = train_mlp(mlp3, sharding_stage=2, accumulate_grad=True) - stage2_accumulate_grad = train_mlp( - mlp4, sharding_stage=2, batch_size=20, accumulate_grad=True - ) - for i in range(len(stage2_params)): - np.testing.assert_allclose( - stage2_params[i].numpy(), - stage2_accumulate_grad[i].numpy(), - rtol=1e-5, - atol=1e-5, - ) - - # stage2 param list VS param group - stage2_params = train_mlp( - mlp5, sharding_stage=2, use_pure_fp16=False, opt_group=True - ) - for i in range(len(dp_params)): - np.testing.assert_allclose( - dp_params[i].numpy(), stage2_params[i].numpy(), rtol=1e-6, atol=5e-4 - ) - - # save/load model - output_dir = tempfile.mkdtemp() - try: - model_file = os.path.join(output_dir, "model.pdmodel") - optimizer_file = os.path.join(output_dir, "model.pdopt") - model_stage2, optimizer_stage2 = train_mlp( - mlp6, - sharding_stage=2, - use_pure_fp16=False, - opt_group=False, - save_model=True, - ) - paddle.save(model_stage2.state_dict(), model_file) - paddle.save(optimizer_stage2.state_dict(), optimizer_file) - m_state_dict = paddle.load(model_file) - opt_state_dict = paddle.load(optimizer_file) - model_stage2.set_state_dict(m_state_dict) - optimizer_stage2.set_state_dict(opt_state_dict) - except Exception as e: - shutil.rmtree(output_dir) - raise e - else: - shutil.rmtree(output_dir) - - -if __name__ == '__main__': - fleet.init(is_collective=True, strategy=strategy) - test_dp_stage2() diff --git a/python/paddle/fluid/tests/unittests/collective/multinode/test_multinode_dygraph_sharding.py b/python/paddle/fluid/tests/unittests/collective/multinode/test_multinode_dygraph_sharding.py index c46df531babef6b6e0f1cba80dbceeb1d32904df..9ea30328eccb90c336aff12aab2dd74c95bee941 100755 --- a/python/paddle/fluid/tests/unittests/collective/multinode/test_multinode_dygraph_sharding.py +++ b/python/paddle/fluid/tests/unittests/collective/multinode/test_multinode_dygraph_sharding.py @@ -25,13 +25,6 @@ class TestDYgrapShardingDP(TestDistBase): self._trainers = 16 self._init_env() - def test_hybrid_sharding_stage2(self): - self.check_with_place( - "mn_dygraph_sharding_stage2.py", - backend="nccl", - need_envs=os.environ, - ) - def test_hybrid_sharding_stage3(self): self.check_with_place( "mn_dygraph_group_sharded_stage3.py",
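Closing note on the deleted unit tests above (dygraph_sharding_optimizer_stage2.py, dygraph_sharding_stage2.py, dygraph_sharding_stage3.py, their offload variants, and mn_dygraph_sharding_stage2.py): they wired the legacy ShardingOptimizerStage2 / ShardingStage2 wrappers together by hand. Their retained counterparts, GroupShardedOptimizerStage2 and GroupShardedStage2, appear in the group_sharded.py hunk earlier and take essentially the same arguments. The sketch below mirrors the deleted stage-2 setup using those retained classes; the constructor keywords are taken from the retained group_sharded.py call sites, the group_sharded_stage2 import path is assumed to sit alongside the group_sharded_stage3 module shown in that hunk, and the toy model, ranks [0, 1], and hyper-parameters are placeholders. Two GPUs launched via paddle.distributed.launch are assumed.

# Hedged sketch: direct stage-2 wiring with the retained GroupSharded* classes.
import paddle
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_optimizer_stage2 import (
    GroupShardedOptimizerStage2,
)
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage2 import (
    GroupShardedStage2,
)

fleet.init(is_collective=True)
group = paddle.distributed.new_group([0, 1])  # placeholder ranks

# Placeholder model/optimizer mirroring the MLP used by the deleted tests.
model = paddle.nn.Sequential(
    paddle.nn.Linear(1000, 1000), paddle.nn.Linear(1000, 10)
)
optimizer = paddle.optimizer.AdamW(
    parameters=model.parameters(), learning_rate=0.001
)

# Shard optimizer states across the group, then wrap the layer so gradients
# are reduced to the rank that owns the corresponding parameter slice.
optimizer = GroupShardedOptimizerStage2(
    params=optimizer._parameter_list, optim=optimizer, group=group
)
model = GroupShardedStage2(
    model, optimizer, group=group, buffer_max_size=2**21
)

# From here the training loop is the usual forward/backward followed by
# optimizer.step() and optimizer.clear_grad(), as in the deleted tests.

For most users the group_sharded_parallel wrapper retained in group_sharded.py is the simpler entry point; the direct construction above is mainly what the deleted tests exercised.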