From 298f210d3f92efe7d6cca107ec66beeeb2778f82 Mon Sep 17 00:00:00 2001 From: ShenLiang <1422485404@qq.com> Date: Tue, 11 May 2021 11:17:35 +0800 Subject: [PATCH] Support control flow in DataParallel (#32826) * fix find_unused_parameters default value --- .../framework/distributed_strategy.proto | 2 +- paddle/fluid/imperative/reducer.cc | 110 ++++++++++-------- paddle/fluid/imperative/reducer.h | 8 +- .../fleet/base/distributed_strategy.py | 2 +- python/paddle/fluid/dygraph/parallel.py | 15 +-- .../parallel_dygraph_gradient_check.py | 4 +- .../tests/unittests/spawn_runner_base.py | 1 + .../fluid/tests/unittests/test_dist_base.py | 11 +- .../test_parallel_dygraph_control_flow.py | 6 + .../unittests/test_parallel_dygraph_mnist.py | 1 + 10 files changed, 95 insertions(+), 65 deletions(-) diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index dbe9b8cb9a..d102fcdbe0 100644 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -172,7 +172,7 @@ message DistributedStrategy { optional bool fp16_allreduce = 25 [ default = false ]; optional bool sharding = 26 [ default = false ]; optional float last_comm_group_size_MB = 27 [ default = 1 ]; - optional bool find_unused_parameters = 28 [ default = true ]; + optional bool find_unused_parameters = 28 [ default = false ]; optional bool tensor_parallel = 29 [ default = false ]; optional bool without_graph_optimization = 30 [ default = false ]; diff --git a/paddle/fluid/imperative/reducer.cc b/paddle/fluid/imperative/reducer.cc index e3dd0a2aa7..0f6676ed48 100644 --- a/paddle/fluid/imperative/reducer.cc +++ b/paddle/fluid/imperative/reducer.cc @@ -297,7 +297,7 @@ Reducer::Reducer(const std::vector> &vars, is_sparse_gradient_(is_sparse_gradient), parallel_ctx_(parallel_ctx), group_size_limits_(group_size_limits), - find_unused_vars_(find_unused_vars) { + find_unused_vars_each_step_(find_unused_vars) { VLOG(3) << "Start construct the Reducer ..."; nrings_ = parallel_ctx->GetNRings(); nranks_ = parallel_ctx->GetNRanks(); @@ -457,42 +457,8 @@ void Reducer::PrepareDeps(const std::unordered_set &init_nodes) { } } -// After each batch is calculated, the counter of each group(group.pending_) -// and allreudce sequence counter(next_group_) will be cleaned up again. -void Reducer::PrepareForBackward( +void Reducer::TraverseBackwardGraph( const std::vector> &outputs) { - VLOG(3) << "after forward, then reset count for backward."; - next_group_ = 0; - std::for_each(groups_.begin(), groups_.end(), [](Group &group) { - group.pending_ = group.variable_indices_.size(); - group.sparse_contents_ = nullptr; - }); - - // reinitialize vars_marked_ready_ for next iteration - vars_marked_ready_.clear(); - vars_marked_ready_.resize(vars_.size(), false); - - PADDLE_ENFORCE_EQ( - groups_need_finalize_, false, - platform::errors::PreconditionNotMet( - "A serious error has occurred here. There may be several reasons: " - "1) Please note that all forward outputs derived from the module " - "parameters must participate in the calculation of losses and " - "subsequent gradient calculations. If not, the wrapper will hang, " - "waiting for autograd to generate gradients for these parameters. " - "you can use detach or stop_gradient to make the unused parameters " - "detached from the autograd graph. " - "2) Used multiple forwards and one backward. You may be able to wrap " - "multiple forwards in a model.")); - - // The first var to trigger the unused parameter - has_marked_unused_vars_ = false; - unused_vars_.clear(); - - if (!find_unused_vars_) { - return; - } - node_deps_.clear(); std::queue> q; std::unordered_set var_visited; @@ -554,8 +520,50 @@ void Reducer::PrepareForBackward( << "] is not used"; } } +} - if (unused_vars_.empty()) { +// After each batch is calculated, the counter of each group(group.pending_) +// and allreudce sequence counter(next_group_) will be cleaned up again. +void Reducer::PrepareForBackward( + const std::vector> &outputs) { + VLOG(3) << "after forward, then reset count for backward."; + next_group_ = 0; + std::for_each(groups_.begin(), groups_.end(), [](Group &group) { + group.pending_ = group.variable_indices_.size(); + group.sparse_contents_ = nullptr; + }); + + // reinitialize vars_marked_ready_ for next iteration + vars_marked_ready_.clear(); + vars_marked_ready_.resize(vars_.size(), false); + + PADDLE_ENFORCE_EQ( + groups_need_finalize_, false, + platform::errors::PreconditionNotMet( + "A serious error has occurred here. Please " + "set find_unused_parameters=True to traverse backward graph " + "in each step to prepare reduce in advance. If you have " + "set, There may be several reasons for this error: " + "1) Please note that all forward outputs derived from the module " + "parameters must participate in the calculation of losses and " + "subsequent gradient calculations. If not, the wrapper will hang, " + "waiting for autograd to generate gradients for these parameters. " + "you can use detach or stop_gradient to make the unused parameters " + "detached from the autograd graph. " + "2) Used multiple forwards and one backward. You may be able to wrap " + "multiple forwards in a model.")); + + // The first var to trigger the unused parameter + has_marked_unused_vars_ = false; + + if (find_unused_vars_once_ || find_unused_vars_each_step_) { + unused_vars_.clear(); + TraverseBackwardGraph(outputs); + // only check once in first step + find_unused_vars_once_ = false; + } + + if (find_unused_vars_each_step_ && unused_vars_.empty()) { LOG_FIRST_N(WARNING, 1) << "All parameters are involved in the backward pass. " "It is recommended to set find_unused_parameters to False " @@ -564,7 +572,9 @@ void Reducer::PrepareForBackward( "will occur. Please make it clear that in the subsequent " "training, there will be no parameters that are not used " "in the backward pass, and then set find_unused_parameters"; - } else if (unused_vars_.size() == vars_.size()) { + } + + if (unused_vars_.size() == vars_.size()) { LOG_FIRST_N(WARNING, 1) << "There is no parameter in the device involved " "in the backward calculation. If there are " @@ -595,13 +605,13 @@ void Reducer::AddDistHook(size_t var_index) { local_used_vars_[var_index] = 1; - // rebuild group when find_unused_vars_ is false + // rebuild group when find_unused_vars_each_step_ is false if (NeedRebuildGroup()) { rebuild_vars_.push_back(vars_[var_index]); rebuild_var_indices_.push_back(var_index); } - if (!has_marked_unused_vars_ && find_unused_vars_) { + if (!has_marked_unused_vars_) { has_marked_unused_vars_ = true; for (const auto &unused_index : unused_vars_) { MarkVarReady(unused_index, false); @@ -622,7 +632,9 @@ void Reducer::MarkVarReady(const size_t var_index, const bool is_used_var) { if (vars_marked_ready_[var_index]) { auto error_info = string::Sprintf( "Error happened, when parameter[%d][%s] has been ready before. " - "There may be several reasons for this error: " + "Please set find_unused_parameters=True to traverse backward graph " + "in each step to prepare reduce in advance. If you have set, " + "there may be several reasons for this error: " "1) In multiple reentrant backward phase, some parameters are reused." "2) Using model parameters outside of forward function. Please " "make sure that model parameters are not shared in concurrent " @@ -690,10 +702,16 @@ void Reducer::MarkVarReady(const size_t var_index, const bool is_used_var) { } } else { // process sparse group - PADDLE_ENFORCE_EQ(HasGrad(var_index), true, - platform::errors::PreconditionNotMet( - "The sparse parameter[%d][%s] must have a gradient", - var_index, vars_[var_index]->Name())); + PADDLE_ENFORCE_EQ( + HasGrad(var_index), true, + platform::errors::PreconditionNotMet( + "The sparse parameter[%d][%s] should have gradient. " + "Currently, DataParallel does not support sparse " + "parameters without generating gradients during training. " + "For example, if is_sparese=True is used in Embedding, " + "the current step of this parameter cannot generate gradient " + "because of stop_gradient/detatch, where error will occur.", + var_index, vars_[var_index]->Name())); auto var_base = vars_[var_index]->GradVarBase(); // need to check tensor type PADDLE_ENFORCE_EQ( @@ -943,7 +961,7 @@ void Reducer::FinalizeBackward() { InitializeGroups(group_indices_); } - if (find_unused_vars_) { + if (find_unused_vars_each_step_) { // TODO(liuyuhui) support xpu about Tensorcopy/TensorFromVector/TensorToVector #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) ProcessUnusedDenseVars(); diff --git a/paddle/fluid/imperative/reducer.h b/paddle/fluid/imperative/reducer.h index 0d613dbea8..8392ab2c70 100644 --- a/paddle/fluid/imperative/reducer.h +++ b/paddle/fluid/imperative/reducer.h @@ -162,13 +162,16 @@ class Reducer { std::vector> RebuildGruops(); inline bool NeedRebuildGroup() { - return !has_rebuilt_group_ && !find_unused_vars_; + return !has_rebuilt_group_ && !find_unused_vars_each_step_; } void ProcessUnusedDenseVars(); bool HasGrad(size_t var_index); + void TraverseBackwardGraph( + const std::vector>& outputs); + private: std::vector> vars_; std::vector> group_indices_; @@ -195,7 +198,8 @@ class Reducer { std::unordered_map var_index_map_; std::vector unused_vars_; bool has_marked_unused_vars_{false}; - bool find_unused_vars_{false}; + bool find_unused_vars_each_step_{false}; + bool find_unused_vars_once_{true}; bool groups_need_finalize_{false}; #ifdef PADDLE_WITH_XPU_BKCL // comm_pool_ is used for scheduling allreduce in multi Kunlun cards training. diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py index 469b45d200..122ef4357a 100755 --- a/python/paddle/distributed/fleet/base/distributed_strategy.py +++ b/python/paddle/distributed/fleet/base/distributed_strategy.py @@ -626,7 +626,7 @@ class DistributedStrategy(object): Indicating whether we are using find_unused_parameters to find unused parameters in DataParallel. - Default value: True + Default value: False Examples: diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py index ca5e5606e4..2be062962e 100644 --- a/python/paddle/fluid/dygraph/parallel.py +++ b/python/paddle/fluid/dygraph/parallel.py @@ -417,14 +417,15 @@ class DataParallel(layers.Layer): Note that setting the find_unused_parameters to True will affect computing performance. Therefore, if all parameters are sure to participate in the loss calculation and the - autograd graph construction, please set it False. Default: True. + autograd graph construction, please set it False. Default: False. Returns: Layer: The data paralleled module. Examples: .. code-block:: python - + + # required: distributed import paddle import paddle.nn as nn import paddle.optimizer as opt @@ -474,7 +475,7 @@ class DataParallel(layers.Layer): strategy=None, comm_buffer_size=25, last_comm_buffer_size=1, - find_unused_parameters=True): + find_unused_parameters=False): super(DataParallel, self).__init__(layers.full_name() + "_data_parallel") @@ -576,12 +577,8 @@ class DataParallel(layers.Layer): def forward(self, *inputs, **kwargs): outputs = self._layers(*inputs, **kwargs) if self._strategy.nranks > 1 and framework._dygraph_tracer()._has_grad: - if self.find_unused_parameters: - self._reducer.prepare_for_backward( - list(self._find_varbase(outputs))) - else: - self._reducer.prepare_for_backward(list(self._find_varbase([]))) - + self._reducer.prepare_for_backward( + list(self._find_varbase(outputs))) return outputs @deprecated( diff --git a/python/paddle/fluid/tests/unittests/parallel_dygraph_gradient_check.py b/python/paddle/fluid/tests/unittests/parallel_dygraph_gradient_check.py index 7002352240..5c518976d1 100644 --- a/python/paddle/fluid/tests/unittests/parallel_dygraph_gradient_check.py +++ b/python/paddle/fluid/tests/unittests/parallel_dygraph_gradient_check.py @@ -74,8 +74,8 @@ class TestDistTraning(unittest.TestCase): state_dict = model_a.state_dict() model_b.set_state_dict(state_dict) - model_a = paddle.DataParallel(model_a) - model_b = paddle.DataParallel(model_b) + model_a = paddle.DataParallel(model_a, find_unused_parameters=True) + model_b = paddle.DataParallel(model_b, find_unused_parameters=True) ones_input = paddle.ones(shape=(batch, in_dim)) ones_input.stop_gradient = True diff --git a/python/paddle/fluid/tests/unittests/spawn_runner_base.py b/python/paddle/fluid/tests/unittests/spawn_runner_base.py index 278d7b27c5..2719e28fea 100644 --- a/python/paddle/fluid/tests/unittests/spawn_runner_base.py +++ b/python/paddle/fluid/tests/unittests/spawn_runner_base.py @@ -27,6 +27,7 @@ from test_dist_base import RUN_STEP class SpawnAssistTestArgs(object): update_method = "local" trainer_id = 0 + find_unused_parameters = False class TestDistSpawnRunner(unittest.TestCase): diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index 3749429441..edc510e4e7 100755 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -548,7 +548,10 @@ class TestParallelDyGraphRunnerBase(object): # 4. train model model, train_reader, opt = self.get_model() if args.update_method == "nccl2": - model = paddle.DataParallel(model) + if args.find_unused_parameters: + model = paddle.DataParallel(model, find_unused_parameters=True) + else: + model = paddle.DataParallel(model, find_unused_parameters=False) out_losses = [] for step_id, data in enumerate(train_reader()): @@ -581,8 +584,8 @@ class TestParallelDyGraphRunnerBase(object): # set strategy strategy = fleet.DistributedStrategy() - if not args.find_unused_parameters: - strategy.find_unused_parameters = False + if args.find_unused_parameters: + strategy.find_unused_parameters = True # 3. init parallel env if args.update_method == "nccl2" or "bkcl": @@ -737,7 +740,7 @@ class TestDistBase(unittest.TestCase): self._save_model = False self._fuse_all_reduce = None self._accumulate_gradient = False - self._find_unused_parameters = True + self._find_unused_parameters = False self._setup_config() global DIST_UT_PORT diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_control_flow.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_control_flow.py index fa571bde5e..3c45b2c795 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_control_flow.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_control_flow.py @@ -30,6 +30,7 @@ class TestDygraphControlFlowSame(TestDistBase): self._sync_mode = False self._nccl2_mode = True self._dygraph = True + self._find_unused_parameters = True def test_net(self): if fluid.core.is_compiled_with_cuda(): @@ -46,6 +47,7 @@ class TestFleetDygraphControlFlowSame(TestDygraphControlFlowSame): self._nccl2_mode = True self._dygraph = True self._use_fleet_api = True + self._find_unused_parameters = True class TestFleetDygraphControlFlowSameAccGrad(TestDygraphControlFlowSame): @@ -54,6 +56,7 @@ class TestFleetDygraphControlFlowSameAccGrad(TestDygraphControlFlowSame): self._nccl2_mode = True self._dygraph = True self._accumulate_gradient = True + self._find_unused_parameters = True class TestDygraphControlFlowDiff(TestDistBase): @@ -61,6 +64,7 @@ class TestDygraphControlFlowDiff(TestDistBase): self._sync_mode = False self._nccl2_mode = True self._dygraph = True + self._find_unused_parameters = True def test_net(self): if fluid.core.is_compiled_with_cuda(): @@ -77,6 +81,7 @@ class TestFleetDygraphControlFlowDiff(TestDygraphControlFlowDiff): self._nccl2_mode = True self._dygraph = True self._use_fleet_api = True + self._find_unused_parameters = True class TestFleetDygraphControlFlowDiffAccGrad(TestDygraphControlFlowDiff): @@ -85,6 +90,7 @@ class TestFleetDygraphControlFlowDiffAccGrad(TestDygraphControlFlowDiff): self._nccl2_mode = True self._dygraph = True self._accumulate_gradient = True + self._find_unused_parameters = True if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_mnist.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_mnist.py index 782d230461..0c55e13572 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_mnist.py @@ -31,6 +31,7 @@ class TestParallelDygraphMnist(TestDistBase): self._sync_mode = False self._nccl2_mode = True self._dygraph = True + self._find_unused_parameters = True def test_mnist(self): if fluid.core.is_compiled_with_cuda(): -- GitLab