Unverified commit 2922985a, authored by Haohongxiang, committed by GitHub

[Dygraph] Fix bugs of EagerReducer for complex control flows (#43252)

* fix bugs of reducer

* update

* update
Parent 42dd0f1b
@@ -775,6 +775,13 @@ void EagerReducer::ProcessUnusedDenseVars() {
      continue;
    }
+   // NOTE(haohongxiang): Calling SetFakeEmpty here is to make sure that
+   // gradient accumulation can continue normally after clear_gradients()
+   // especially in cases including complex control flow.
+   std::static_pointer_cast<egr::GradNodeAccumulation>(
+       GetGradNodeFromTensor(&tensors_[var_index]))
+       ->SetFakeEmpty(false);
+
    Tensor grad_value(std::make_shared<phi::DenseTensor>(src_tensor));

    auto dest_var_base = tensors_[var_index];
......
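For context, the situation the SetFakeEmpty(false) call above targets looks roughly like the sketch below: a DataParallel layer whose forward skips a sub-layer depending on the step, trained with clear_grad() between iterations, so some gradients are "unused" on some steps. The model and names here are hypothetical, not taken from this commit, and the script assumes a two-GPU launch via `python -m paddle.distributed.launch --gpus 0,1`.

# Minimal sketch of a "complex control flow" data-parallel training loop.
import paddle
import paddle.nn as nn
import paddle.distributed as dist


class BranchyNet(nn.Layer):
    def __init__(self):
        super().__init__()
        self.shared = nn.Linear(8, 8)
        self.sometimes = nn.Linear(8, 8)  # unused on odd steps

    def forward(self, x, step):
        x = self.shared(x)
        if step % 2 == 0:  # branch taken only on even steps
            x = self.sometimes(x)
        return x.mean()


def main():
    dist.init_parallel_env()
    # find_unused_parameters lets the reducer handle the skipped sub-layer
    model = paddle.DataParallel(BranchyNet(), find_unused_parameters=True)
    opt = paddle.optimizer.SGD(parameters=model.parameters())
    for step in range(4):
        loss = model(paddle.randn([4, 8]), step)
        loss.backward()   # reducer must mark the unused parameter's grad
        opt.step()
        opt.clear_grad()  # grads cleared; accumulation must still work next step


if __name__ == "__main__":
    main()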
@@ -43,12 +43,11 @@ def _apply_collective_grads(parameters, comm_group):
     coalesced_grads_and_vars = build_groups(grad_vars, 128 * 1024 * 1024)
 
-    for coalesced_grad, _, _ in coalesced_grads_and_vars:
-        # need to div nranks
-        nranks = paddle.distributed.get_world_size(
-        ) if comm_group is None else comm_group.nranks
+    nranks = paddle.distributed.get_world_size(
+    ) if comm_group is None else comm_group.nranks
+    for coalesced_grad, _, _ in coalesced_grads_and_vars:
+        # need to div nranks
         div_factor = paddle.to_tensor(nranks, dtype=coalesced_grad.dtype)
-        paddle.distributed.all_reduce(coalesced_grad, group=comm_group)
         paddle.fluid.framework._dygraph_tracer().trace_op(
             type="elementwise_div",
             inputs={
@@ -57,6 +56,7 @@ def _apply_collective_grads(parameters, comm_group):
             },
             outputs={'Out': coalesced_grad},
             attrs={'axis': -1})
+        paddle.distributed.all_reduce(coalesced_grad, group=comm_group)
 
     _split_tensors(coalesced_grads_and_vars)
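The hunk above hoists the nranks computation out of the loop and moves the all_reduce so that it runs only after each coalesced gradient has been divided by nranks. A small single-process illustration of the arithmetic follows (hypothetical values, no actual communication): summing per-rank gradients that were each pre-scaled by 1/nranks gives the cross-rank average, which is the same result the old sum-then-divide order produced.

# Single-process illustration of divide-by-nranks followed by an all-reduce sum.
import paddle

nranks = 4
# hypothetical per-rank coalesced gradients: rank r holds a tensor of value r+1
per_rank_grads = [paddle.full([3], float(r + 1)) for r in range(nranks)]

# what each rank does in the new code: scale by 1/nranks, then all_reduce (a sum)
summed_after_scaling = per_rank_grads[0] / nranks
for g in per_rank_grads[1:]:
    summed_after_scaling += g / nranks

# the intended result: the mean of the per-rank gradients
mean_grad = (per_rank_grads[0] + per_rank_grads[1] +
             per_rank_grads[2] + per_rank_grads[3]) / nranks

assert paddle.allclose(summed_after_scaling, mean_grad)
print(summed_after_scaling.numpy())  # [2.5 2.5 2.5]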
@@ -76,10 +76,11 @@ def _apply_collective_grads_eager(parameters, comm_group):
     coalesced_grads_and_vars = build_groups(grad_vars, 128 * 1024 * 1024)
 
-    div_factor = 1.0 / comm_group.nranks
+    nranks = paddle.distributed.get_world_size(
+    ) if comm_group is None else comm_group.nranks
     for coalesced_grad, _, _ in coalesced_grads_and_vars:
         # need to div nranks
-        coalesced_grad.scale_(div_factor)
+        coalesced_grad.scale_(1.0 / nranks)
         paddle.distributed.all_reduce(coalesced_grad, group=comm_group)
 
     _split_tensors(coalesced_grads_and_vars)
......
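In the eager path above, div_factor was previously derived from comm_group.nranks directly, which cannot work when comm_group is None; the change falls back to paddle.distributed.get_world_size() in that case. A minimal sketch of the fallback logic, where FakeGroup is only an illustrative stand-in for a process group object and not a Paddle type:

# Sketch of the nranks fallback used by the hunk above.
import paddle.distributed as dist


class FakeGroup:
    """Illustrative stand-in for a process group exposing an `nranks` attribute."""
    def __init__(self, nranks):
        self.nranks = nranks


def resolve_nranks(comm_group=None):
    # mirror of the expression in the diff: use the global world size when no group is given
    return dist.get_world_size() if comm_group is None else comm_group.nranks


print(resolve_nranks())              # global world size (1 when not launched distributed)
print(resolve_nranks(FakeGroup(8)))  # 8: the group's own rank count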
@@ -1507,7 +1507,7 @@ if(WITH_DISTRIBUTE
     350)
   set_tests_properties(test_parallel_dygraph_no_sync PROPERTIES TIMEOUT 300)
   set_tests_properties(test_parallel_dygraph_no_sync_gradient_check
-                       PROPERTIES TIMEOUT 30)
+                       PROPERTIES TIMEOUT 60)
   set_tests_properties(test_parallel_dygraph_pipeline_parallel
                        PROPERTIES TIMEOUT 500)
   set_tests_properties(test_parallel_dygraph_tensor_parallel PROPERTIES TIMEOUT
......
@@ -200,7 +200,8 @@ class TestMultipleWithGloo(unittest.TestCase):
 class TestDataParallelGradientCheck(TestMultipleGpus):
 
     def test_multiple_gpus_dynamic(self):
-        self.run_mnist_2gpu('parallel_dygraph_gradient_check.py')
+        self.run_mnist_2gpu('parallel_dygraph_gradient_check.py',
+                            eager_mode=False)
 
 
 class TestDataParallelWithPyLayer(TestMultipleGpus):
@@ -218,4 +219,5 @@ class TestGradientCheckInEagerMode(TestMultipleGpus):
 
 if __name__ == "__main__":
+    os.environ["FLAGS_enable_eager_mode"] = "1"
     unittest.main()
@@ -14,6 +14,7 @@
 from __future__ import print_function
 
+import os
 import unittest
 import paddle.fluid as fluid
@@ -24,7 +25,10 @@ class TestDataParallelLayer(TestMultipleGpus):
 
     def test_parallel_dygraph_dataparallel_no_sync(self):
         self.run_mnist_2gpu('parallel_dygraph_no_sync_gradient_check.py')
+        self.run_mnist_2gpu('parallel_dygraph_no_sync_gradient_check.py',
+                            eager_mode=False)
 
 
 if __name__ == "__main__":
+    os.environ["FLAGS_enable_eager_mode"] = "1"
     unittest.main()
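The test changes above run the same gradient-check scripts twice: once under the eager reducer (FLAGS_enable_eager_mode set to "1") and once with eager_mode=False for the legacy dygraph reducer. The sketch below only illustrates that launch pattern; run_script_2gpu and its internals are assumptions for illustration, not the actual TestMultipleGpus helper.

# Hedged sketch of launching a 2-GPU child script in both eager and legacy dygraph modes.
import os
import subprocess
import sys


def run_script_2gpu(script, eager_mode=True):
    env = dict(os.environ)
    # the flag the test files above set before unittest.main()
    env["FLAGS_enable_eager_mode"] = "1" if eager_mode else "0"
    env["CUDA_VISIBLE_DEVICES"] = "0,1"
    cmd = [sys.executable, "-m", "paddle.distributed.launch",
           "--gpus", "0,1", script]
    subprocess.check_call(cmd, env=env)


if __name__ == "__main__":
    run_script_2gpu("parallel_dygraph_no_sync_gradient_check.py")                    # eager reducer
    run_script_2gpu("parallel_dygraph_no_sync_gradient_check.py", eager_mode=False)  # legacy reducer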