From d0a89744f5918acaefaab6a960915780b2265dee Mon Sep 17 00:00:00 2001
From: Baibaifan <39549453+Baibaifan@users.noreply.github.com>
Date: Mon, 29 Nov 2021 11:22:17 +0800
Subject: [PATCH] fix_InternalStorage (#37568)

---
 .../dygraph_optimizer/__init__.py             |   2 +-
 .../dygraph_sharding_optimizer.py             | 266 +---------------
 .../sharding_optimizer_stage2.py              | 285 ++++++++++++++++++
 .../fleet/meta_parallel/sharding/__init__.py  |   2 +-
 .../meta_parallel/sharding/sharding_utils.py  |  39 ---
 .../fleet/utils/internal_storage.py           |  26 +-
 .../dygraph_sharding_optimizer_stage2.py      |   4 +-
 python/setup.py.in                            |   1 +
 8 files changed, 302 insertions(+), 323 deletions(-)
 create mode 100644 python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py

diff --git a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/__init__.py b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/__init__.py
index f7b346e5228..28260d7aa18 100644
--- a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/__init__.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/__init__.py
@@ -12,6 +12,6 @@
 # See the License for the specific language governing permissions and
 from .hybrid_parallel_optimizer import HybridParallelOptimizer
 from .hybrid_parallel_gradscaler import HybridParallelGradScaler
-# from .dygraph_sharding_optimizer import DygraphShardingOptimizer
+from .dygraph_sharding_optimizer import DygraphShardingOptimizer
 
 __all__ = []
diff --git a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/dygraph_sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/dygraph_sharding_optimizer.py
index 9512c43425b..b7edf583002 100755
--- a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/dygraph_sharding_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/dygraph_sharding_optimizer.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. 
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -11,32 +11,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#Taken and modified for fairscale from:
-# https://github.com/facebookresearch/fairscale/blob/main/fairscale/optim/oss.py
-#Commit: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e
-import numpy as np
-from itertools import chain
+######
+
 from functools import reduce
-from collections import OrderedDict
 
 import paddle
 from paddle import framework
-import paddle.distributed as dist
-from paddle.optimizer import Optimizer
-
 from ...utils.log_util import logger
-from ...utils.internal_storage import ParamStorage
-from ...meta_parallel.sharding.sharding_utils import Type
-
-# CUDA alignment 256 bytes
-alignment = {"gpu": 256, }
-align = {
-    Type.fp16.value: 2,
-    Type.fp32.value: 4,
-}
-
-__all__ = ["ShardingOptimizerStage2"]
 
 
 def _is_trainable(param):
@@ -210,245 +192,3 @@ class DygraphShardingOptimizer(object):
 
     def __getattr__(self, item):
         return getattr(self._inner_optimizer, item)
-
-
-class ShardingOptimizerStage2(Optimizer):
-    """
-    A wrapper for Sharding Stage2 Optimizer in Dygraph.
-
-    .. warning: ShardingOptimizer encapsulates the optimization strategy and integrates it into the optimizer.
-
-    ..
ZeRO: 1.https://arxiv.org/pdf/1910.02054.pdf 2.https://arxiv.org/pdf/1910.02054.pdf. - - """ - - # TODO (Baibaifan) - # Feature Notes: - # 1. Unified memory for parameters and parameters.grad to InternalStorage. - # 2. Support the segmentation of optimizer parameters and partial updating of parameters. - # 3. Dynamically adjust training parameters and models。 - # 4. Support offload function. - # 5. Support the establishment of independent communication groups. - # 6. Broadcast_fp16 is not supported now. - def __init__(self, - params, - optim, - group, - broadcast_fp16=False, - offload=False, - device="gpu", - accumulation_steps=None, - **kw): - - super().__init__(optim._learning_rate, params, kw) - - # Segmentation information - self._dtype_rank_params = OrderedDict( - ) # {dtype:[param1,param2]} device, rank, params - self._param2rank = {} - self._segment_params = [] - self._rank_buffer_size = {} # {dtype: {rank: numel+alignment}} - self._param2align = {} # {param.name: align} - - # Default information - self._optim_defaults = kw - self._optim = optim - self._local_params = params - self._default_device = device - self._accumulation_steps = accumulation_steps - - assert group is not None, "Distributed communication group is must be gived" - self.group = group - self.world_size = group.nranks - self.rank = group.rank - - self.broadcast_fp16 = broadcast_fp16 - self.param_storages = {} # {dtype: {rank: InternalStorage}} - self.offload = offload # Using for offload - - # Update optimizer parameters and adjust parameter storage and use according to rank. - self.update_opt_status() - - def update_opt_status(self): - """Update optimizer status and parameter storage information, and special functions to be developed. - """ - # func 1 - self._integration_params() - - # fun 2 TODO - - # Segement helpers - - def segment_params(self): - """ - Divide all optimizer parameters equally into rank. - """ - if len(self._segment_params) == 0: - self._segment_params, param_lists = [ - [] for _ in range(self.world_size) - ], [[] for _ in range(self.world_size)] - sizes = [0] * self.world_size - for param in self._local_params: - # Add this param to rank with smallest size. - rank = sizes.index(min(sizes)) - param_lists[rank].append(param) - - # Statistical real numels - sizes[rank] += np.prod(param.shape) if param.trainable else 0 - - for rank, params in enumerate(param_lists): - # param_group_rank = copy.copy(params) - self._segment_params[rank].extend(params) - return self._segment_params - - @property - def local_params(self): - return self._local_params - - @property - def accumulation_steps(self): - return self._accumulation_steps - - @property - def param2rank(self): - """Map the params to the rank which owns them""" - if len(self._param2rank) == 0: - for rank, params in enumerate(self.segment_params()): - for param in params: - self._param2rank[param.name] = rank - return self._param2rank - - @property - def dtype_rank_params(self): - """ - Divide the parameters into groups according to rank and dtype. 
- """ - if len(self._dtype_rank_params) == 0: - # Assign the parameters of each rank according to the type - for param in self._local_params: - if param.dtype not in self._dtype_rank_params.keys(): - self._dtype_rank_params[ - param.dtype] = [[] for _ in range(self.world_size)] - self._dtype_rank_params[param.dtype][self.param2rank[ - param.name]].append(param) - - # Sort per rank params by size - for dtype in self._dtype_rank_params.keys(): - for rank_params in self._dtype_rank_params[dtype]: - rank_params.sort(key=lambda x: np.prod(x.shape)) - - return self._dtype_rank_params - - @property - def rank_buffer_size(self): - """ - Count the memory size of the parameters corresponding to rank under the corresponding dtype. - """ - # CUDA alignment 256 bytes - if len(self._rank_buffer_size) == 0: - for dtype in self.dtype_rank_params.keys(): - if dtype not in self._rank_buffer_size.keys(): - self._rank_buffer_size[dtype] = {} - for dst_rank, per_rank_params in enumerate( - self.dtype_rank_params[dtype]): - if dst_rank not in self._rank_buffer_size[dtype].keys(): - self._rank_buffer_size[dtype][dst_rank] = 0 - for param in per_rank_params: - if not param.trainable: - continue - size = np.prod(param.shape) * align[dtype] - remaining = size % alignment[self._default_device] - ali = 0 if remaining == 0 else alignment[ - self._default_device] - remaining - align_ = ali // align[dtype] - self._rank_buffer_size[dtype][dst_rank] += np.prod( - param.shape) + align_ - self._param2align[param.name] = align_ - - return self._rank_buffer_size - - def _integration_params(self): - """ - Integrate the parameters into a continuous memory according to rank, and support the update of training parameters. - """ - - for dtype, per_rank_params in self.dtype_rank_params.items(): - if dtype not in self.param_storages.keys(): - self.param_storages[dtype] = {} - - for dst_rank, params in enumerate(per_rank_params): - if len(params) > 0: - - # Merge all the trainable params in a single InternalStorage - trainable_params = list( - filter(lambda x: x.trainable, params)) - if trainable_params: - param_storage = ParamStorage( - size=self.rank_buffer_size[dtype][dst_rank], - dtype=dtype, - device=self._default_device) - - param_storage.add_rank_params(trainable_params, - self._param2align) - self.param_storages[dtype][dst_rank] = param_storage - - # Clear the InternalStorage keys which are not in use anymore - dtype_in_use = list(self.dtype_rank_params.keys()) - dtype_to_pop = list( - filter(lambda x: x not in dtype_in_use, self.param_storages.keys())) - for d in dtype_to_pop: - self.param_storages.pop(d) - - def step(self): - """ - A wrapper for Optimizer's step function to finish the update operation of the optimizer. 
- """ - - # Synchronize optimizer parameters for the current rank - if len(self.dtype_rank_params.keys( - )) == 1 and Type.fp32.value in self.dtype_rank_params.keys(): - self._optim._parameter_list = self.dtype_rank_params[ - Type.fp32.value][self.rank] - elif len(self.dtype_rank_params.keys( - )) == 1 and Type.fp16.value in self.dtype_rank_params.keys(): - self._optim._parameter_list = self.dtype_rank_params[ - Type.fp16.value][self.rank] - else: - self._optim._parameter_list = self.dtype_rank_params[ - Type.fp16.value][self.rank] + self.dtype_rank_params[ - Type.fp32.value][self.rank] - - # Run the optimizer of the current rank step - self._optim.step() - - # Synchronize all the updated shards in between the ranks - self._broadcast_params() - - # Return full parameters to optimizer parameters - self._optim._parameter_list = self._local_params - - def clear_cache(self): - self._segment_params.clear() - self._dtype_rank_params.clear() - self._param2rank.clear() - - @paddle.no_grad() - def _broadcast_params(self): - """Broadcast the parameters of the current rank to each rank""" - - assert self._default_device == "gpu", "Only supported gpu" - - # Exchange all the shards with the other ranks - for dtype_per_rank in self.param_storages.values(): - for dst_rank, internal_storage in dtype_per_rank.items(): - dist.broadcast( - tensor=internal_storage.buffer, - src=dst_rank, - group=self.group, - use_calc_stream=True) - - # Multi stream operation will be supported later - dist.wait( - tensor=internal_storage.buffer, - group=self.group, - use_calc_stream=True) diff --git a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py new file mode 100644 index 00000000000..9595896188b --- /dev/null +++ b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py @@ -0,0 +1,285 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#Taken and modified for fairscale from: +# https://github.com/facebookresearch/fairscale/blob/main/fairscale/optim/oss.py +#Commit: 8acbec718f3c70a6b9785470bb9e05cd84fc3f8e + +import copy +import time +import logging +import numpy as np +from math import inf +from itertools import chain +from functools import reduce +from collections import OrderedDict + +import paddle +import paddle.fluid as fluid +from paddle import framework +import paddle.distributed as dist +from paddle.optimizer import Optimizer + +from ...utils.internal_storage import ParamStorage +from ...meta_parallel.sharding.sharding_utils import Type + +# CUDA alignment 256 bytes +alignment = {"gpu": 256, } +align = { + Type.fp16.value: 2, + Type.fp32.value: 4, +} + +__all__ = ["ShardingOptimizerStage2"] + + +class ShardingOptimizerStage2(Optimizer): + """ + A wrapper for Sharding Stage2 Optimizer in Dygraph. + + .. 
warning: ShardingOptimizer encapsulates the optimization strategy and integrates it into the optimizer. + + .. ZeRO: 1.https://arxiv.org/pdf/1910.02054.pdf 2.https://arxiv.org/pdf/1910.02054.pdf. + + """ + + # TODO (Baibaifan) + # Feature Notes: + # 1. Unified memory for parameters and parameters.grad to InternalStorage. + # 2. Support the segmentation of optimizer parameters and partial updating of parameters. + # 3. Dynamically adjust training parameters and models。 + # 4. Support offload function. + # 5. Support the establishment of independent communication groups. + # 6. Broadcast_fp16 is not supported now. + def __init__(self, + params, + optim, + group, + broadcast_fp16=False, + offload=False, + device="gpu", + accumulation_steps=None, + **kw): + + super().__init__(optim._learning_rate, params, kw) + + # Segmentation information + self._dtype_rank_params = OrderedDict( + ) # {dtype:[param1,param2]} device, rank, params + self._param2rank = {} + self._segment_params = [] + self._rank_buffer_size = {} # {dtype: {rank: numel+alignment}} + self._param2align = {} # {param.name: align} + + # Default information + self._optim_defaults = kw + self._optim = optim + self._local_params = params + self._default_device = device + self._accumulation_steps = accumulation_steps + + assert group is not None, "Distributed communication group is must be gived" + self.group = group + self.world_size = group.nranks + self.rank = group.rank + + self.broadcast_fp16 = broadcast_fp16 + self.param_storages = {} # {dtype: {rank: InternalStorage}} + self.offload = offload # Using for offload + + # Update optimizer parameters and adjust parameter storage and use according to rank. + self.update_opt_status() + + def update_opt_status(self): + """Update optimizer status and parameter storage information, and special functions to be developed. + """ + # func 1 + self._integration_params() + + # fun 2 TODO + + # Segement helpers + + def segment_params(self): + """ + Divide all optimizer parameters equally into rank. + """ + if len(self._segment_params) == 0: + self._segment_params, param_lists = [ + [] for _ in range(self.world_size) + ], [[] for _ in range(self.world_size)] + sizes = [0] * self.world_size + for param in self._local_params: + # Add this param to rank with smallest size. + rank = sizes.index(min(sizes)) + param_lists[rank].append(param) + + # Statistical real numels + sizes[rank] += np.prod(param.shape) if param.trainable else 0 + + for rank, params in enumerate(param_lists): + # param_group_rank = copy.copy(params) + self._segment_params[rank].extend(params) + return self._segment_params + + @property + def local_params(self): + return self._local_params + + @property + def accumulation_steps(self): + return self._accumulation_steps + + @property + def param2rank(self): + """Map the params to the rank which owns them""" + if len(self._param2rank) == 0: + for rank, params in enumerate(self.segment_params()): + for param in params: + self._param2rank[param.name] = rank + return self._param2rank + + @property + def dtype_rank_params(self): + """ + Divide the parameters into groups according to rank and dtype. 
+ """ + if len(self._dtype_rank_params) == 0: + # Assign the parameters of each rank according to the type + for param in self._local_params: + if param.dtype not in self._dtype_rank_params.keys(): + self._dtype_rank_params[ + param.dtype] = [[] for _ in range(self.world_size)] + self._dtype_rank_params[param.dtype][self.param2rank[ + param.name]].append(param) + + # Sort per rank params by size + for dtype in self._dtype_rank_params.keys(): + for rank_params in self._dtype_rank_params[dtype]: + rank_params.sort(key=lambda x: np.prod(x.shape)) + + return self._dtype_rank_params + + @property + def rank_buffer_size(self): + """ + Count the memory size of the parameters corresponding to rank under the corresponding dtype. + """ + # CUDA alignment 256 bytes + if len(self._rank_buffer_size) == 0: + for dtype in self.dtype_rank_params.keys(): + if dtype not in self._rank_buffer_size.keys(): + self._rank_buffer_size[dtype] = {} + for dst_rank, per_rank_params in enumerate( + self.dtype_rank_params[dtype]): + if dst_rank not in self._rank_buffer_size[dtype].keys(): + self._rank_buffer_size[dtype][dst_rank] = 0 + for param in per_rank_params: + if not param.trainable: + continue + size = np.prod(param.shape) * align[dtype] + remaining = size % alignment[self._default_device] + ali = 0 if remaining == 0 else alignment[ + self._default_device] - remaining + align_ = ali // align[dtype] + self._rank_buffer_size[dtype][dst_rank] += np.prod( + param.shape) + align_ + self._param2align[param.name] = align_ + + return self._rank_buffer_size + + def _integration_params(self): + """ + Integrate the parameters into a continuous memory according to rank, and support the update of training parameters. + """ + + for dtype, per_rank_params in self.dtype_rank_params.items(): + if dtype not in self.param_storages.keys(): + self.param_storages[dtype] = {} + + for dst_rank, params in enumerate(per_rank_params): + if len(params) > 0: + + # Merge all the trainable params in a single InternalStorage + trainable_params = list( + filter(lambda x: x.trainable, params)) + if trainable_params: + param_storage = ParamStorage( + size=self.rank_buffer_size[dtype][dst_rank], + dtype=dtype, + device=self._default_device) + + param_storage.add_rank_params(trainable_params, + self._param2align) + self.param_storages[dtype][dst_rank] = param_storage + + # Clear the InternalStorage keys which are not in use anymore + dtype_in_use = list(self.dtype_rank_params.keys()) + dtype_to_pop = list( + filter(lambda x: x not in dtype_in_use, self.param_storages.keys())) + for d in dtype_to_pop: + self.param_storages.pop(d) + + def step(self): + """ + A wrapper for Optimizer's step function to finish the update operation of the optimizer. 
+ """ + + # Synchronize optimizer parameters for the current rank + if len(self.dtype_rank_params.keys( + )) == 1 and Type.fp32.value in self.dtype_rank_params.keys(): + self._optim._parameter_list = self.dtype_rank_params[ + Type.fp32.value][self.rank] + elif len(self.dtype_rank_params.keys( + )) == 1 and Type.fp16.value in self.dtype_rank_params.keys(): + self._optim._parameter_list = self.dtype_rank_params[ + Type.fp16.value][self.rank] + else: + self._optim._parameter_list = self.dtype_rank_params[ + Type.fp16.value][self.rank] + self.dtype_rank_params[ + Type.fp32.value][self.rank] + + # Run the optimizer of the current rank step + self._optim.step() + + # Synchronize all the updated shards in between the ranks + self._broadcast_params() + + # Return full parameters to optimizer parameters + self._optim._parameter_list = self._local_params + + def clear_cache(self): + self._segment_params.clear() + self._dtype_rank_params.clear() + self._param2rank.clear() + + @fluid.dygraph.no_grad + def _broadcast_params(self): + """Broadcast the parameters of the current rank to each rank""" + + assert self._default_device == "gpu", "Only supported gpu" + + # Exchange all the shards with the other ranks + for dtype_per_rank in self.param_storages.values(): + for dst_rank, internal_storage in dtype_per_rank.items(): + dist.broadcast( + tensor=internal_storage.buffer, + src=dst_rank, + group=self.group, + use_calc_stream=True) + + # Multi stream operation will be supported later + dist.wait( + tensor=internal_storage.buffer, + group=self.group, + use_calc_stream=True) diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/__init__.py b/python/paddle/distributed/fleet/meta_parallel/sharding/__init__.py index 845879ba38f..1d6bd8e1ee4 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/__init__.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/__init__.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .sharding_utils import GpuInfo +from .sharding_utils import Type, device_guard diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_utils.py b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_utils.py index 4cf40005c1f..d4c443e385f 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_utils.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_utils.py @@ -22,15 +22,6 @@ import paddle import paddle.distributed as dist from paddle.fluid import core -# Set global device id -global dev_id -if core.is_compiled_with_cuda(): - dev_id = int(os.environ.get('FLAGS_selected_gpus', 0)) -elif core.is_compiled_with_npu(): - dev_id = int(os.environ.get('FLAGS_selected_npus', 0)) -else: - raise ValueError("This device doesn't support.") - class Taskflow: """ @@ -50,36 +41,6 @@ class Type(Enum): fp32 = paddle.float32 -def GpuInfo(fn): - """ - Displays GPU usage information before and after the function。 - """ - - def used(*args, **kw): - # Before using - b_info = os.popen("nvidia-smi -i {} | grep MiB".format(str( - dev_id))).read() - before_info = (int(b_info.split()[8][:-3]), - int(b_info.split()[10][:-3])) - print( - "====== Current device {} ====== Total has {} MiB, Has used {} MiB ======". 
- format(str(dev_id), str(before_info[1]), str(before_info[0]))) - result = fn(*args, **kw) - # After using - a_info = os.popen("nvidia-smi -i {} | grep MiB".format(str( - dev_id))).read() - after_info = (int(a_info.split()[8][:-3]), int(a_info.split()[10][:-3])) - print( - "====== Current device {} ====== Total has {} MiB, Has used {} MiB, Self use {} MiB ======". - format( - str(dev_id), - str(after_info[1]), - str(after_info[0]), str(after_info[0] - before_info[0]))) - return result - - return used - - @contextlib.contextmanager def device_guard(dev_id, device="cpu"): origin_device = paddle.device.get_device() diff --git a/python/paddle/distributed/fleet/utils/internal_storage.py b/python/paddle/distributed/fleet/utils/internal_storage.py index 96947221f31..ff41ca217e4 100644 --- a/python/paddle/distributed/fleet/utils/internal_storage.py +++ b/python/paddle/distributed/fleet/utils/internal_storage.py @@ -20,18 +20,10 @@ import time import numpy as np import paddle +import paddle.fluid as fluid from paddle.fluid import core from ..meta_parallel.sharding.sharding_utils import Type, device_guard -# Set global device id -global dev_id -if core.is_compiled_with_cuda(): - dev_id = int(os.environ.get('FLAGS_selected_gpus', 0)) -elif core.is_compiled_with_npu(): - dev_id = int(os.environ.get('FLAGS_selected_npus', 0)) -else: - raise ValueError("This device doesn't support.") - class InternalStorage: """ @@ -68,7 +60,7 @@ class ParamStorage(InternalStorage): super().__init__(size, dtype, device, convert_cpu=True) self.param2align = None - @paddle.no_grad() + @fluid.dygraph.no_grad def add_rank_params(self, trainable_params, param2align): """ Add new parameters to the InternalStorage. Params becomes a view of this InternalStorage buffer. @@ -87,6 +79,7 @@ class ParamStorage(InternalStorage): cpu_param_shape.append(p_shape) # buffer covert from cpu to cuda + dev_id = int(paddle.get_device().split(":")[1]) self.buffer = self.buffer.cuda(dev_id) self._fill = 0 @@ -96,7 +89,7 @@ class ParamStorage(InternalStorage): self._params.append(param) self._param_ids.append(id(param)) - @paddle.no_grad() + @fluid.dygraph.no_grad def _add_param_as_view(self, param, align): assert ( @@ -116,6 +109,7 @@ class ParamStorage(InternalStorage): param.stop_gradient = origin_state # Copy the current param value + dev_id = int(paddle.get_device().split(":")[1]) with device_guard(dev_id, "cpu"): tmp_var = core.VarBase(tensor=self.buffer._slice(self._fill, var_end)) @@ -126,7 +120,7 @@ class ParamStorage(InternalStorage): self._fill = offset return p_shape - @paddle.no_grad() + @fluid.dygraph.no_grad def _convert_buffer(self, param, p_shape, align): var_end = self._fill + np.prod(p_shape) @@ -177,7 +171,7 @@ class GradStorage(InternalStorage): param.shape) + align <= self._max_size and id( param) not in self._param_ids - @paddle.no_grad() + @fluid.dygraph.no_grad def add_grad(self, param, align): """ Add a new parameter gradient to the InternalStorage. Param.grad becomes a view of this InternalStorage buffer. @@ -191,7 +185,7 @@ class GradStorage(InternalStorage): self._params.append(param) self._param_ids.append(id(param)) - @paddle.no_grad() + @fluid.dygraph.no_grad def manumal_relase(self): """ Release the buffer from InternalStorage. The InternalStorage will need to be rebuilt before use. 
@@ -207,7 +201,7 @@
         self.params_checked_in = 0
         self._release = True
 
-    @paddle.no_grad()
+    @fluid.dygraph.no_grad
     def rebuild(self):
         """
         Given the parameter gradients which have been registered previously, rebuild the whole InternalStorage.
@@ -223,7 +217,7 @@
         self._release = False
 
-    @paddle.no_grad()
+    @fluid.dygraph.no_grad
     def _add_grad_as_view(self, param, align):
         assert np.prod(
             self.buffer.shape
diff --git a/python/paddle/fluid/tests/unittests/dygraph_sharding_optimizer_stage2.py b/python/paddle/fluid/tests/unittests/dygraph_sharding_optimizer_stage2.py
index 7a5ec28dd1a..571e41b2c4f 100644
--- a/python/paddle/fluid/tests/unittests/dygraph_sharding_optimizer_stage2.py
+++ b/python/paddle/fluid/tests/unittests/dygraph_sharding_optimizer_stage2.py
@@ -24,9 +24,8 @@
 import paddle.fluid as fluid
 from paddle.fluid.dygraph.nn import Linear
 from paddle.distributed import fleet
-from paddle.distributed.fleet.meta_parallel.sharding.sharding_utils import GpuInfo
 from paddle.distributed.fleet.utils.internal_storage import GradStorage
-from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.dygraph_sharding_optimizer import ShardingOptimizerStage2
+from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ShardingOptimizerStage2
 
 base_lr = 0.1
 momentum_rate = 0.9
@@ -69,7 +68,6 @@ def optimizer_setting(parameter_list=None):
     return optimizer
 
 
-@GpuInfo
 def train_mlp():
     fleet.init(is_collective=True)
     group = paddle.distributed.new_group([0, 1])
diff --git a/python/setup.py.in b/python/setup.py.in
index 60d9434e856..e01019ed7da 100644
--- a/python/setup.py.in
+++ b/python/setup.py.in
@@ -291,6 +291,7 @@ packages=['paddle',
              'paddle.distributed.fleet.utils',
              'paddle.distributed.fleet.meta_parallel',
              'paddle.distributed.fleet.meta_parallel.pp_utils',
+             'paddle.distributed.fleet.meta_parallel.sharding',
              'paddle.distributed.fleet.meta_parallel.parallel_layers',
              'paddle.distributed.auto_parallel',
              'paddle.distributed.auto_parallel.operators',
-- 
GitLab
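
Usage note (not part of the patch): a minimal two-GPU sketch of how the relocated ShardingOptimizerStage2 is wired up in dygraph mode, modeled on the dygraph_sharding_optimizer_stage2.py unittest touched above. The Linear model, Momentum settings, and launch command are illustrative assumptions; per-rank gradient handling via GradStorage, which the unittest sets up, is omitted here for brevity.

# Sketch only; launch with e.g.:
#   python -m paddle.distributed.launch --gpus=0,1 demo_sharding_stage2.py
import paddle
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ShardingOptimizerStage2

# Initialize collective training and the communication group used for sharding.
fleet.init(is_collective=True)
group = paddle.distributed.new_group([0, 1])

# Any dygraph Layer works; a single Linear keeps the sketch short.
model = paddle.nn.Linear(100, 10)
inner_opt = paddle.optimizer.Momentum(
    learning_rate=0.1, momentum=0.9, parameters=model.parameters())

# Each rank keeps only its shard of the optimizer state; step() updates the
# local shard and broadcasts the refreshed parameters back to every rank.
opt = ShardingOptimizerStage2(
    params=model.parameters(), optim=inner_opt, group=group)

loss = model(paddle.rand([4, 100])).mean()
loss.backward()
opt.step()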