Unverified · Commit 8636d2a2 authored by wuhuachaocoding, committed by GitHub

refactor fleet. (#44833)

* refactor fleet.

* refact fleet.py.

* update fleet/__init__.py.

* update fleet.py

* update code style.

* update fleet

* update fleet

* update fleet

* update fleet

* update model.py

* update fleet.

* update __init__.py

* update fleet.

* update fleet.

* update fleet

* update fleet

* update fleet

* update fleet.

* update optimizer.py

* update optimizer

* update fleet.py

* update scaler.py

* update setup.py.in
Parent f30c7bd6
python/paddle/distributed/fleet/__init__.py

@@ -17,7 +17,6 @@ from .base.role_maker import Role # noqa: F401
 from .base.role_maker import UserDefinedRoleMaker # noqa: F401
 from .base.role_maker import PaddleCloudRoleMaker # noqa: F401
 from .base.distributed_strategy import DistributedStrategy # noqa: F401
-from .base.fleet_base import Fleet # noqa: F401
 from .base.util_factory import UtilBase # noqa: F401
 from .dataset import DatasetBase # noqa: F401
 from .dataset import InMemoryDataset # noqa: F401
@@ -29,6 +28,10 @@ from .data_generator.data_generator import MultiSlotStringDataGenerator # noqa: F401
 from . import metrics # noqa: F401
 from .base.topology import CommunicateTopology
 from .base.topology import HybridCommunicateGroup # noqa: F401
+from .fleet import Fleet
+from .model import distributed_model
+from .optimizer import distributed_optimizer
+from .scaler import distributed_scaler
 __all__ = [ #noqa
     "CommunicateTopology", "UtilBase", "HybridCommunicateGroup",
@@ -72,7 +75,7 @@ init_worker = fleet.init_worker
 init_server = fleet.init_server
 run_server = fleet.run_server
 stop_worker = fleet.stop_worker
-distributed_optimizer = fleet.distributed_optimizer
+distributed_optimizer = distributed_optimizer
 save_inference_model = fleet.save_inference_model
 save_persistables = fleet.save_persistables
 save_cache_model = fleet.save_cache_model
@@ -83,13 +86,7 @@ load_model = fleet.load_model
 load_inference_model = fleet.load_inference_model
 load_one_table = fleet.load_one_table
 minimize = fleet.minimize
-distributed_model = fleet.distributed_model
-step = fleet.step
-clear_grad = fleet.clear_grad
-set_lr = fleet.set_lr
-get_lr = fleet.get_lr
-state_dict = fleet.state_dict
-set_state_dict = fleet.set_state_dict
+distributed_model = distributed_model
 shrink = fleet.shrink
 get_hybrid_communicate_group = fleet.get_hybrid_communicate_group
-distributed_scaler = fleet.distributed_scaler
+distributed_scaler = distributed_scaler
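Taken together, the __init__.py changes rebind the module-level entry points from Fleet methods to the new standalone functions defined in fleet.py, model.py, optimizer.py and scaler.py. A minimal sketch of the refactored API in use, assuming a collective dygraph launch (the model and data below are placeholders, not part of the commit):

import paddle
import paddle.nn as nn
from paddle.distributed import fleet

# After this commit, these calls resolve to the standalone functions
# rather than to bound methods of the Fleet singleton.
fleet.init(is_collective=True)
layer = nn.Linear(10, 1)  # placeholder model
adam = fleet.distributed_optimizer(
    paddle.optimizer.Adam(parameters=layer.parameters()))
dp_layer = fleet.distributed_model(layer)

loss = dp_layer(paddle.randn([4, 10], 'float32')).mean()
loss.backward()
adam.step()
adam.clear_grad()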
python/paddle/distributed/fleet/model.py

# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import os
import numpy as np
from .base import topology as tp
from .base.topology import ParallelMode
from .meta_parallel import TensorParallel, model_parallel_random_seed
from .meta_parallel import PipelineParallel, ShardingParallel
from paddle.fluid import core
from paddle.distributed.fleet.utils.recompute import LegacyRecomputeFunction
from paddle.fluid.dygraph.varbase_patch_methods import _grad_scalar
from paddle.distributed import fleet


class _RecomputeModelWrapper(paddle.nn.Layer):

    def __init__(self, model, segments=2, preserve_rng_state=True):
        super(_RecomputeModelWrapper, self).__init__()
        assert isinstance(model, paddle.nn.Sequential), (
            "The model passed to RecomputeModelWrapper must be of type "
            "paddle.nn.Sequential.")
        self._model = model
        self._segments = segments
        self._preserve_rng_state = preserve_rng_state
        self._layers = list(model.children())
        self._segment_size = len(self._layers) // segments

    def _run_func(self, begin, end):

        def do_run(input):
            for i in range(begin, end):
                input = self._layers[i](input)
            return input

        return do_run

    def _checkpoint(self, func, *args, **kwargs):
        return LegacyRecomputeFunction.apply(func, self._preserve_rng_state,
                                             *args)

    def forward(self, input):
        end = 0
        for begin in range(0, self._segment_size * (self._segments - 1),
                           self._segment_size):
            end = begin + self._segment_size
            input = self._checkpoint(self._run_func(begin, end), input)
        return self._run_func(end, len(self._layers))(input)


_grad_scalar = None


def distributed_model(model):
    """
    Return a distributed data parallel model (only works in dygraph mode).

    Args:
        model (Layer): the user-defined model which inherits Layer.

    Returns:
        distributed data parallel model which inherits Layer.

    Examples:

        .. code-block:: python

            import paddle
            import paddle.nn as nn
            from paddle.distributed import fleet

            class LinearNet(nn.Layer):
                def __init__(self):
                    super(LinearNet, self).__init__()
                    self._linear1 = nn.Linear(10, 10)
                    self._linear2 = nn.Linear(10, 1)

                def forward(self, x):
                    return self._linear2(self._linear1(x))

            # 1. initialize fleet environment
            fleet.init(is_collective=True)

            # 2. create layer & optimizer
            layer = LinearNet()
            loss_fn = nn.MSELoss()
            adam = paddle.optimizer.Adam(
                learning_rate=0.001, parameters=layer.parameters())

            # 3. get data_parallel model using fleet
            adam = fleet.distributed_optimizer(adam)
            dp_layer = fleet.distributed_model(layer)

            # 4. run layer
            inputs = paddle.randn([10, 10], 'float32')
            outputs = dp_layer(inputs)
            labels = paddle.randn([10, 1], 'float32')
            loss = loss_fn(outputs, labels)
            print("loss:", loss.numpy())
            loss.backward()
            adam.step()
            adam.clear_grad()

    """
    fleet_env = fleet.fleet
    assert model is not None, "model should not be None"
    if fleet_env.worker_num() <= 1:
        return model

    amp_enable = False
    recompute_enable = False
    strategy = fleet_env._user_defined_strategy
    if strategy.amp == True:
        amp_enable = True
        amp_level = "O2" if strategy.amp_configs['use_pure_fp16'] else "O1"
        if amp_level.upper() == "O2":
            model = paddle.amp.decorate(models=model,
                                        optimizers=None,
                                        level="O2",
                                        master_weight=None,
                                        save_dtype=None)
        init_loss_scaling = strategy.amp_configs['init_loss_scaling']
        incr_ratio = strategy.amp_configs['incr_ratio']
        decr_ratio = strategy.amp_configs['decr_ratio']
        incr_every_n_steps = strategy.amp_configs['incr_every_n_steps']
        decr_every_n_nan_or_inf = strategy.amp_configs[
            'decr_every_n_nan_or_inf']
        use_dynamic_loss_scaling = strategy.amp_configs[
            'use_dynamic_loss_scaling']

        global _grad_scalar
        _grad_scalar = paddle.amp.GradScaler(
            init_loss_scaling=init_loss_scaling,
            incr_ratio=incr_ratio,
            decr_ratio=decr_ratio,
            incr_every_n_steps=incr_every_n_steps,
            decr_every_n_nan_or_inf=decr_every_n_nan_or_inf,
            use_dynamic_loss_scaling=use_dynamic_loss_scaling)

    if strategy.recompute == True:
        recompute_enable = True
        model = _RecomputeModelWrapper(model)

    if strategy.heter_ccl_mode == True:
        distributed_model = paddle.DataParallel(
            model,
            comm_buffer_size=strategy.fuse_grad_size_in_MB,
            last_comm_buffer_size=strategy.last_comm_group_size_MB,
            find_unused_parameters=strategy.find_unused_parameters)
        return distributed_model

    if fleet_env._hcg.get_parallel_mode() == ParallelMode.SHARDING_PARALLEL:
        model = ShardingParallel(model, fleet_env._hcg, strategy=strategy)
    elif fleet_env._hcg.get_parallel_mode() == ParallelMode.DATA_PARALLEL:
        # NOTE (JZ-LIANG) init parameters broadcast within sharding group;
        # normally it should be done inside DataParallel
        if fleet_env.sharding_degree > 1:
            from paddle.distributed.fleet.utils.hybrid_parallel_util import broadcast_mp_parameters, broadcast_sharding_parameters
            assert fleet_env.sharding_degree == fleet_env._hcg.get_sharding_parallel_world_size(
            )
            broadcast_sharding_parameters(model, fleet_env._hcg)
        model = paddle.DataParallel(
            model,
            comm_buffer_size=strategy.fuse_grad_size_in_MB,
            last_comm_buffer_size=strategy.last_comm_group_size_MB,
            find_unused_parameters=strategy.find_unused_parameters)
    elif fleet_env._hcg.get_parallel_mode() == ParallelMode.TENSOR_PARALLEL:
        model = TensorParallel(model, fleet_env._hcg, strategy=strategy)
    elif fleet_env._hcg.get_parallel_mode() == ParallelMode.PIPELINE_PARALLEL:
        model = PipelineParallel(model, fleet_env._hcg, strategy=strategy)

    return model
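A reading aid for _RecomputeModelWrapper above: with the default segments=2, forward() checkpoints every segment except the last. The sketch below replays the same segmentation arithmetic on a made-up five-layer list (plain Python, not part of the commit):

layers = ["linear1", "relu1", "linear2", "relu2", "linear3"]
segments = 2
segment_size = len(layers) // segments  # 2

# Segment [0, 2) is recomputed via checkpointing; the remainder [2, 5)
# runs as the final, non-checkpointed segment, mirroring forward() above.
end = 0
for begin in range(0, segment_size * (segments - 1), segment_size):
    end = begin + segment_size
    print("checkpointed segment:", layers[begin:end])
print("final segment:", layers[end:])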
python/paddle/distributed/fleet/optimizer.py

# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import warnings
import paddle
import os
import numpy as np
from paddle.fluid.framework import dygraph_only, _global_flags
from .base.distributed_strategy import DistributedStrategy
from .meta_optimizers import HybridParallelOptimizer, HeterParallelOptimizer
from paddle.fluid import core
from paddle.distributed import fleet


def _dygraph_distributed_optimizer(optimizer, strategy=None):
    """
    Optimizer for distributed training.

    For distributed training, this method rebuilds a new instance of
    DistributedOptimizer, which has the basic Optimizer functionality plus
    special features for distributed training.

    Args:
        optimizer(Optimizer): The optimizer to be used in distributed training.
        strategy(DistributedStrategy): Extra properties for distributed optimizer.
            It is recommended to use DistributedStrategy in fleet.init(). The strategy
            here is for compatibility. If the strategy in fleet.distributed_optimizer()
            is not None, then it will overwrite the DistributedStrategy in fleet.init(),
            which will take effect in distributed training.

    Returns:
        Optimizer: an optimizer instance suitable for distributed training.

    Examples:

        .. code-block:: python

            import paddle
            import paddle.distributed.fleet as fleet

            fleet.init(is_collective=True)
            strategy = fleet.DistributedStrategy()
            optimizer = paddle.optimizer.SGD(learning_rate=0.001)
            optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)

    """
    fleet_env = fleet.fleet
    fleet_env.user_defined_optimizer = optimizer

    if strategy is not None:
        if fleet_env._is_collective:
            warnings.warn(
                "It is recommended to use DistributedStrategy "
                "in fleet_env.init(). The strategy here is only for compatibility. "
                "If the strategy in fleet_env.distributed_optimizer() is "
                "not None, then it will overwrite the DistributedStrategy in fleet_env.init(), "
                "which will take effect in distributed training.")
        fleet_env._user_defined_strategy = copy.deepcopy(strategy)

    fleet_env._context = {}

    if fleet_env.worker_num() > 1:
        if fleet_env._user_defined_strategy.heter_ccl_mode == False:
            return HybridParallelOptimizer(optimizer, fleet_env._hcg,
                                           fleet_env._user_defined_strategy)
        else:
            return HeterParallelOptimizer(optimizer,
                                          fleet_env._user_defined_strategy)
    else:
        return optimizer


def distributed_optimizer(*args, **kwargs):
    if paddle.fluid.framework._non_static_mode():
        return _dygraph_distributed_optimizer(*args, **kwargs)
    else:
        return fleet.fleet.distributed_optimizer(*args, **kwargs)
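distributed_optimizer therefore dispatches on execution mode: dynamic graph goes through _dygraph_distributed_optimizer, while static graph falls back to the Fleet method. A hedged sketch of what a dygraph caller gets back (launch setup assumed; the worker count depends on the environment):

import paddle
import paddle.distributed.fleet as fleet

fleet.init(is_collective=True)
opt = fleet.distributed_optimizer(paddle.optimizer.SGD(learning_rate=0.001))

# Single worker: the SGD instance is returned unchanged. Multiple workers:
# a HybridParallelOptimizer wraps it, or a HeterParallelOptimizer when
# strategy.heter_ccl_mode is enabled.
print(type(opt).__name__)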
python/paddle/distributed/fleet/scaler.py

# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
from paddle.fluid.framework import dygraph_only
from .base.topology import ParallelMode
from paddle.distributed import fleet
from types import MethodType
from paddle.fluid import core
from paddle.fluid.dygraph import to_variable
import numpy as np
from paddle import _C_ops


def distributed_scaler(scaler):

    def unscale_method(self, optimizer):
        if not self._enable:
            return

        if getattr(optimizer, '_param_groups', None) and isinstance(
                optimizer._param_groups[0], dict):
            param_grads = []
            param_grads_fp16 = []
            param_grads_fp32 = []
            for group in optimizer._param_groups:
                for param in group['params']:
                    if param._grad_ivar() is not None:
                        param_grads.append(param._grad_ivar())
                        if param._grad_ivar(
                        ).dtype == core.VarDesc.VarType.FP16:
                            param_grads_fp16.append(param._grad_ivar())
                        else:
                            param_grads_fp32.append(param._grad_ivar())
        else:
            param_grads = [
                param._grad_ivar() for param in optimizer._parameter_list
                if param._grad_ivar() is not None
            ]
            param_grads_fp16 = [
                param._grad_ivar() for param in optimizer._parameter_list
                if (param._grad_ivar() is not None) and (
                    param._grad_ivar().dtype == core.VarDesc.VarType.FP16)
            ]
            param_grads_fp32 = [
                param._grad_ivar() for param in optimizer._parameter_list
                if (param._grad_ivar() is not None) and (
                    param._grad_ivar().dtype == core.VarDesc.VarType.FP32)
            ]
        temp_found_inf_fp16 = to_variable(np.array([0]).astype(np.bool_))
        temp_found_inf_fp32 = to_variable(np.array([0]).astype(np.bool_))
        if len(param_grads_fp16):
            _C_ops.check_finite_and_unscale(param_grads_fp16, self._scale,
                                            param_grads_fp16,
                                            temp_found_inf_fp16)
        if len(param_grads_fp32):
            _C_ops.check_finite_and_unscale(param_grads_fp32, self._scale,
                                            param_grads_fp32,
                                            temp_found_inf_fp32)
        self._found_inf = 1 if temp_found_inf_fp16 or temp_found_inf_fp32 else 0
        is_found_inf = paddle.to_tensor([self._found_inf], dtype="int32")

        # TODO(shenliang03) Since dp allreduce in the optimizer is
        # after the gradscaler, check_finite needs to synchronize global
        # information. In the future, we should use check_group to speed it up.
        paddle.distributed.all_reduce(is_found_inf,
                                      op=paddle.distributed.ReduceOp.MAX,
                                      group=None)
        self._found_inf = is_found_inf.numpy()[0]

    # Only data_parallel doesn't need to modify the scaler
    fleet_env = fleet.fleet
    if fleet_env._hcg.get_parallel_mode() is not ParallelMode.DATA_PARALLEL:
        scaler._unscale = MethodType(unscale_method, scaler)
    return scaler
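For context, distributed_scaler wraps an existing paddle.amp.GradScaler so that the found-inf flag is all-reduced across ranks before unscaling. A minimal AMP loop sketch under that assumption (model, optimizer and launch setup are placeholders, not part of the commit):

import paddle
from paddle.distributed import fleet

fleet.init(is_collective=True)
layer = paddle.nn.Linear(10, 1)  # placeholder model
opt = fleet.distributed_optimizer(
    paddle.optimizer.Adam(parameters=layer.parameters()))
layer = fleet.distributed_model(layer)

scaler = paddle.amp.GradScaler(init_loss_scaling=1024)
scaler = fleet.distributed_scaler(scaler)  # patches _unscale outside pure DP

with paddle.amp.auto_cast():
    loss = layer(paddle.randn([4, 10], 'float32')).mean()
scaler.scale(loss).backward()
scaler.step(opt)  # unscale (synchronized across ranks) + optimizer step
scaler.update()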