diff --git a/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py b/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py
index 75aa9766e7b281da3d4de819ca414d827b61de50..1285e1f3323ff096e366b918a0db30b0948a48bb 100644
--- a/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py
+++ b/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py
@@ -20,6 +20,7 @@ from paddle import framework
 import paddle
 from paddle.fluid import core
 from paddle.fluid.dygraph.parallel import _split_tensors, sync_params_buffers, build_groups
+from paddle.fluid.framework import in_dygraph_mode, _in_legacy_dygraph
 from collections import OrderedDict
 from .log_util import logger
 
@@ -58,6 +59,30 @@ def _apply_collective_grads(parameters, comm_group):
     _split_tensors(coalesced_grads_and_vars)
 
 
+def _apply_collective_grads_eager(parameters, comm_group):
+    grad_var_set = set()
+    grad_vars = []
+
+    for param in parameters:
+        if param.trainable and (param._grad_ivar() is not None):
+            g_var = param._grad_ivar()
+            assert not g_var.is_sparse(
+            ), "Now, it doesn't support sparse parameters"
+            grad_vars.append(g_var)
+            assert g_var not in grad_var_set
+            grad_var_set.add(g_var)
+
+    coalesced_grads_and_vars = build_groups(grad_vars, 128 * 1024 * 1024)
+
+    div_factor = 1.0 / comm_group.nranks
+    for coalesced_grad, _, _ in coalesced_grads_and_vars:
+        # scale by 1/nranks so the summed all_reduce yields averaged gradients
+        coalesced_grad.scale_(div_factor)
+        paddle.distributed.all_reduce(coalesced_grad, group=comm_group)
+
+    _split_tensors(coalesced_grads_and_vars)
+
+
 def _broadcast_data_help(data, shape, dtype, hcg):
     model_parallel_group = hcg.get_model_parallel_group()
     src_rank = hcg.get_model_parallel_group_src_rank()
@@ -115,10 +140,17 @@ def broadcast_dp_parameters(model, hcg):
 
 
 def fused_allreduce_gradients(parameter_list, hcg):
-    data_parallel_group = None if hcg is None else hcg.get_data_parallel_group()
-    logger.debug("dp start fuse allreduce gradients")
-    with framework.no_grad():
-        _apply_collective_grads(parameter_list, data_parallel_group)
+    if _in_legacy_dygraph():
+        data_parallel_group = None if hcg is None else hcg.get_data_parallel_group(
+        )
+        logger.debug("dp start fuse allreduce gradients")
+        with framework.no_grad():
+            _apply_collective_grads(parameter_list, data_parallel_group)
+    elif in_dygraph_mode():
+        assert hcg is None, "It's not supported to use hcg in EagerDygraph now."
+        data_parallel_group = paddle.distributed.collective._get_default_group()
+        with framework.no_grad():
+            _apply_collective_grads_eager(parameter_list, data_parallel_group)
 
 
 def sharding_reduce_gradients(parameter_list, hcg):
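A minimal two-GPU usage sketch (not part of the patch) of fused_allreduce_gradients, mirroring the pattern used by parallel_dygraph_dataparallel_with_pylayer.py later in this diff, where gradients are reduced manually after backward(). The launch command, layer sizes, and optimizer settings below are illustrative assumptions.

import paddle
import paddle.distributed as dist
from paddle.distributed.fleet.utils.hybrid_parallel_util import fused_allreduce_gradients

# Launched with e.g.: python -m paddle.distributed.launch --gpus "0,1" train.py
dist.init_parallel_env()

model = paddle.DataParallel(paddle.nn.Linear(8, 4), find_unused_parameters=True)
opt = paddle.optimizer.SGD(learning_rate=0.01, parameters=model.parameters())

x = paddle.rand([16, 8])
loss = model(x).mean()
loss.backward()

# Manually fuse and all-reduce the data-parallel gradients. In eager mode the new
# branch requires hcg=None and uses the default communication group.
fused_allreduce_gradients(list(model.parameters()), None)

opt.step()
opt.clear_grad()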
diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py
index ac15034ffb15c0e26c6d062cbb3572ee9b1cb1b9..e6b891cbe00bf9a5cc3457c1a6921d52f627a809 100644
--- a/python/paddle/fluid/dygraph/parallel.py
+++ b/python/paddle/fluid/dygraph/parallel.py
@@ -22,6 +22,7 @@ import warnings
 from contextlib import contextmanager
 
 import paddle
+from paddle import _C_ops
 from paddle.fluid import core
 from paddle.fluid import framework
 from paddle.fluid.dygraph import layers
@@ -307,17 +308,28 @@ def _reshape_inplace(x, shape):
 
 @framework.dygraph_only
 def _split_tensors(coalesced_grads_and_grad_vars):
-    for coalesced_grad, origin_grad_vars, grad_shapes in coalesced_grads_and_grad_vars:
-        grad_var_len = [np.prod(g_shape) for g_shape in grad_shapes]
-        framework._dygraph_tracer().trace_op(
-            type='split',
-            inputs={'X': coalesced_grad},
-            outputs={'Out': origin_grad_vars},
-            attrs={'sections': grad_var_len,
-                   'axis': 0})
-        for g_var, g_shape in zip(origin_grad_vars, grad_shapes):
-            _reshape_inplace(x=g_var, shape=g_shape)
-            assert g_var.shape == g_shape
+    if _in_legacy_dygraph():
+        for coalesced_grad, origin_grad_vars, grad_shapes in coalesced_grads_and_grad_vars:
+            grad_var_len = [np.prod(g_shape) for g_shape in grad_shapes]
+            framework._dygraph_tracer().trace_op(
+                type='split',
+                inputs={'X': coalesced_grad},
+                outputs={'Out': origin_grad_vars},
+                attrs={'sections': grad_var_len,
+                       'axis': 0})
+            for g_var, g_shape in zip(origin_grad_vars, grad_shapes):
+                _reshape_inplace(x=g_var, shape=g_shape)
+                assert g_var.shape == g_shape
+    elif in_dygraph_mode():
+        for coalesced_grad, origin_grad_vars, grad_shapes in coalesced_grads_and_grad_vars:
+            grad_var_len = [np.prod(g_shape) for g_shape in grad_shapes]
+            attrs = ()
+            attrs += ('sections', grad_var_len)
+            attrs += ('axis', 0)
+            _C_ops.split(coalesced_grad, origin_grad_vars, *attrs)
+            for g_var, g_shape in zip(origin_grad_vars, grad_shapes):
+                g_var.reshape_(shape=g_shape)
+                assert g_var.shape == g_shape
 
 
 def scale_loss(loss):
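As a rough illustration of the coalesce/split round trip that build_groups and _split_tensors implement around a single fused collective (using only public APIs, not the internal _C_ops path above): gradients are flattened into one buffer, communicated once, then split back and restored to their original shapes. The tensor shapes below are arbitrary.

import numpy as np
import paddle

grads = [paddle.rand([4, 3]), paddle.rand([5]), paddle.rand([2, 2, 2])]
shapes = [g.shape for g in grads]
lengths = [int(np.prod(s)) for s in shapes]

# Coalesce into one flat buffer; a single all_reduce would run on `coalesced` here.
coalesced = paddle.concat([g.reshape([-1]) for g in grads])

# Split the buffer back and restore the original shapes.
pieces = paddle.split(coalesced, num_or_sections=lengths, axis=0)
restored = [p.reshape(s) for p, s in zip(pieces, shapes)]

for g, r, s in zip(grads, restored, shapes):
    assert r.shape == s
    np.testing.assert_allclose(r.numpy(), g.numpy())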
diff --git a/python/paddle/fluid/tests/unittests/parallel_dygraph_dataparallel_with_pylayer.py b/python/paddle/fluid/tests/unittests/parallel_dygraph_dataparallel_with_pylayer.py
index f623ba36dcab56deeed32ac3f9e9723c530d16ed..8ce2275868b3952d998f22a3153f66ff3e450cd6 100644
--- a/python/paddle/fluid/tests/unittests/parallel_dygraph_dataparallel_with_pylayer.py
+++ b/python/paddle/fluid/tests/unittests/parallel_dygraph_dataparallel_with_pylayer.py
@@ -21,7 +21,8 @@ import paddle
 import numpy as np
 import paddle.distributed as dist
 from paddle.fluid.dygraph.nn import Linear
-from paddle.autograd import PyLayer
+from paddle.autograd import PyLayer, EagerPyLayer
+from paddle.fluid.framework import in_dygraph_mode, _in_legacy_dygraph
 from paddle.distributed.fleet.utils.hybrid_parallel_util import fused_allreduce_gradients
 
 batch = 5
@@ -43,6 +44,20 @@ class cus_tanh(PyLayer):
         return grad
 
 
+class cus_tanh_eager(EagerPyLayer):
+    @staticmethod
+    def forward(ctx, x):
+        y = paddle.tanh(x)
+        ctx.save_for_backward(y)
+        return y
+
+    @staticmethod
+    def backward(ctx, dy):
+        y, = ctx.saved_tensor()
+        grad = dy * (1 - paddle.square(y))
+        return grad
+
+
 class SimpleNet(paddle.nn.Layer):
     def __init__(self, train_id, model_id):
         super(SimpleNet, self).__init__()
@@ -55,7 +70,10 @@ class SimpleNet(paddle.nn.Layer):
 
     def forward(self, inputs):
         if self.model_id == 0:
-            inputs = cus_tanh.apply(inputs)
+            if in_dygraph_mode():
+                inputs = cus_tanh_eager.apply(inputs)
+            elif _in_legacy_dygraph():
+                inputs = cus_tanh.apply(inputs)
         else:
             inputs = self.tanh(inputs)
 
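A standalone, single-card sketch (not part of the patch) that checks a cus_tanh-style PyLayer against the builtin tanh gradient; cus_tanh_eager above follows the same forward/backward structure, only deriving from EagerPyLayer. The class and variable names here are illustrative.

import numpy as np
import paddle
from paddle.autograd import PyLayer

class CusTanh(PyLayer):
    @staticmethod
    def forward(ctx, x):
        y = paddle.tanh(x)
        ctx.save_for_backward(y)
        return y

    @staticmethod
    def backward(ctx, dy):
        y, = ctx.saved_tensor()
        return dy * (1 - paddle.square(y))

x = paddle.rand([4, 8])
x.stop_gradient = False

# The hand-written backward should match the framework's tanh gradient.
custom = paddle.grad(CusTanh.apply(x).sum(), x)[0]
builtin = paddle.grad(paddle.tanh(x).sum(), x)[0]
np.testing.assert_allclose(custom.numpy(), builtin.numpy(), rtol=1e-6)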
"parallel_dygraph_sparse_embedding_over_height.py", - delta=1e-7, - check_error_log=True, - log_name=flag_name) - - if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_transformer_gloo.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_transformer_gloo.py index 6d4dd6433ae0352983fa824e8af1e958f6ec15c4..d3619cc1b9a00ab44123f6d2ed6eae52e795f312 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_transformer_gloo.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_transformer_gloo.py @@ -57,20 +57,5 @@ class TestParallelDygraphTransformerAccGrad_GLOO(TestDistBase): log_name=flag_name) -class TestParallelDygraphTransformerEager_GLOO(TestDistBase): - def _setup_config(self): - self._sync_mode = False - self._eager_mode = True - self._gloo_mode = True - self._dygraph = True - - def test_transformer(self): - self.check_with_place( - "parallel_dygraph_transformer.py", - delta=1e-5, - check_error_log=True, - log_name=flag_name) - - if __name__ == "__main__": unittest.main()