diff --git a/python/paddle/distributed/fleet/__init__.py b/python/paddle/distributed/fleet/__init__.py index 8ac5b93ef672bb4b964de46b18679cc2465b178c..11d7643c676dd40e7e10480ff1be90dd728a994d 100755 --- a/python/paddle/distributed/fleet/__init__.py +++ b/python/paddle/distributed/fleet/__init__.py @@ -17,7 +17,6 @@ from .base.role_maker import Role # noqa: F401 from .base.role_maker import UserDefinedRoleMaker # noqa: F401 from .base.role_maker import PaddleCloudRoleMaker # noqa: F401 from .base.distributed_strategy import DistributedStrategy # noqa: F401 -from .base.fleet_base import Fleet # noqa: F401 from .base.util_factory import UtilBase # noqa: F401 from .dataset import DatasetBase # noqa: F401 from .dataset import InMemoryDataset # noqa: F401 @@ -29,6 +28,10 @@ from .data_generator.data_generator import MultiSlotStringDataGenerator # noqa: from . import metrics # noqa: F401 from .base.topology import CommunicateTopology from .base.topology import HybridCommunicateGroup # noqa: F401 +from .fleet import Fleet +from .model import distributed_model +from .optimizer import distributed_optimizer +from .scaler import distributed_scaler __all__ = [ #noqa "CommunicateTopology", "UtilBase", "HybridCommunicateGroup", @@ -72,7 +75,7 @@ init_worker = fleet.init_worker init_server = fleet.init_server run_server = fleet.run_server stop_worker = fleet.stop_worker -distributed_optimizer = fleet.distributed_optimizer +distributed_optimizer = distributed_optimizer save_inference_model = fleet.save_inference_model save_persistables = fleet.save_persistables save_cache_model = fleet.save_cache_model @@ -83,13 +86,7 @@ load_model = fleet.load_model load_inference_model = fleet.load_inference_model load_one_table = fleet.load_one_table minimize = fleet.minimize -distributed_model = fleet.distributed_model -step = fleet.step -clear_grad = fleet.clear_grad -set_lr = fleet.set_lr -get_lr = fleet.get_lr -state_dict = fleet.state_dict -set_state_dict = fleet.set_state_dict +distributed_model = distributed_model shrink = fleet.shrink get_hybrid_communicate_group = fleet.get_hybrid_communicate_group -distributed_scaler = fleet.distributed_scaler +distributed_scaler = distributed_scaler diff --git a/python/paddle/distributed/fleet/base/fleet_base.py b/python/paddle/distributed/fleet/fleet.py old mode 100755 new mode 100644 similarity index 72% rename from python/paddle/distributed/fleet/base/fleet_base.py rename to python/paddle/distributed/fleet/fleet.py index 52f3812d8a5f57a4240b53d3207b36cf1fd23c34..060d5defd4973502b56df736e7cd0bd646688796 --- a/python/paddle/distributed/fleet/base/fleet_base.py +++ b/python/paddle/distributed/fleet/fleet.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,73 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from __future__ import print_function import copy import warnings import paddle import os from types import MethodType import numpy as np -from paddle.fluid.framework import dygraph_only, _global_flags +from paddle.fluid.framework import _global_flags from paddle.fluid import compiler -from .role_maker import UserDefinedRoleMaker, PaddleCloudRoleMaker, RoleMakerBase -from .strategy_compiler import StrategyCompiler -from .distributed_strategy import DistributedStrategy -from .meta_optimizer_factory import MetaOptimizerFactory -from .runtime_factory import RuntimeFactory +from .base.role_maker import UserDefinedRoleMaker, PaddleCloudRoleMaker, RoleMakerBase +from .base.strategy_compiler import StrategyCompiler +from .base.distributed_strategy import DistributedStrategy +from .base.meta_optimizer_factory import MetaOptimizerFactory +from .base.runtime_factory import RuntimeFactory from paddle.fluid.wrapped_decorator import wrap_decorator from paddle.fluid.dygraph import parallel_helper from paddle.fluid.ir import apply_build_strategy -from . import topology as tp -from .topology import ParallelMode -from ..meta_parallel import TensorParallel, model_parallel_random_seed -from ..meta_parallel import PipelineParallel, ShardingParallel -from ..meta_optimizers import HybridParallelOptimizer, HeterParallelOptimizer +from .base import topology as tp +from .meta_parallel import model_parallel_random_seed from paddle import _C_ops from paddle.fluid import core -from paddle.fluid.dygraph import to_variable -from paddle.distributed.fleet.utils.recompute import LegacyRecomputeFunction -from paddle.fluid.dygraph.varbase_patch_methods import _grad_scalar __all__ = [] -_grad_scalar = None - - -class _RecomputeModelWrapper(paddle.nn.Layer): - - def __init__(self, model, segments=2, preserve_rng_state=True): - super(_RecomputeModelWrapper, self).__init__() - assert isinstance(model, paddle.nn.Sequential), ( - "The model passed to RecomputeModelWrapper must be of type " - "paddle.nn.Sequential.") - self._model = model - self._segments = segments - self._preserve_rng_state = preserve_rng_state - self._layers = list(model.children()) - self._segment_size = len(self._layers) // segments - - def _run_func(self, begin, end): - - def do_run(input): - for i in range(begin, end): - input = self._layers[i](input) - return input - - return do_run - - def _checkpoint(self, func, *args, **kwargs): - return LegacyRecomputeFunction.apply(func, self._preserve_rng_state, - *args) - - def forward(self, input): - end = 0 - for begin in range(0, self._segment_size * (self._segments - 1), - self._segment_size): - end = begin + self._segment_size - input = self._checkpoint(self._run_func(begin, end), input) - return self._run_func(end, len(self._layers))(input) - def apply_ir_passes(main_program, startup_program, config): build_strategy = config._user_defined_strategy.build_strategy._copy() @@ -207,6 +163,7 @@ class Fleet(object): self._runtime_handle = None self._util = None self._context = {} + self.user_defined_optimizer = paddle.optimizer.Optimizer(0.0) def init(self, role_maker=None, is_collective=False, strategy=None): """ @@ -377,6 +334,7 @@ class Fleet(object): ] cg.set_comm_group('model', mp_rank, mp_degree, mp_ring_id, mp_group_ranks) + return self def _init_hybrid_parallel_env(self): """initialize the hybrid environment @@ -1067,415 +1025,8 @@ class Fleet(object): self._context = {} - if paddle.fluid.framework._non_static_mode(): - if self.worker_num() > 1: - if self._user_defined_strategy.heter_ccl_mode == False: - 
return HybridParallelOptimizer(optimizer, self._hcg, - self._user_defined_strategy) - else: - return HeterParallelOptimizer(optimizer, - self._user_defined_strategy) - else: - return optimizer return self - @dygraph_only - def distributed_model(self, model): - """ - Return distributed data parallel model (Only work in dygraph mode) - - Args: - model (Layer): the user-defind model which inherits Layer. - - Returns: - distributed data parallel model which inherits Layer. - - Examples: - - .. code-block:: python - - import paddle - import paddle.nn as nn - from paddle.distributed import fleet - - class LinearNet(nn.Layer): - def __init__(self): - super(LinearNet, self).__init__() - self._linear1 = nn.Linear(10, 10) - self._linear2 = nn.Linear(10, 1) - - def forward(self, x): - return self._linear2(self._linear1(x)) - - # 1. initialize fleet environment - fleet.init(is_collective=True) - - # 2. create layer & optimizer - layer = LinearNet() - loss_fn = nn.MSELoss() - adam = paddle.optimizer.Adam( - learning_rate=0.001, parameters=layer.parameters()) - - # 3. get data_parallel model using fleet - adam = fleet.distributed_optimizer(adam) - dp_layer = fleet.distributed_model(layer) - - # 4. run layer - inputs = paddle.randn([10, 10], 'float32') - outputs = dp_layer(inputs) - labels = paddle.randn([10, 1], 'float32') - loss = loss_fn(outputs, labels) - - print("loss:", loss.numpy()) - - loss.backward() - - adam.step() - adam.clear_grad() - - - """ - assert model is not None, "model should not be None" - if self.worker_num() <= 1: - return model - - amp_enable = False - recompute_enable = False - strategy = self._user_defined_strategy - if strategy.amp == True: - amp_enable = True - amp_level = "O2" if strategy.amp_configs['use_pure_fp16'] else "O1" - if amp_level.upper() == "O2": - model = paddle.amp.decorate(models=model, - optimizers=None, - level="O2", - master_weight=None, - save_dtype=None) - init_loss_scaling = strategy.amp_configs['init_loss_scaling'] - incr_ratio = strategy.amp_configs['incr_ratio'] - decr_ratio = strategy.amp_configs['decr_ratio'] - incr_every_n_steps = strategy.amp_configs['incr_every_n_steps'] - decr_every_n_nan_or_inf = strategy.amp_configs[ - 'decr_every_n_nan_or_inf'] - use_dynamic_loss_scaling = strategy.amp_configs[ - 'use_dynamic_loss_scaling'] - - global _grad_scalar - _grad_scalar = paddle.amp.GradScaler( - init_loss_scaling=init_loss_scaling, - incr_ratio=incr_ratio, - decr_ratio=decr_ratio, - incr_every_n_steps=incr_every_n_steps, - decr_every_n_nan_or_inf=decr_every_n_nan_or_inf, - use_dynamic_loss_scaling=use_dynamic_loss_scaling) - - if strategy.recompute == True: - recompute_enable = True - model = _RecomputeModelWrapper(model) - - if self._user_defined_strategy.heter_ccl_mode == True: - distributed_model = paddle.DataParallel( - model, - comm_buffer_size=self._user_defined_strategy. - fuse_grad_size_in_MB, - last_comm_buffer_size=self._user_defined_strategy. - last_comm_group_size_MB, - find_unused_parameters=self._user_defined_strategy. 
- find_unused_parameters) - return distributed_model - - if self._hcg.get_parallel_mode() == ParallelMode.SHARDING_PARALLEL: - model = ShardingParallel(model, - self._hcg, - strategy=self._user_defined_strategy) - elif self._hcg.get_parallel_mode() == ParallelMode.DATA_PARALLEL: - - # NOTE (JZ-LIANG) init parameters broadcast within sharding group - # normally it should be done inside DataParallel - if self.sharding_degree > 1: - from paddle.distributed.fleet.utils.hybrid_parallel_util import broadcast_mp_parameters, broadcast_sharding_parameters - assert self.sharding_degree == self._hcg.get_sharding_parallel_world_size( - ) - broadcast_sharding_parameters(model, self._hcg) - model = paddle.DataParallel( - model, - comm_buffer_size=self._user_defined_strategy. - fuse_grad_size_in_MB, - last_comm_buffer_size=self._user_defined_strategy. - last_comm_group_size_MB, - find_unused_parameters=self._user_defined_strategy. - find_unused_parameters) - elif self._hcg.get_parallel_mode() == ParallelMode.TENSOR_PARALLEL: - model = TensorParallel(model, - self._hcg, - strategy=self._user_defined_strategy) - elif self._hcg.get_parallel_mode() == ParallelMode.PIPELINE_PARALLEL: - model = PipelineParallel(model, - self._hcg, - strategy=self._user_defined_strategy) - - return model - - @dygraph_only - def state_dict(self): - """ - Get state dict information from optimizer. - (Only work in dygraph mode) - - Returns: - state_dict(dict) : dict contains all the Tensor used by optimizer - - Examples: - .. code-block:: python - - import numpy as np - import paddle - from paddle.distributed import fleet - - fleet.init(is_collective=True) - - value = np.arange(26).reshape(2, 13).astype("float32") - a = paddle.to_tensor(value) - - layer = paddle.nn.Linear(13, 5) - adam = paddle.optimizer.Adam(learning_rate=0.01, parameters=layer.parameters()) - - adam = fleet.distributed_optimizer(adam) - dp_layer = fleet.distributed_model(layer) - state_dict = adam.state_dict() - """ - # imitate target optimizer retrieval - return self.user_defined_optimizer.state_dict() - - @dygraph_only - def set_state_dict(self, state_dict): - """ - Load optimizer state dict. - (Only work in dygraph mode) - - Args: - state_dict(dict) : Dict contains all the Tensor needed by optimizer - - Returns: - None - - Examples: - .. code-block:: python - - import numpy as np - import paddle - from paddle.distributed import fleet - - fleet.init(is_collective=True) - - value = np.arange(26).reshape(2, 13).astype("float32") - a = paddle.to_tensor(value) - - layer = paddle.nn.Linear(13, 5) - adam = paddle.optimizer.Adam(learning_rate=0.01, parameters=layer.parameters()) - - adam = fleet.distributed_optimizer(adam) - dp_layer = fleet.distributed_model(layer) - state_dict = adam.state_dict() - paddle.save(state_dict, "paddle_dy") - para_state_dict = paddle.load("paddle_dy") - adam.set_state_dict(para_state_dict) - """ - # imitate target optimizer retrieval - return self.user_defined_optimizer.set_state_dict(state_dict) - - @dygraph_only - def set_lr(self, value): - """ - Set the value of the learning rate manually in the optimizer. - (Only work in dygraph mode) - - Args: - value (float|Tensor): the value of learning rate - - Returns: - None - - Examples: - .. 
code-block:: python - - import numpy as np - import paddle - from paddle.distributed import fleet - - fleet.init(is_collective=True) - - value = np.arange(26).reshape(2, 13).astype("float32") - a = paddle.to_tensor(value) - - layer = paddle.nn.Linear(13, 5) - adam = paddle.optimizer.Adam(learning_rate=0.01, parameters=layer.parameters()) - - adam = fleet.distributed_optimizer(adam) - dp_layer = fleet.distributed_model(layer) - - lr_list = [0.2, 0.3, 0.4, 0.5, 0.6] - for i in range(5): - adam.set_lr(lr_list[i]) - lr = adam.get_lr() - print("current lr is {}".format(lr)) - # Print: - # current lr is 0.2 - # current lr is 0.3 - # current lr is 0.4 - # current lr is 0.5 - # current lr is 0.6 - """ - # imitate target optimizer retrieval - return self.user_defined_optimizer.set_lr(value) - - @dygraph_only - def get_lr(self): - """ - Get current step learning rate. - (Only work in dygraph mode) - - Returns: - float: The learning rate of the current step. - - Examples: - - .. code-block:: python - - import numpy as np - import paddle - from paddle.distributed import fleet - - fleet.init(is_collective=True) - - value = np.arange(26).reshape(2, 13).astype("float32") - a = paddle.to_tensor(value) - - layer = paddle.nn.Linear(13, 5) - adam = paddle.optimizer.Adam(learning_rate=0.01, parameters=layer.parameters()) - - adam = fleet.distributed_optimizer(adam) - dp_layer = fleet.distributed_model(layer) - - lr = adam.get_lr() - print(lr) # 0.01 - """ - # imitate target optimizer retrieval - return self.user_defined_optimizer.get_lr() - - @dygraph_only - def step(self): - """ - Execute the optimizer once. - (Only work in dygraph mode) - - Returns: - None - - Examples: - - .. code-block:: python - - import paddle - import paddle.nn as nn - from paddle.distributed import fleet - - class LinearNet(nn.Layer): - def __init__(self): - super(LinearNet, self).__init__() - self._linear1 = nn.Linear(10, 10) - self._linear2 = nn.Linear(10, 1) - - def forward(self, x): - return self._linear2(self._linear1(x)) - - # 1. initialize fleet environment - fleet.init(is_collective=True) - - # 2. create layer & optimizer - layer = LinearNet() - loss_fn = nn.MSELoss() - adam = paddle.optimizer.Adam( - learning_rate=0.001, parameters=layer.parameters()) - - # 3. get data_parallel model using fleet - adam = fleet.distributed_optimizer(adam) - dp_layer = fleet.distributed_model(layer) - - # 4. run layer - inputs = paddle.randn([10, 10], 'float32') - outputs = dp_layer(inputs) - labels = paddle.randn([10, 1], 'float32') - loss = loss_fn(outputs, labels) - - print("loss:", loss.numpy()) - - loss.backward() - - adam.step() - adam.clear_grad() - - - """ - # imitate target optimizer retrieval - return self.user_defined_optimizer.step() - - @dygraph_only - def clear_grad(self): - """ - Clear the gradients of all optimized parameters for model. - (Only work in dygraph mode) - - Returns: - None - - Examples: - - .. code-block:: python - - import paddle - import paddle.nn as nn - from paddle.distributed import fleet - - class LinearNet(nn.Layer): - def __init__(self): - super(LinearNet, self).__init__() - self._linear1 = nn.Linear(10, 10) - self._linear2 = nn.Linear(10, 1) - - def forward(self, x): - return self._linear2(self._linear1(x)) - - # 1. initialize fleet environment - fleet.init(is_collective=True) - - # 2. create layer & optimizer - layer = LinearNet() - loss_fn = nn.MSELoss() - adam = paddle.optimizer.Adam( - learning_rate=0.001, parameters=layer.parameters()) - - # 3. 
get data_parallel model using fleet - adam = fleet.distributed_optimizer(adam) - dp_layer = fleet.distributed_model(layer) - - # 4. run layer - inputs = paddle.randn([10, 10], 'float32') - outputs = dp_layer(inputs) - labels = paddle.randn([10, 1], 'float32') - loss = loss_fn(outputs, labels) - - print("loss:", loss.numpy()) - - loss.backward() - - adam.step() - adam.clear_grad() - - """ - # imitate target optimizer retrieval - return self.user_defined_optimizer.clear_grad() - def _get_amp_optimizer(self): # imitate target optimizer retrieval amp_optimizer = None @@ -1702,7 +1253,7 @@ class Fleet(object): # Use the auto-parallel's routines instead if self._user_defined_strategy.semi_auto or self._user_defined_strategy.auto_search: - from ...auto_parallel.parallelizer import AutoParallelizer + from ..auto_parallel.parallelizer import AutoParallelizer auto_parallelizer = AutoParallelizer(self) optimize_ops, params_grads, dist_startup_prog, dist_main_prog = auto_parallelizer.parallelize( loss, startup_program, parameter_list, no_grad_set) @@ -1877,7 +1428,7 @@ class Fleet(object): optimize_ops = [] params_grads = [] - from ..meta_optimizers import ParameterServerOptimizer + from .meta_optimizers import ParameterServerOptimizer ps_optimizer = ParameterServerOptimizer(self.user_defined_optimizer) ps_optimizer._set_basic_info(losses, self._role_maker, self.user_defined_optimizer, @@ -1912,66 +1463,3 @@ class Fleet(object): fleet.util._set_strategy(context["valid_strategy"]) return optimize_ops, params_grads - - @dygraph_only - def distributed_scaler(self, scaler): - - def unscale_method(self, optimizer): - if not self._enable: - return - if getattr(optimizer, '_param_groups', None) and isinstance( - optimizer._param_groups[0], dict): - param_grads = [] - param_grads_fp16 = [] - param_grads_fp32 = [] - for group in optimizer._param_groups: - for param in group['params']: - if param._grad_ivar() is not None: - param_grads.append(param._grad_ivar()) - if param._grad_ivar( - ).dtype == core.VarDesc.VarType.FP16: - param_grads_fp16.append(param._grad_ivar()) - else: - param_grads_fp32.append(param._grad_ivar()) - else: - param_grads = [ - param._grad_ivar() for param in optimizer._parameter_list - if param._grad_ivar() is not None - ] - param_grads_fp16 = [ - param._grad_ivar() for param in optimizer._parameter_list - if (param._grad_ivar() is not None) and ( - param._grad_ivar().dtype == core.VarDesc.VarType.FP16) - ] - param_grads_fp32 = [ - param._grad_ivar() for param in optimizer._parameter_list - if (param._grad_ivar() is not None) and ( - param._grad_ivar().dtype == core.VarDesc.VarType.FP32) - ] - temp_found_inf_fp16 = to_variable(np.array([0]).astype(np.bool_)) - temp_found_inf_fp32 = to_variable(np.array([0]).astype(np.bool_)) - if len(param_grads_fp16): - _C_ops.check_finite_and_unscale(param_grads_fp16, self._scale, - param_grads_fp16, - temp_found_inf_fp16) - if len(param_grads_fp32): - _C_ops.check_finite_and_unscale(param_grads_fp32, self._scale, - param_grads_fp32, - temp_found_inf_fp32) - - self._found_inf = 1 if temp_found_inf_fp16 or temp_found_inf_fp32 else 0 - is_found_inf = paddle.to_tensor([self._found_inf], dtype="int32") - - # TODO(shenliang03) Since dp allreduce in the optimizer is - # after the gradscaler, check_finite needs to synchronize global - # information. In the future, we should use check_group to speed. 
- paddle.distributed.all_reduce(is_found_inf, - op=paddle.distributed.ReduceOp.MAX, - group=None) - self._found_inf = is_found_inf.numpy()[0] - - # Only data_parallel doesn't need to modify scaler - if self._hcg.get_parallel_mode() is not ParallelMode.DATA_PARALLEL: - scaler._unscale = MethodType(unscale_method, scaler) - - return scaler diff --git a/python/paddle/distributed/fleet/model.py b/python/paddle/distributed/fleet/model.py new file mode 100644 index 0000000000000000000000000000000000000000..988d2d928cc2b6b9dc53bb9c16ec7c8793961f0d --- /dev/null +++ b/python/paddle/distributed/fleet/model.py @@ -0,0 +1,190 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle +import os +import numpy as np +from .base import topology as tp +from .base.topology import ParallelMode +from .meta_parallel import TensorParallel, model_parallel_random_seed +from .meta_parallel import PipelineParallel, ShardingParallel +from paddle.fluid import core +from paddle.distributed.fleet.utils.recompute import LegacyRecomputeFunction +from paddle.fluid.dygraph.varbase_patch_methods import _grad_scalar +from paddle.distributed import fleet + + +class _RecomputeModelWrapper(paddle.nn.Layer): + + def __init__(self, model, segments=2, preserve_rng_state=True): + super(_RecomputeModelWrapper, self).__init__() + assert isinstance(model, paddle.nn.Sequential), ( + "The model passed to RecomputeModelWrapper must be of type " + "paddle.nn.Sequential.") + self._model = model + self._segments = segments + self._preserve_rng_state = preserve_rng_state + self._layers = list(model.children()) + self._segment_size = len(self._layers) // segments + + def _run_func(self, begin, end): + + def do_run(input): + for i in range(begin, end): + input = self._layers[i](input) + return input + + return do_run + + def _checkpoint(self, func, *args, **kwargs): + return LegacyRecomputeFunction.apply(func, self._preserve_rng_state, + *args) + + def forward(self, input): + end = 0 + for begin in range(0, self._segment_size * (self._segments - 1), + self._segment_size): + end = begin + self._segment_size + input = self._checkpoint(self._run_func(begin, end), input) + return self._run_func(end, len(self._layers))(input) + + +_grad_scalar = None + + +def distributed_model(model): + """ + Return distributed data parallel model (Only work in dygraph mode) + + Args: + model (Layer): the user-defind model which inherits Layer. + + Returns: + distributed data parallel model which inherits Layer. + + Examples: + + .. code-block:: python + + import paddle + import paddle.nn as nn + from paddle.distributed import fleet + + class LinearNet(nn.Layer): + def __init__(self): + super(LinearNet, self).__init__() + self._linear1 = nn.Linear(10, 10) + self._linear2 = nn.Linear(10, 1) + + def forward(self, x): + return self._linear2(self._linear1(x)) + + # 1. initialize fleet environment + fleet.init(is_collective=True) + + # 2. 
create layer & optimizer + layer = LinearNet() + loss_fn = nn.MSELoss() + adam = paddle.optimizer.Adam( + learning_rate=0.001, parameters=layer.parameters()) + + # 3. get data_parallel model using fleet + adam = fleet.distributed_optimizer(adam) + dp_layer = fleet.distributed_model(layer) + + # 4. run layer + inputs = paddle.randn([10, 10], 'float32') + outputs = dp_layer(inputs) + labels = paddle.randn([10, 1], 'float32') + loss = loss_fn(outputs, labels) + + print("loss:", loss.numpy()) + + loss.backward() + + adam.step() + adam.clear_grad() + + + """ + fleet_env = fleet.fleet + + assert model is not None, "model should not be None" + if fleet_env.worker_num() <= 1: + return model + + amp_enable = False + recompute_enable = False + strategy = fleet_env._user_defined_strategy + if strategy.amp == True: + amp_enable = True + amp_level = "O2" if strategy.amp_configs['use_pure_fp16'] else "O1" + if amp_level.upper() == "O2": + model = paddle.amp.decorate(models=model, + optimizers=None, + level="O2", + master_weight=None, + save_dtype=None) + init_loss_scaling = strategy.amp_configs['init_loss_scaling'] + incr_ratio = strategy.amp_configs['incr_ratio'] + decr_ratio = strategy.amp_configs['decr_ratio'] + incr_every_n_steps = strategy.amp_configs['incr_every_n_steps'] + decr_every_n_nan_or_inf = strategy.amp_configs[ + 'decr_every_n_nan_or_inf'] + use_dynamic_loss_scaling = strategy.amp_configs[ + 'use_dynamic_loss_scaling'] + + global _grad_scalar + _grad_scalar = paddle.amp.GradScaler( + init_loss_scaling=init_loss_scaling, + incr_ratio=incr_ratio, + decr_ratio=decr_ratio, + incr_every_n_steps=incr_every_n_steps, + decr_every_n_nan_or_inf=decr_every_n_nan_or_inf, + use_dynamic_loss_scaling=use_dynamic_loss_scaling) + + if strategy.recompute == True: + recompute_enable = True + model = _RecomputeModelWrapper(model) + + if strategy.heter_ccl_mode == True: + distributed_model = paddle.DataParallel( + model, + comm_buffer_size=strategy.fuse_grad_size_in_MB, + last_comm_buffer_size=strategy.last_comm_group_size_MB, + find_unused_parameters=strategy.find_unused_parameters) + return distributed_model + + if fleet_env._hcg.get_parallel_mode() == ParallelMode.SHARDING_PARALLEL: + model = ShardingParallel(model, fleet_env._hcg, strategy=strategy) + elif fleet_env._hcg.get_parallel_mode() == ParallelMode.DATA_PARALLEL: + + # NOTE (JZ-LIANG) init parameters broadcast within sharding group + # normally it should be done inside DataParallel + if fleet_env.sharding_degree > 1: + from paddle.distributed.fleet.utils.hybrid_parallel_util import broadcast_mp_parameters, broadcast_sharding_parameters + assert fleet_env.sharding_degree == fleet_env._hcg.get_sharding_parallel_world_size( + ) + broadcast_sharding_parameters(model, fleet_env._hcg) + model = paddle.DataParallel( + model, + comm_buffer_size=strategy.fuse_grad_size_in_MB, + last_comm_buffer_size=strategy.last_comm_group_size_MB, + find_unused_parameters=strategy.find_unused_parameters) + elif fleet_env._hcg.get_parallel_mode() == ParallelMode.TENSOR_PARALLEL: + model = TensorParallel(model, fleet_env._hcg, strategy=strategy) + elif fleet_env._hcg.get_parallel_mode() == ParallelMode.PIPELINE_PARALLEL: + model = PipelineParallel(model, fleet_env._hcg, strategy=strategy) + + return model diff --git a/python/paddle/distributed/fleet/optimizer.py b/python/paddle/distributed/fleet/optimizer.py new file mode 100644 index 0000000000000000000000000000000000000000..bfc3d737f9934115d99edda9a3a8f25f347d2a14 --- /dev/null +++ 
b/python/paddle/distributed/fleet/optimizer.py @@ -0,0 +1,80 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import warnings +import paddle +import os +import numpy as np +from paddle.fluid.framework import dygraph_only, _global_flags +from .base.distributed_strategy import DistributedStrategy +from .meta_optimizers import HybridParallelOptimizer, HeterParallelOptimizer +from paddle.fluid import core +from paddle.distributed import fleet + + +def _dygraph_distributed_optimizer(optimizer, strategy=None): + """ + Optimizer for distributed training. + For the distributed training, this method would rebuild a new instance of DistributedOptimizer. + Which has basic Optimizer function and special features for distributed training. + Args: + optimizer(Optimizer): The executor to run for init server. + strategy(DistributedStrategy): Extra properties for distributed optimizer. + It is recommended to use DistributedStrategy in fleet.init(). The strategy + here is for compatibility. If the strategy in fleet.distributed_optimizer() + is not None, then it will overwrite the DistributedStrategy in fleet.init(), + which will take effect in distributed training. + Returns: + Fleet: instance of fleet. + Examples: + .. code-block:: python + import paddle + import paddle.distributed.fleet as fleet + fleet.init(is_collective=True) + strategy = fleet.DistributedStrategy() + optimizer = paddle.optimizer.SGD(learning_rate=0.001) + optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) + """ + fleet_env = fleet.fleet + fleet_env.user_defined_optimizer = optimizer + + if strategy is not None: + if fleet_env._is_collective: + warnings.warn( + "It is recommended to use DistributedStrategy " + "in fleet_env.init(). The strategy here is only for compatibility. " + "If the strategy in fleet_env.distributed_optimizer() is " + "not None, then it will overwrite the DistributedStrategy in fleet_env.init(), " + "which will take effect in distributed training.") + fleet_env._user_defined_strategy = copy.deepcopy(strategy) + + fleet_env._context = {} + + if fleet_env.worker_num() > 1: + if fleet_env._user_defined_strategy.heter_ccl_mode == False: + return HybridParallelOptimizer(optimizer, fleet_env._hcg, + fleet_env._user_defined_strategy) + else: + return HeterParallelOptimizer(optimizer, + fleet_env._user_defined_strategy) + else: + return optimizer + + +def distributed_optimizer(*args, **kwargs): + if paddle.fluid.framework._non_static_mode(): + return _dygraph_distributed_optimizer(*args, **kwargs) + else: + return fleet.fleet.distributed_optimizer(*args, **kwargs) diff --git a/python/paddle/distributed/fleet/scaler.py b/python/paddle/distributed/fleet/scaler.py new file mode 100644 index 0000000000000000000000000000000000000000..f9a35e246848f307ef99e3a29e98dded4c63e585 --- /dev/null +++ b/python/paddle/distributed/fleet/scaler.py @@ -0,0 +1,87 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle +from paddle.fluid.framework import dygraph_only +from .base.topology import ParallelMode +from paddle.distributed import fleet +from types import MethodType +from paddle.fluid import core +from paddle.fluid.dygraph import to_variable +import numpy as np +from paddle import _C_ops + + +def distributed_scaler(scaler): + + def unscale_method(self, optimizer): + if not self._enable: + return + if getattr(optimizer, '_param_groups', None) and isinstance( + optimizer._param_groups[0], dict): + param_grads = [] + param_grads_fp16 = [] + param_grads_fp32 = [] + for group in optimizer._param_groups: + for param in group['params']: + if param._grad_ivar() is not None: + param_grads.append(param._grad_ivar()) + if param._grad_ivar( + ).dtype == core.VarDesc.VarType.FP16: + param_grads_fp16.append(param._grad_ivar()) + else: + param_grads_fp32.append(param._grad_ivar()) + else: + param_grads = [ + param._grad_ivar() for param in optimizer._parameter_list + if param._grad_ivar() is not None + ] + param_grads_fp16 = [ + param._grad_ivar() for param in optimizer._parameter_list + if (param._grad_ivar() is not None) and ( + param._grad_ivar().dtype == core.VarDesc.VarType.FP16) + ] + param_grads_fp32 = [ + param._grad_ivar() for param in optimizer._parameter_list + if (param._grad_ivar() is not None) and ( + param._grad_ivar().dtype == core.VarDesc.VarType.FP32) + ] + temp_found_inf_fp16 = to_variable(np.array([0]).astype(np.bool_)) + temp_found_inf_fp32 = to_variable(np.array([0]).astype(np.bool_)) + if len(param_grads_fp16): + _C_ops.check_finite_and_unscale(param_grads_fp16, self._scale, + param_grads_fp16, + temp_found_inf_fp16) + if len(param_grads_fp32): + _C_ops.check_finite_and_unscale(param_grads_fp32, self._scale, + param_grads_fp32, + temp_found_inf_fp32) + + self._found_inf = 1 if temp_found_inf_fp16 or temp_found_inf_fp32 else 0 + is_found_inf = paddle.to_tensor([self._found_inf], dtype="int32") + + # TODO(shenliang03) Since dp allreduce in the optimizer is + # after the gradscaler, check_finite needs to synchronize global + # information. In the future, we should use check_group to speed. + paddle.distributed.all_reduce(is_found_inf, + op=paddle.distributed.ReduceOp.MAX, + group=None) + self._found_inf = is_found_inf.numpy()[0] + + # Only data_parallel doesn't need to modify scaler + fleet_env = fleet.fleet + if fleet_env._hcg.get_parallel_mode() is not ParallelMode.DATA_PARALLEL: + scaler._unscale = MethodType(unscale_method, scaler) + + return scaler
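For reference, the relocated entry points are exercised from user code exactly as before. The following dygraph sketch is assembled from the docstrings carried over in this patch and assumes a multi-card launch (for example via `python -m paddle.distributed.launch`)::

    import paddle
    import paddle.nn as nn
    from paddle.distributed import fleet

    class LinearNet(nn.Layer):
        def __init__(self):
            super(LinearNet, self).__init__()
            self._linear1 = nn.Linear(10, 10)
            self._linear2 = nn.Linear(10, 1)

        def forward(self, x):
            return self._linear2(self._linear1(x))

    # 1. initialize fleet environment
    fleet.init(is_collective=True)

    # 2. create layer & optimizer
    layer = LinearNet()
    loss_fn = nn.MSELoss()
    adam = paddle.optimizer.Adam(learning_rate=0.001,
                                 parameters=layer.parameters())

    # 3. these names now resolve to the module-level functions added in this
    #    patch, not to bound methods of the Fleet singleton
    adam = fleet.distributed_optimizer(adam)
    dp_layer = fleet.distributed_model(layer)

    # 4. run one step
    inputs = paddle.randn([10, 10], 'float32')
    loss = loss_fn(dp_layer(inputs), paddle.randn([10, 1], 'float32'))
    loss.backward()
    adam.step()
    adam.clear_grad()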
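The recompute path in model.py keeps the `_RecomputeModelWrapper` behaviour unchanged: the child layers of a `paddle.nn.Sequential` are split into `segments` equally sized chunks, every chunk except the last is replayed through `LegacyRecomputeFunction`, and the last chunk runs normally so its activations are kept for backward. A standalone sketch of that segmentation, with plain integers standing in for the layers (illustration only, not part of the patch)::

    layers = list(range(10))      # stand-ins for the Sequential's child layers
    segments = 2                  # default used by _RecomputeModelWrapper
    segment_size = len(layers) // segments

    end = 0
    checkpointed = []
    for begin in range(0, segment_size * (segments - 1), segment_size):
        end = begin + segment_size
        checkpointed.append(layers[begin:end])   # chunks run under recompute
    tail = layers[end:]                          # last chunk runs normally

    print(checkpointed)   # [[0, 1, 2, 3, 4]]
    print(tail)           # [5, 6, 7, 8, 9]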
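The relocated `distributed_scaler` only patches `scaler._unscale` when the parallel mode is not plain data parallel; in either case the returned object is still driven like an ordinary `paddle.amp.GradScaler`. A minimal dygraph sketch of that usage, assuming a multi-card launch; the model, data, and hyperparameters here are placeholders rather than part of the patch::

    import paddle
    from paddle.distributed import fleet

    fleet.init(is_collective=True)

    model = paddle.nn.Linear(10, 1)
    opt = paddle.optimizer.Adam(learning_rate=0.001,
                                parameters=model.parameters())

    opt = fleet.distributed_optimizer(opt)
    model = fleet.distributed_model(model)

    scaler = paddle.amp.GradScaler(init_loss_scaling=1024)
    scaler = fleet.distributed_scaler(scaler)   # patches _unscale outside pure DP

    data = paddle.randn([4, 10], 'float32')
    with paddle.amp.auto_cast():
        loss = paddle.mean(model(data))

    scaled = scaler.scale(loss)      # scale the loss before backward
    scaled.backward()
    scaler.minimize(opt, scaled)     # unscale grads, skip step on inf/nan,
                                     # then update the loss scaling
    opt.clear_grad()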
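Taken together, the two functions in optimizer.py route a call through one of three paths: static graph falls back to the Fleet singleton, single-card dygraph returns the plain optimizer, and multi-card dygraph picks the heter or hybrid wrapper. A condensed, hypothetical helper (`_select` is not part of the patch, and it omits the bookkeeping of `user_defined_optimizer` and the optional strategy merge) that mirrors that dispatch::

    import paddle
    from paddle.distributed import fleet
    from paddle.distributed.fleet.meta_optimizers import (
        HybridParallelOptimizer, HeterParallelOptimizer)

    def _select(optimizer, fleet_env=fleet.fleet):
        # static graph: keep going through the Fleet singleton's method
        if not paddle.fluid.framework._non_static_mode():
            return fleet_env.distributed_optimizer(optimizer)
        # dygraph, single card: hand the plain optimizer back
        if fleet_env.worker_num() <= 1:
            return optimizer
        # dygraph, multi card: heter collective vs. hybrid parallel wrapper
        if fleet_env._user_defined_strategy.heter_ccl_mode:
            return HeterParallelOptimizer(optimizer,
                                          fleet_env._user_defined_strategy)
        return HybridParallelOptimizer(optimizer, fleet_env._hcg,
                                       fleet_env._user_defined_strategy)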