Commit 28dac4ec, authored by Chen Weihang

remove data parallel scale loss & apply collective_grads

Parent: df7fabee
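
For orientation, a minimal sketch (editorial, not part of the diff) of the training loop this change enables: loss scaling and gradient allreduce now happen automatically inside ``backward()`` and the optimizer, so user code no longer calls ``scale_loss`` or ``apply_collective_grads``. The model and hyperparameters are taken from the docstring example further down in this diff.

    import paddle
    import paddle.nn as nn
    import paddle.optimizer as opt
    import paddle.distributed as dist

    class LinearNet(nn.Layer):
        def __init__(self):
            super(LinearNet, self).__init__()
            self._linear1 = nn.Linear(10, 10)
            self._linear2 = nn.Linear(10, 1)

        def forward(self, x):
            return self._linear2(self._linear1(x))

    def train():
        # 1. enable dynamic mode and initialize the parallel environment
        paddle.disable_static()
        dist.init_parallel_env()

        # 2. create data parallel layer & optimizer
        dp_layer = paddle.DataParallel(LinearNet())
        loss_fn = nn.MSELoss()
        adam = opt.Adam(learning_rate=0.001, parameters=dp_layer.parameters())

        # 3. run one step: no scale_loss() / apply_collective_grads() calls needed
        inputs = paddle.randn([10, 10], 'float32')
        labels = paddle.randn([10, 1], 'float32')
        loss = loss_fn(dp_layer(inputs), labels)
        loss.backward()   # loss is scaled by world_size inside backward()
        adam.step()       # gradients are all-reduced inside the optimizer
        adam.clear_grad()

    if __name__ == '__main__':
        dist.spawn(train, nprocs=2)
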
...@@ -233,6 +233,121 @@ class ParallelEnv(object):
Env = ParallelEnv
def _build_default_parallel_strategy():
strategy = ParallelStrategy()
strategy.nranks = ParallelEnv().nranks
strategy.local_rank = ParallelEnv().local_rank
strategy.trainer_endpoints = ParallelEnv().trainer_endpoints
strategy.current_endpoint = ParallelEnv().current_endpoint
return strategy
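# Editorial note: _coalesce_tensors (below) flattens every gradient in a group
# to 1-D and concatenates the group into one contiguous tensor, so that each
# group can be all-reduced with a single collective call instead of one call
# per gradient.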
def _coalesce_tensors(var_groups):
from ..layers import nn
coalesced_grads_and_grad_vars = []
for group_id, grad_vars in var_groups.items():
flattened_vars = []
g_var_shapes = []
for g_var in grad_vars:
g_var_shapes.append(g_var.shape)
flattened_vars.append(
nn.reshape(
x=g_var, shape=[np.prod(g_var.shape)], inplace=True))
coalesced_grad = nn.concat(flattened_vars)
coalesced_grads_and_grad_vars.append(
[coalesced_grad, grad_vars, g_var_shapes])
return coalesced_grads_and_grad_vars
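# Editorial note: tracing reshape2 with 'Out' mapped back to 'X' reshapes the
# tensor in place (no new output buffer); 'XShape' only records the original
# shape.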
@framework.dygraph_only
def _reshape_inplace(x, shape):
x_shape = framework._varbase_creator(dtype=x.dtype)
framework._dygraph_tracer().trace_op(
type="reshape2",
inputs={'X': x},
outputs={'Out': x,
'XShape': x_shape},
attrs={'shape': shape})
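# Editorial note: _split_tensors (below) is the inverse of _coalesce_tensors:
# after the coalesced buffer has been all-reduced, it is split back into the
# original gradient tensors, each of which is reshaped to its recorded shape.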
@framework.dygraph_only
def _split_tensors(coalesced_grads_and_grad_vars):
for coalesced_grad, origin_grad_vars, grad_shapes in coalesced_grads_and_grad_vars:
grad_var_len = [np.prod(g_shape) for g_shape in grad_shapes]
framework._dygraph_tracer().trace_op(
type='split',
inputs={'X': coalesced_grad},
outputs={'Out': origin_grad_vars},
attrs={'sections': grad_var_len,
'axis': 0})
for g_var, g_shape in zip(origin_grad_vars, grad_shapes):
_reshape_inplace(x=g_var, shape=g_shape)
assert g_var.shape == g_shape
def scale_loss(loss):
if not ParallelEnv().world_size > 1:
return loss
loss_scale = to_variable(
np.array([ParallelEnv().world_size]).astype("float32"))
loss_scale.stop_gradient = True
scaled_loss = loss / loss_scale
return scaled_loss
@no_grad
def apply_collective_grads(parameters):
if not ParallelEnv().world_size > 1:
return
grad_var_set = set()
grad_vars = []
sparse_grad_vars = []
strategy = _build_default_parallel_strategy()
for param in parameters:
        # NOTE(zcd): The grad_ivar may not be generated.
if param.trainable and (param._grad_ivar() is not None):
g_var = param._grad_ivar()
if g_var._is_sparse():
sparse_grad_vars.append(g_var)
continue
grad_vars.append(g_var)
assert g_var not in grad_var_set
grad_var_set.add(g_var)
if sparse_grad_vars:
sparse_grad_vars.sort(key=lambda x: x.name)
for grad_var in sparse_grad_vars:
grad_var._allreduce(strategy)
# FIXME(zcd): the type of the var should be LoDTensor, i.e
# the gradients should be dense, otherwise, the following
# logic should be updated.
# 128 MB as a group
mega_bytes = 128 * 1024 * 1024
group_idx = 0
memory_counter = 0
grad_var_groups = OrderedDict()
dtype = grad_vars[0].dtype
for g_var in grad_vars:
# NOTE: the dtype of the same group should be the same.
bytes = np.prod(g_var.shape) * core.size_of_dtype(g_var.dtype)
if memory_counter < mega_bytes and dtype == g_var.dtype:
memory_counter += bytes
else:
memory_counter = bytes
group_idx += 1
grad_var_groups.setdefault(group_idx, []).append(g_var)
coalesced_grads_and_vars = _coalesce_tensors(grad_var_groups)
for coalesced_grad, _, _ in coalesced_grads_and_vars:
coalesced_grad._allreduce(strategy)
_split_tensors(coalesced_grads_and_vars)
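
A small worked example of the grouping loop above (editorial sketch; the gradient sizes are made up):

    # Three hypothetical float32 gradients of 120 MB, 80 MB and 20 MB:
    #   g1: memory_counter 0 < 128 MB       -> group 0, counter becomes 120 MB
    #   g2: memory_counter 120 MB < 128 MB  -> group 0, counter becomes 200 MB
    #       (the check happens before adding, so a group may exceed 128 MB)
    #   g3: memory_counter 200 MB >= 128 MB -> new group 1, counter reset to 20 MB
    # Each group is then coalesced and all-reduced with one collective call.
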
class DataParallel(layers.Layer):
    """
    Run the dygraph module with data parallelism.
...@@ -325,232 +440,26 @@ class DataParallel(layers.Layer):
        if strategy is not None:
            self._strategy = strategy
        else:
            self._strategy = ParallelStrategy()
            self._strategy.nranks = ParallelEnv().nranks
            self._strategy.local_rank = ParallelEnv().local_rank
            self._strategy.trainer_endpoints = ParallelEnv().trainer_endpoints
            self._strategy.current_endpoint = ParallelEnv().current_endpoint
            self._strategy = _build_default_parallel_strategy()
    def forward(self, *inputs, **kwargs):
        return self._layers(*inputs, **kwargs)
@deprecated(
since="2.0.0", reason="This method does not need to be called anymore.")
    def scale_loss(self, loss):
        """
        Scale the loss. In data parallel mode, the loss should be scaled with
        the number of trainers. If not in data parallel mode, return the loss
        directly.
        Deprecated method, now ``scale_loss`` is an empty method.
Args:
loss(Variable): The loss of the current Model.
Returns:
Variable: the scaled loss.
Examples:
.. code-block:: python
import paddle
import paddle.nn as nn
import paddle.optimizer as opt
import paddle.distributed as dist
class LinearNet(nn.Layer):
def __init__(self):
super(LinearNet, self).__init__()
self._linear1 = nn.Linear(10, 10)
self._linear2 = nn.Linear(10, 1)
def forward(self, x):
return self._linear2(self._linear1(x))
def train():
# 1. enable dynamic mode
paddle.disable_static()
# 2. initialize parallel environment
dist.init_parallel_env()
# 3. create data parallel layer & optimizer
layer = LinearNet()
dp_layer = paddle.DataParallel(layer)
loss_fn = nn.MSELoss()
adam = opt.Adam(
learning_rate=0.001, parameters=dp_layer.parameters())
# 4. run layer
inputs = paddle.randn([10, 10], 'float32')
outputs = dp_layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)
loss = dp_layer.scale_loss(loss)
loss.backward()
dp_layer.apply_collective_grads()
adam.step()
adam.clear_grad()
if __name__ == '__main__':
# 1. start by ``paddle.distributed.spawn`` (default)
dist.spawn(train, nprocs=2)
# 2. start by ``paddle.distributed.launch``
# train()
""" """
if not self._is_data_parallel_mode():
return loss
loss_scale = to_variable(
np.array([self._strategy.nranks]).astype("float32"))
loss_scale.stop_gradient = True
loss = loss / loss_scale
        return loss
    def _coalesce_tensors(self, var_groups):
        from ..layers import nn
coalesced_grads_and_grad_vars = []
for group_id, grad_vars in var_groups.items():
flattened_vars = []
g_var_shapes = []
for g_var in grad_vars:
g_var_shapes.append(g_var.shape)
flattened_vars.append(
nn.reshape(
x=g_var, shape=[np.prod(g_var.shape)], inplace=True))
coalesced_grad = nn.concat(flattened_vars)
coalesced_grads_and_grad_vars.append(
[coalesced_grad, grad_vars, g_var_shapes])
return coalesced_grads_and_grad_vars
def _reshape_inplace(self, x, shape):
x_shape = self._helper.create_variable_for_type_inference(dtype=x.dtype)
self._helper.append_op(
type="reshape2",
inputs={'X': x},
attrs={'shape': shape},
outputs={'Out': x,
'XShape': x_shape})
def _split_tensors(self, coalesced_grads_and_grad_vars):
from ..layers import nn
for coalesced_grad, origin_grad_vars, grad_shapes in coalesced_grads_and_grad_vars:
grad_var_len = [np.prod(g_shape) for g_shape in grad_shapes]
self._helper.main_program.current_block().append_op(
type='split',
inputs={'X': coalesced_grad},
outputs={'Out': origin_grad_vars},
attrs={'sections': grad_var_len,
'axis': 0})
for g_var, g_shape in zip(origin_grad_vars, grad_shapes):
self._reshape_inplace(x=g_var, shape=g_shape)
assert g_var.shape == g_shape
@no_grad
    @deprecated(
        since="2.0.0", reason="This method does not need to be called anymore.")
    def apply_collective_grads(self):
        """
        AllReduce the Parameters' gradient.
        Deprecated method, now ``apply_collective_grads`` is an empty method.
Examples:
.. code-block:: python
import paddle
import paddle.nn as nn
import paddle.optimizer as opt
import paddle.distributed as dist
class LinearNet(nn.Layer):
def __init__(self):
super(LinearNet, self).__init__()
self._linear1 = nn.Linear(10, 10)
self._linear2 = nn.Linear(10, 1)
def forward(self, x):
return self._linear2(self._linear1(x))
def train():
# 1. enable dynamic mode
paddle.disable_static()
# 2. initialize parallel environment
dist.init_parallel_env()
# 3. create data parallel layer & optimizer
layer = LinearNet()
dp_layer = paddle.DataParallel(layer)
loss_fn = nn.MSELoss()
adam = opt.Adam(
learning_rate=0.001, parameters=dp_layer.parameters())
# 4. run layer
inputs = paddle.randn([10, 10], 'float32')
outputs = dp_layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)
loss = dp_layer.scale_loss(loss)
loss.backward()
dp_layer.apply_collective_grads()
adam.step()
adam.clear_grad()
if __name__ == '__main__':
# 1. start by ``paddle.distributed.spawn`` (default)
dist.spawn(train, nprocs=2)
# 2. start by ``paddle.distributed.launch``
# train()
""" """
if not self._is_data_parallel_mode(): return
return
grad_var_set = set()
grad_vars = []
sparse_grad_vars = []
for param in self._layers.parameters():
            # NOTE(zcd): The grad_ivar may not be generated.
if param.trainable and (param._grad_ivar() is not None):
g_var = param._grad_ivar()
if g_var._is_sparse():
sparse_grad_vars.append(g_var)
continue
grad_vars.append(g_var)
assert g_var not in grad_var_set
grad_var_set.add(g_var)
if sparse_grad_vars:
sparse_grad_vars.sort(key=lambda x: x.name)
for grad_var in sparse_grad_vars:
grad_var._allreduce(self._strategy)
# FIXME(zcd): the type of the var should be LoDTensor, i.e
# the gradients should be dense, otherwise, the following
# logic should be updated.
# 128 MB as a group
mega_bytes = 128 * 1024 * 1024
group_idx = 0
memory_counter = 0
grad_var_groups = OrderedDict()
dtype = grad_vars[0].dtype
for g_var in grad_vars:
# Note: the dtype of the same group should be the same.
bytes = np.prod(g_var.shape) * core.size_of_dtype(g_var.dtype)
if memory_counter < mega_bytes and dtype == g_var.dtype:
memory_counter += bytes
else:
memory_counter = bytes
group_idx += 1
grad_var_groups.setdefault(group_idx, []).append(g_var)
coalesced_grads_and_vars = self._coalesce_tensors(grad_var_groups)
for coalesced_grad, _, _ in coalesced_grads_and_vars:
coalesced_grad._allreduce(self._strategy)
self._split_tensors(coalesced_grads_and_vars)
def _is_data_parallel_mode(self):
return self._strategy.nranks > 1
    def state_dict(self,
                   destination=None,
......
...@@ -13,12 +13,15 @@
# limitations under the License.
import inspect
import numpy as np
import paddle
from .. import framework
from .. import core
from ..framework import Variable, Parameter, ParamBase
from .base import switch_to_static_graph
from .base import switch_to_static_graph, to_variable
import numpy as np
from .math_op_patch import monkey_patch_math_varbase
from .parallel import scale_loss
def monkey_patch_varbase():
...@@ -165,7 +168,12 @@ def monkey_patch_varbase():
        """
        if framework.in_dygraph_mode():
            self._run_backward(framework._dygraph_tracer(), retain_graph)
            if paddle.distributed.get_world_size() > 1:
scaled_loss = scale_loss(self)
scaled_loss._run_backward(framework._dygraph_tracer(),
retain_graph)
else:
self._run_backward(framework._dygraph_tracer(), retain_graph)
        else:
            raise ValueError(
                "Variable.backward() is only available in DyGraph mode")
......
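
Why the scaling in ``backward()`` above is equivalent to the removed explicit ``scale_loss`` / ``apply_collective_grads`` pair (editorial note):

    # With world_size N, backward() now differentiates loss / N, so every rank
    # holds grad_i / N.  The subsequent allreduce-sum across the N ranks yields
    # (grad_0 + ... + grad_{N-1}) / N, i.e. the gradient averaged over all
    # ranks -- the same result the old scale_loss() + apply_collective_grads()
    # sequence produced.
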
...@@ -19,8 +19,10 @@ import six
import logging
from collections import defaultdict
import paddle
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table
from paddle.fluid.framework import Program, Variable, name_scope, default_main_program, default_startup_program, device_guard
from paddle.fluid.dygraph.parallel import scale_loss, apply_collective_grads
from . import framework
from . import layers
...@@ -40,7 +42,6 @@ from paddle.fluid.layers import tensor
from functools import reduce
from .wrapped_decorator import signature_safe_contextmanager
from .. import compat as cpt
import paddle
__all__ = [
    'SGD', 'Momentum', 'Adagrad', 'Adam', 'Adamax', 'Dpsgd', 'DecayedAdagrad',
...@@ -939,6 +940,11 @@ class Optimizer(object):
        parameter_list = parameter_list if parameter_list \
            else self._parameter_list
if paddle.distributed.get_world_size() > 1:
loss = scale_loss(loss)
apply_collective_grads(parameter_list)
        params_grads = self.backward(
            loss,
            startup_program=startup_program,
......
...@@ -435,13 +435,7 @@ class TestParallelDyGraphRunnerBase(object):
                        "loss at step %d: %f" % (step_id, loss.numpy()))
                out_losses.append(loss.numpy())
                # FIXME(Yancey1989): scale the loss inplace
                if args.update_method == "nccl2":
                    loss = model.scale_loss(loss)
                loss.backward()
                if args.update_method == "nccl2":
                    model.apply_collective_grads()
                opt.minimize(loss)
                model.clear_gradients()
...@@ -477,12 +471,7 @@ class TestParallelDyGraphRunnerBase(object):
                loss = self.run_one_loop(model, opt, data)
                out_losses.append(loss.numpy())
                if args.update_method == "nccl2":
                    loss = model.scale_loss(loss)
                loss.backward()
                if args.update_method == "nccl2":
                    model.apply_collective_grads()
                opt.minimize(loss)
                model.clear_gradients()
...@@ -521,12 +510,7 @@ class TestParallelDyGraphRunnerBase(object):
                loss = self.run_one_loop(model, opt, data)
                out_losses.append(loss.numpy())
                if args.update_method == "nccl2":
                    loss = model.scale_loss(loss)
                loss.backward()
                if args.update_method == "nccl2":
                    model.apply_collective_grads()
                opt.step()
                opt.clear_grad()
......
...@@ -17,6 +17,8 @@ from ..fluid import core
from ..fluid import framework
from ..fluid.framework import Variable
from paddle.fluid.dygraph.parallel import apply_collective_grads
__all__ = ["Adam"] __all__ = ["Adam"]
...@@ -276,7 +278,9 @@ class Adam(Optimizer): ...@@ -276,7 +278,9 @@ class Adam(Optimizer):
adam.step() adam.step()
adam.clear_grad() adam.clear_grad()
""" """
parameter_list = self._parameter_list if paddle.distributed.get_world_size() > 1:
apply_collective_grads(self._parameter_list)
        self._dtype = None
        params_grads = []
        for param in self._parameter_list:
......
...@@ -16,6 +16,8 @@ from .optimizer import Optimizer
from .adam import Adam
from ..fluid import framework
import paddle
from paddle.fluid.dygraph.parallel import scale_loss, apply_collective_grads
__all__ = ['AdamW']
...@@ -184,6 +186,13 @@ class AdamW(Adam):
                 startup_program=None,
                 parameters=None,
                 no_grad_set=None):
parameters = parameters if parameters \
else self._parameter_list
if paddle.distributed.get_world_size() > 1:
loss = scale_loss(loss)
            apply_collective_grads(parameters)
        params_grads = self.backward(
            loss=loss,
            startup_program=startup_program,
...@@ -206,7 +215,9 @@ class AdamW(Adam):
    @framework.dygraph_only
    def step(self):
        parameter_list = self._parameter_list
        if paddle.distributed.get_world_size() > 1:
apply_collective_grads(self._parameter_list)
        self._dtype = None
        params_grads = []
        for param in self._parameter_list:
...@@ -224,7 +235,7 @@ class AdamW(Adam):
                updated_param = paddle.fluid.layers.elementwise_sub(
                    x=param, y=scaled_param)
                param.set_value(updated_param.numpy())
        optimize_ops = self._apply_optimize(
        self._apply_optimize(
            loss=None, startup_program=None, params_grads=params_grads)

    def __str__(self):
......
...@@ -19,9 +19,10 @@ import six
import logging
from collections import defaultdict
import paddle
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table
from paddle.fluid.framework import Program, Variable, name_scope, default_main_program, default_startup_program, device_guard
import paddle
from paddle.fluid.dygraph.parallel import scale_loss, apply_collective_grads
from ..fluid import framework
from ..fluid import layers
...@@ -871,6 +872,11 @@ class Optimizer(object):
        parameter_list = parameters if parameters \
            else self._parameter_list
if paddle.distributed.get_world_size() > 1:
loss = scale_loss(loss)
apply_collective_grads(parameter_list)
        params_grads = self.backward(
            loss,
            startup_program=startup_program,
...@@ -907,7 +913,9 @@ class Optimizer(object):
                adam.step()
                adam.clear_grad()
        """
        parameter_list = self._parameter_list
        if paddle.distributed.get_world_size() > 1:
apply_collective_grads(self._parameter_list)
        self._dtype = None
        params_grads = []
        for param in self._parameter_list:
...@@ -917,5 +925,5 @@ class Optimizer(object):
                grad_var = param._grad_ivar()
                params_grads.append((param, grad_var))
        optimize_ops = self._apply_optimize(
        self._apply_optimize(
            loss=None, startup_program=None, params_grads=params_grads)