diff --git a/paddle/fluid/distributed/collective/reducer.cc b/paddle/fluid/distributed/collective/reducer.cc
index 2c26828e5e1143382173a87f966d7fbec1a96c79..f04585ce1710f45201c3fadb87d321f72ecf11b5 100644
--- a/paddle/fluid/distributed/collective/reducer.cc
+++ b/paddle/fluid/distributed/collective/reducer.cc
@@ -588,10 +588,9 @@ void EagerReducer::TraverseBackwardGraph(const std::vector<Tensor> &outputs) {
   }
 }
 
-void EagerReducer::PrepareForBackward(const std::vector<Tensor> &outputs,
-                                      const bool is_sync) {
+void EagerReducer::PrepareForBackward(const std::vector<Tensor> &outputs) {
   VLOG(3) << "after forward, then reset count for backward.";
-  grad_need_hooks_ = is_sync;
+  grad_need_hooks_ = true;
   next_group_ = 0;
   std::for_each(groups_.begin(), groups_.end(), [](EagerGroup &group) {
@@ -660,9 +659,25 @@ void EagerReducer::AddDistHook(size_t var_index) {
                         var_index));
 
   // gradient synchronization is not required when grad_need_hooks_ is false.
-  // if (!grad_need_hooks_) {
-  //   return;
-  // }
+  if (!grad_need_hooks_) {
+    const auto &var_locator = variable_locators_[var_index];
+    const auto group_index = var_locator.group_index;
+    const auto inside_group_index = var_locator.inside_group_index;
+    auto &group = groups_[group_index];
+    auto &group_tensor = group.dense_tensors_[inside_group_index];
+
+    auto *autograd_meta = tensors_[var_index].get_autograd_meta();
+    auto &grad_tensor = static_cast<egr::AutogradMeta *>(autograd_meta)->Grad();
+
+    if (!HasGrad(var_index)) {
+      group_tensor.ShareDataWith(phi::DenseTensor());
+    } else {
+      auto grad_dense_tensor =
+          *(std::dynamic_pointer_cast<phi::DenseTensor>(grad_tensor.impl()));
+      group_tensor.ShareDataWith(grad_dense_tensor);
+    }
+    return;
+  }
 
   VLOG(3) << "Tensor[" << var_index << "] [" << tensors_[var_index].name()
           << "@Grad] arrived and triggered disthook";
@@ -828,12 +843,10 @@ void EagerReducer::MarkGroupReady(size_t group_index) {
   for (; next_group_ < groups_.size() && groups_[next_group_].pending_ == 0;
        ++next_group_) {
     UNUSED auto &group = groups_[next_group_];
-    if (grad_need_hooks_) {
-      if (group.is_sparse_) {
-        AllReduceSparse(&group, next_group_);
-      } else {
-        FusedAllReduceSchedule(&group, next_group_);
-      }
+    if (group.is_sparse_) {
+      AllReduceSparse(&group, next_group_);
+    } else {
+      FusedAllReduceSchedule(&group, next_group_);
     }
   }
 }
@@ -921,14 +934,15 @@ void EagerReducer::ProcessUnusedDenseVars() {
 
 void EagerReducer::FinalizeBackward() {
   groups_need_finalize_ = false;
+  grad_need_hooks_ = false;
   for (auto &group : groups_) {
-    if (!group.is_sparse_ && grad_need_hooks_) {
+    if (!group.is_sparse_) {
       group.task->Synchronize();
     }
   }
 
   for (auto &group : groups_) {
-    if (!group.is_sparse_ && grad_need_hooks_) {
+    if (!group.is_sparse_) {
       group.dense_contents_.reset();
     }
   }
@@ -940,7 +954,6 @@ void EagerReducer::FinalizeBackward() {
     VLOG(3) << "ProcessUnusedDenseVars is finished.";
   }
 
-  grad_need_hooks_ = false;
   VLOG(3) << "In the batch, Reducer is finished.";
 }
diff --git a/paddle/fluid/distributed/collective/reducer.h b/paddle/fluid/distributed/collective/reducer.h
index 74db3db7467298d26d982d3c0cedc4b8b2327705..5d27086fdbec502e1a0686fc0feb8651d8c163ac 100644
--- a/paddle/fluid/distributed/collective/reducer.h
+++ b/paddle/fluid/distributed/collective/reducer.h
@@ -103,8 +103,7 @@ class EagerReducer {
   void InitializeGroups(const std::vector<std::vector<size_t>> &group_indices);
   void InitializeDenseGroups(const std::vector<size_t> &tensor_indices_,
                              EagerGroup *p_group);
-  void PrepareForBackward(const std::vector<Tensor> &outputs,
-                          const bool is_sync);
+  void PrepareForBackward(const std::vector<Tensor> &outputs);
   void AddDistHook(size_t var_index);
   void MarkVarReady(const size_t var_index, const bool is_used_var);
   void MarkGroupReady(const size_t group_index);
diff --git a/paddle/fluid/imperative/reducer.cc b/paddle/fluid/imperative/reducer.cc
index 3225222f617373a5a89656ccc4fd6d25d3557e86..f89fe234c201ab73d0262396b9f6b65313f868a1 100644
--- a/paddle/fluid/imperative/reducer.cc
+++ b/paddle/fluid/imperative/reducer.cc
@@ -675,10 +675,9 @@ void Reducer::TraverseBackwardGraph(
 // After each batch is calculated, the counter of each group(group.pending_)
 // and allreudce sequence counter(next_group_) will be cleaned up again.
 void Reducer::PrepareForBackward(
-    const std::vector<std::shared_ptr<imperative::VarBase>> &outputs,
-    const bool is_sync) {
+    const std::vector<std::shared_ptr<imperative::VarBase>> &outputs) {
   VLOG(3) << "after forward, then reset count for backward.";
-  grad_need_hooks_ = is_sync;
+  grad_need_hooks_ = true;
   next_group_ = 0;
   std::for_each(groups_.begin(), groups_.end(), [](Group &group) {
     group.pending_ = group.variable_indices_.size();
@@ -711,9 +710,7 @@ void Reducer::PrepareForBackward(
 
   if (find_unused_vars_once_ || find_unused_vars_each_step_) {
     unused_vars_.clear();
-    if (grad_need_hooks_) {
-      TraverseBackwardGraph(outputs);
-    }
+    TraverseBackwardGraph(outputs);
     // only check once in first step
     find_unused_vars_once_ = false;
   }
diff --git a/paddle/fluid/imperative/reducer.h b/paddle/fluid/imperative/reducer.h
index 902c3036acc78b1d575f6d4a420682168e9fb000..c455f962788b891e2a039db2990437e5c51302f6 100644
--- a/paddle/fluid/imperative/reducer.h
+++ b/paddle/fluid/imperative/reducer.h
@@ -146,8 +146,7 @@ class Reducer {
   void PrepareDeps(const std::unordered_set<GradOpNode*>& init_nodes);
 
   void PrepareForBackward(
-      const std::vector<std::shared_ptr<imperative::VarBase>>& outputs,
-      const bool is_sync);
+      const std::vector<std::shared_ptr<imperative::VarBase>>& outputs);
 
   void AddDistHook(size_t var_index);
diff --git a/paddle/fluid/pybind/distributed_py.cc b/paddle/fluid/pybind/distributed_py.cc
index fe1d82c766a0e0ad6a4f4e67dfdb1aaf6a931f5c..b84dd7fcbe1bb5f5050e2fb52f15c55069d02bc1 100644
--- a/paddle/fluid/pybind/distributed_py.cc
+++ b/paddle/fluid/pybind/distributed_py.cc
@@ -1407,14 +1407,11 @@ void BindDistributed(py::module *m) {
       .def(py::init(&CreateEagerReducer))
       .def(
           "prepare_for_backward",
-          [](distributed::EagerReducer &self,
-             py::handle py_tensors,
-             bool is_sync) {
+          [](distributed::EagerReducer &self, py::handle py_tensors) {
             auto params = CastPyArg2VectorOfTensor(py_tensors.ptr(), 0);
-            self.PrepareForBackward(params, is_sync);
+            self.PrepareForBackward(params);
           },
           py::arg("tensors"),
-          py::arg("is_sync"),
           py::call_guard<py::gil_scoped_release>());
diff --git a/paddle/fluid/pybind/imperative.cc b/paddle/fluid/pybind/imperative.cc
index bd18d4b3319b2e8cf8f435fd29413b2b1f19e73b..1eb5f8bd4764c52577ff1d3960c6b47c37dd48c4 100644
--- a/paddle/fluid/pybind/imperative.cc
+++ b/paddle/fluid/pybind/imperative.cc
@@ -2569,7 +2569,6 @@ void BindImperative(py::module *m_ptr) {
       .def("prepare_for_backward",
           &imperative::Reducer::PrepareForBackward,
           py::arg("vars"),
-          py::arg("is_sync"),
           py::call_guard<py::gil_scoped_release>());
 
   m.def("assign_group_by_size",
diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py
index 004c21c1346b1493277228ec051e7c79262b7658..51e0527e4fa99fde78696fc6714cc15b1a6cbcb7 100644
--- a/python/paddle/fluid/dygraph/parallel.py
+++ b/python/paddle/fluid/dygraph/parallel.py
@@ -818,9 +818,13 @@ class DataParallel(layers.Layer):
 
     def forward(self, *inputs, **kwargs):
         outputs = self._layers(*inputs, **kwargs)
-        if self._strategy.nranks > 1 and framework._dygraph_tracer()._has_grad:
+        if (
+            self._strategy.nranks > 1
+            and framework._dygraph_tracer()._has_grad
+            and self.grad_need_sync
+        ):
             self._reducer.prepare_for_backward(
-                list(self._find_varbase(outputs))
+                list(self._find_varbase(outputs))
             )
         return outputs
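
Usage note (illustrative, not part of the patch): after this change prepare_for_backward no longer takes an is_sync flag. Whether gradients are synchronized is decided on the Python side: DataParallel.forward only calls prepare_for_backward when grad_need_sync is true (the flag the no_sync() context manager toggles), and the reducer now clears grad_need_hooks_ itself in FinalizeBackward. The sketch below shows the caller-side pattern under that assumption; the layer, optimizer, and data are arbitrary placeholders, and the script has to be launched with more than one rank (for example via python -m paddle.distributed.launch) for the allreduce to have any effect.

# Illustrative sketch only -- assumes a multi-rank launch; model and optimizer are placeholders.
import paddle
import paddle.distributed as dist
from paddle import nn

dist.init_parallel_env()

layer = nn.Linear(4, 4)
dp_layer = paddle.DataParallel(layer)
opt = paddle.optimizer.SGD(learning_rate=0.01, parameters=dp_layer.parameters())

x = paddle.randn([8, 4])

# Accumulation step: no_sync() sets grad_need_sync to False, so
# DataParallel.forward skips prepare_for_backward and no allreduce is scheduled.
with dp_layer.no_sync():
    dp_layer(x).mean().backward()

# Sync step: grad_need_sync is True again, prepare_for_backward(outputs) runs
# (without is_sync) and the accumulated gradients are allreduced across ranks.
dp_layer(x).mean().backward()
opt.step()
opt.clear_grad()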