Unverified commit 02513207, authored by ShenLiang, committed by GitHub

fix find_unused_parameters default value (#32829)

fix error log for reducer

fix doc

fix unit test bug

fix spawn

fix coverage
Parent: 09b18a49
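Since this commit flips the default of find_unused_parameters to False, a model that genuinely leaves some parameters out of the backward pass must now opt in explicitly. The following is a minimal sketch, not part of the commit, using a hypothetical toy model (BranchyNet) and training loop, and assuming a working multi-device environment:

    # Hypothetical example: with the new default (find_unused_parameters=False),
    # the reducer assumes every parameter receives a gradient; a model with a
    # truly unused parameter must opt in explicitly, otherwise backward can hang.
    import paddle
    import paddle.nn as nn
    import paddle.distributed as dist

    class BranchyNet(nn.Layer):
        def __init__(self):
            super(BranchyNet, self).__init__()
            self.used = nn.Linear(4, 4)
            self.unused = nn.Linear(4, 4)  # never called in forward

        def forward(self, x):
            return self.used(x)

    def train():
        dist.init_parallel_env()
        model = paddle.DataParallel(BranchyNet(), find_unused_parameters=True)
        opt = paddle.optimizer.SGD(learning_rate=0.01,
                                   parameters=model.parameters())
        for _ in range(2):
            loss = model(paddle.randn([8, 4])).mean()
            loss.backward()
            opt.step()
            opt.clear_grad()

    if __name__ == "__main__":
        dist.spawn(train, nprocs=2)  # requires at least two devices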
@@ -172,7 +172,7 @@ message DistributedStrategy {
   optional bool fp16_allreduce = 25 [ default = false ];
   optional bool sharding = 26 [ default = false ];
   optional float last_comm_group_size_MB = 27 [ default = 1 ];
-  optional bool find_unused_parameters = 28 [ default = true ];
+  optional bool find_unused_parameters = 28 [ default = false ];
   optional bool tensor_parallel = 29 [ default = false ];
   optional RecomputeConfig recompute_configs = 101;
......
@@ -297,7 +297,7 @@ Reducer::Reducer(const std::vector<std::shared_ptr<imperative::VarBase>> &vars,
       is_sparse_gradient_(is_sparse_gradient),
       parallel_ctx_(parallel_ctx),
       group_size_limits_(group_size_limits),
-      find_unused_vars_(find_unused_vars) {
+      find_unused_vars_each_step_(find_unused_vars) {
   VLOG(3) << "Start construct the Reducer ...";
   nrings_ = parallel_ctx->GetNRings();
   nranks_ = parallel_ctx->GetNRanks();
@@ -457,42 +457,8 @@ void Reducer::PrepareDeps(const std::unordered_set<GradOpNode *> &init_nodes) {
   }
 }
 
-// After each batch is calculated, the counter of each group(group.pending_)
-// and allreudce sequence counter(next_group_) will be cleaned up again.
-void Reducer::PrepareForBackward(
+void Reducer::TraverseBackwardGraph(
     const std::vector<std::shared_ptr<imperative::VarBase>> &outputs) {
-  VLOG(3) << "after forward, then reset count for backward.";
-  next_group_ = 0;
-  std::for_each(groups_.begin(), groups_.end(), [](Group &group) {
-    group.pending_ = group.variable_indices_.size();
-    group.sparse_contents_ = nullptr;
-  });
-
-  // reinitialize vars_marked_ready_ for next iteration
-  vars_marked_ready_.clear();
-  vars_marked_ready_.resize(vars_.size(), false);
-
-  PADDLE_ENFORCE_EQ(
-      groups_need_finalize_, false,
-      platform::errors::PreconditionNotMet(
-          "A serious error has occurred here. There may be several reasons: "
-          "1) Please note that all forward outputs derived from the module "
-          "parameters must participate in the calculation of losses and "
-          "subsequent gradient calculations. If not, the wrapper will hang, "
-          "waiting for autograd to generate gradients for these parameters. "
-          "you can use detach or stop_gradient to make the unused parameters "
-          "detached from the autograd graph. "
-          "2) Used multiple forwards and one backward. You may be able to wrap "
-          "multiple forwards in a model."));
-
-  // The first var to trigger the unused parameter
-  has_marked_unused_vars_ = false;
-  unused_vars_.clear();
-
-  if (!find_unused_vars_) {
-    return;
-  }
-
   node_deps_.clear();
   std::queue<std::shared_ptr<GradOpNode>> q;
   std::unordered_set<VariableWrapper *> var_visited;
@@ -554,8 +520,50 @@ void Reducer::PrepareForBackward(
             << "] is not used";
     }
   }
+}
 
-  if (unused_vars_.empty()) {
+// After each batch is calculated, the counter of each group (group.pending_)
+// and the allreduce sequence counter (next_group_) will be cleaned up again.
+void Reducer::PrepareForBackward(
+    const std::vector<std::shared_ptr<imperative::VarBase>> &outputs) {
+  VLOG(3) << "after forward, then reset count for backward.";
+  next_group_ = 0;
+  std::for_each(groups_.begin(), groups_.end(), [](Group &group) {
+    group.pending_ = group.variable_indices_.size();
+    group.sparse_contents_ = nullptr;
+  });
+
+  // reinitialize vars_marked_ready_ for next iteration
+  vars_marked_ready_.clear();
+  vars_marked_ready_.resize(vars_.size(), false);
+
+  PADDLE_ENFORCE_EQ(
+      groups_need_finalize_, false,
+      platform::errors::PreconditionNotMet(
+          "A serious error has occurred here. Please set "
+          "find_unused_parameters=True to traverse the backward graph "
+          "in each step and prepare the reduce in advance. If you have "
+          "already set it, there may be several reasons for this error: "
+          "1) All forward outputs derived from the module parameters must "
+          "participate in the calculation of losses and subsequent gradient "
+          "calculations. If not, the wrapper will hang, waiting for autograd "
+          "to generate gradients for these parameters. You can use detach or "
+          "stop_gradient to detach the unused parameters from the autograd "
+          "graph. "
+          "2) Multiple forwards were used with one backward. You may be able "
+          "to wrap multiple forwards in a model."));
+
+  // The first var to trigger the unused parameter
+  has_marked_unused_vars_ = false;
+
+  if (find_unused_vars_once_ || find_unused_vars_each_step_) {
+    unused_vars_.clear();
+    TraverseBackwardGraph(outputs);
+    // only check once, in the first step
+    find_unused_vars_once_ = false;
+  }
+
+  if (find_unused_vars_each_step_ && unused_vars_.empty()) {
     LOG_FIRST_N(WARNING, 1)
         << "All parameters are involved in the backward pass. "
            "It is recommended to set find_unused_parameters to False "
@@ -564,7 +572,9 @@ void Reducer::PrepareForBackward(
            "will occur. Please make it clear that in the subsequent "
            "training, there will be no parameters that are not used "
            "in the backward pass, and then set find_unused_parameters";
-  } else if (unused_vars_.size() == vars_.size()) {
+  }
+
+  if (unused_vars_.size() == vars_.size()) {
     LOG_FIRST_N(WARNING, 1)
         << "There is no parameter in the device involved "
           "in the backward calculation. If there are "
@@ -595,13 +605,13 @@ void Reducer::AddDistHook(size_t var_index) {
   local_used_vars_[var_index] = 1;
 
-  // rebuild group when find_unused_vars_ is false
+  // rebuild group when find_unused_vars_each_step_ is false
   if (NeedRebuildGroup()) {
     rebuild_vars_.push_back(vars_[var_index]);
     rebuild_var_indices_.push_back(var_index);
   }
 
-  if (!has_marked_unused_vars_ && find_unused_vars_) {
+  if (!has_marked_unused_vars_) {
     has_marked_unused_vars_ = true;
     for (const auto &unused_index : unused_vars_) {
       MarkVarReady(unused_index, false);
@@ -622,7 +632,9 @@ void Reducer::MarkVarReady(const size_t var_index, const bool is_used_var) {
   if (vars_marked_ready_[var_index]) {
     auto error_info = string::Sprintf(
         "Error happened, when parameter[%d][%s] has been ready before. "
-        "There may be several reasons for this error: "
+        "Please set find_unused_parameters=True to traverse the backward "
+        "graph in each step and prepare the reduce in advance. If you have "
+        "already set it, there may be several reasons for this error: "
        "1) In multiple reentrant backward phase, some parameters are reused."
        "2) Using model parameters outside of forward function. Please "
        "make sure that model parameters are not shared in concurrent "
@@ -690,10 +702,16 @@ void Reducer::MarkVarReady(const size_t var_index, const bool is_used_var) {
     }
   } else {
     // process sparse group
-    PADDLE_ENFORCE_EQ(HasGrad(var_index), true,
-                      platform::errors::PreconditionNotMet(
-                          "The sparse parameter[%d][%s] must have a gradient",
-                          var_index, vars_[var_index]->Name()));
+    PADDLE_ENFORCE_EQ(
+        HasGrad(var_index), true,
+        platform::errors::PreconditionNotMet(
+            "The sparse parameter[%d][%s] should have a gradient. "
+            "Currently, DataParallel does not support sparse "
+            "parameters that do not generate gradients during training. "
+            "For example, if is_sparse=True is used in Embedding and this "
+            "parameter cannot generate a gradient in the current step "
+            "because of stop_gradient/detach, this error will occur.",
+            var_index, vars_[var_index]->Name()));
     auto var_base = vars_[var_index]->GradVarBase();
     // need to check tensor type
     PADDLE_ENFORCE_EQ(
@@ -943,7 +961,7 @@ void Reducer::FinalizeBackward() {
     InitializeGroups(group_indices_);
   }
 
-  if (find_unused_vars_) {
+  if (find_unused_vars_each_step_) {
 // TODO(liuyuhui) support xpu about Tensorcopy/TensorFromVector/TensorToVector
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
     ProcessUnusedDenseVars();
......
@@ -162,13 +162,16 @@ class Reducer {
   std::vector<std::vector<size_t>> RebuildGruops();
 
   inline bool NeedRebuildGroup() {
-    return !has_rebuilt_group_ && !find_unused_vars_;
+    return !has_rebuilt_group_ && !find_unused_vars_each_step_;
   }
 
   void ProcessUnusedDenseVars();
 
   bool HasGrad(size_t var_index);
 
+  void TraverseBackwardGraph(
+      const std::vector<std::shared_ptr<imperative::VarBase>>& outputs);
+
  private:
   std::vector<std::shared_ptr<imperative::VarBase>> vars_;
   std::vector<std::vector<size_t>> group_indices_;
@@ -195,7 +198,8 @@ class Reducer {
   std::unordered_map<VariableWrapper*, size_t> var_index_map_;
   std::vector<size_t> unused_vars_;
   bool has_marked_unused_vars_{false};
-  bool find_unused_vars_{false};
+  bool find_unused_vars_each_step_{false};
+  bool find_unused_vars_once_{true};
   bool groups_need_finalize_{false};
 #ifdef PADDLE_WITH_XPU_BKCL
   // comm_pool_ is used for scheduling allreduce in multi Kunlun cards training.
......
@@ -626,7 +626,7 @@ class DistributedStrategy(object):
         Indicating whether we are using find_unused_parameters to
         find unused parameters in DataParallel.
 
-        Default value: True
+        Default value: False
 
         Examples:
......
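For reference, a sketch of how the (now off by default) flag is toggled through the fleet DistributedStrategy API, mirroring the test changes further down; the model and optimizer setup are omitted and assumed:

    # Sketch: explicitly re-enable unused-parameter detection via DistributedStrategy.
    import paddle.distributed.fleet as fleet

    strategy = fleet.DistributedStrategy()
    strategy.find_unused_parameters = True  # no longer enabled by default

    fleet.init(is_collective=True, strategy=strategy)
    # model = fleet.distributed_model(model)  # then wrap the dygraph model as usual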
@@ -417,14 +417,15 @@ class DataParallel(layers.Layer):
             Note that setting the find_unused_parameters to True
             will affect computing performance. Therefore, if all parameters
             are sure to participate in the loss calculation and the
-            autograd graph construction, please set it False. Default: True.
+            autograd graph construction, please set it False. Default: False.
 
     Returns:
         Layer: The data paralleled module.
 
     Examples:
         .. code-block:: python
 
+            # required: distributed
             import paddle
             import paddle.nn as nn
             import paddle.optimizer as opt
@@ -474,7 +475,7 @@ class DataParallel(layers.Layer):
                  strategy=None,
                  comm_buffer_size=25,
                  last_comm_buffer_size=1,
-                 find_unused_parameters=True):
+                 find_unused_parameters=False):
         super(DataParallel,
               self).__init__(layers.full_name() + "_data_parallel")
@@ -576,12 +577,8 @@ class DataParallel(layers.Layer):
     def forward(self, *inputs, **kwargs):
         outputs = self._layers(*inputs, **kwargs)
         if self._strategy.nranks > 1 and framework._dygraph_tracer()._has_grad:
-            if self.find_unused_parameters:
-                self._reducer.prepare_for_backward(
-                    list(self._find_varbase(outputs)))
-            else:
-                self._reducer.prepare_for_backward(list(self._find_varbase([])))
+            self._reducer.prepare_for_backward(
+                list(self._find_varbase(outputs)))
         return outputs
 
     @deprecated(
......
@@ -74,8 +74,8 @@ class TestDistTraning(unittest.TestCase):
         state_dict = model_a.state_dict()
         model_b.set_state_dict(state_dict)
 
-        model_a = paddle.DataParallel(model_a)
-        model_b = paddle.DataParallel(model_b)
+        model_a = paddle.DataParallel(model_a, find_unused_parameters=True)
+        model_b = paddle.DataParallel(model_b, find_unused_parameters=True)
 
         ones_input = paddle.ones(shape=(batch, in_dim))
         ones_input.stop_gradient = True
......
@@ -27,6 +27,7 @@ from test_dist_base import RUN_STEP
 
 class SpawnAssistTestArgs(object):
     update_method = "local"
     trainer_id = 0
+    find_unused_parameters = False
 
 
 class TestDistSpawnRunner(unittest.TestCase):
......
@@ -548,7 +548,10 @@ class TestParallelDyGraphRunnerBase(object):
         # 4. train model
         model, train_reader, opt = self.get_model()
         if args.update_method == "nccl2":
-            model = paddle.DataParallel(model)
+            if args.find_unused_parameters:
+                model = paddle.DataParallel(model, find_unused_parameters=True)
+            else:
+                model = paddle.DataParallel(model, find_unused_parameters=False)
 
         out_losses = []
         for step_id, data in enumerate(train_reader()):
@@ -581,8 +584,8 @@ class TestParallelDyGraphRunnerBase(object):
         # set strategy
         strategy = fleet.DistributedStrategy()
-        if not args.find_unused_parameters:
-            strategy.find_unused_parameters = False
+        if args.find_unused_parameters:
+            strategy.find_unused_parameters = True
 
         # 3. init parallel env
         if args.update_method == "nccl2" or "bkcl":
@@ -737,7 +740,7 @@ class TestDistBase(unittest.TestCase):
         self._save_model = False
         self._fuse_all_reduce = None
         self._accumulate_gradient = False
-        self._find_unused_parameters = True
+        self._find_unused_parameters = False
         self._setup_config()
 
         global DIST_UT_PORT
......
...@@ -30,6 +30,7 @@ class TestDygraphControlFlowSame(TestDistBase): ...@@ -30,6 +30,7 @@ class TestDygraphControlFlowSame(TestDistBase):
self._sync_mode = False self._sync_mode = False
self._nccl2_mode = True self._nccl2_mode = True
self._dygraph = True self._dygraph = True
self._find_unused_parameters = True
def test_net(self): def test_net(self):
if fluid.core.is_compiled_with_cuda(): if fluid.core.is_compiled_with_cuda():
...@@ -46,6 +47,7 @@ class TestFleetDygraphControlFlowSame(TestDygraphControlFlowSame): ...@@ -46,6 +47,7 @@ class TestFleetDygraphControlFlowSame(TestDygraphControlFlowSame):
self._nccl2_mode = True self._nccl2_mode = True
self._dygraph = True self._dygraph = True
self._use_fleet_api = True self._use_fleet_api = True
self._find_unused_parameters = True
class TestFleetDygraphControlFlowSameAccGrad(TestDygraphControlFlowSame): class TestFleetDygraphControlFlowSameAccGrad(TestDygraphControlFlowSame):
...@@ -54,6 +56,7 @@ class TestFleetDygraphControlFlowSameAccGrad(TestDygraphControlFlowSame): ...@@ -54,6 +56,7 @@ class TestFleetDygraphControlFlowSameAccGrad(TestDygraphControlFlowSame):
self._nccl2_mode = True self._nccl2_mode = True
self._dygraph = True self._dygraph = True
self._accumulate_gradient = True self._accumulate_gradient = True
self._find_unused_parameters = True
class TestDygraphControlFlowDiff(TestDistBase): class TestDygraphControlFlowDiff(TestDistBase):
...@@ -61,6 +64,7 @@ class TestDygraphControlFlowDiff(TestDistBase): ...@@ -61,6 +64,7 @@ class TestDygraphControlFlowDiff(TestDistBase):
self._sync_mode = False self._sync_mode = False
self._nccl2_mode = True self._nccl2_mode = True
self._dygraph = True self._dygraph = True
self._find_unused_parameters = True
def test_net(self): def test_net(self):
if fluid.core.is_compiled_with_cuda(): if fluid.core.is_compiled_with_cuda():
...@@ -77,6 +81,7 @@ class TestFleetDygraphControlFlowDiff(TestDygraphControlFlowDiff): ...@@ -77,6 +81,7 @@ class TestFleetDygraphControlFlowDiff(TestDygraphControlFlowDiff):
self._nccl2_mode = True self._nccl2_mode = True
self._dygraph = True self._dygraph = True
self._use_fleet_api = True self._use_fleet_api = True
self._find_unused_parameters = True
class TestFleetDygraphControlFlowDiffAccGrad(TestDygraphControlFlowDiff): class TestFleetDygraphControlFlowDiffAccGrad(TestDygraphControlFlowDiff):
...@@ -85,6 +90,7 @@ class TestFleetDygraphControlFlowDiffAccGrad(TestDygraphControlFlowDiff): ...@@ -85,6 +90,7 @@ class TestFleetDygraphControlFlowDiffAccGrad(TestDygraphControlFlowDiff):
self._nccl2_mode = True self._nccl2_mode = True
self._dygraph = True self._dygraph = True
self._accumulate_gradient = True self._accumulate_gradient = True
self._find_unused_parameters = True
if __name__ == "__main__": if __name__ == "__main__":
......
@@ -31,6 +31,7 @@ class TestParallelDygraphMnist(TestDistBase):
         self._sync_mode = False
         self._nccl2_mode = True
         self._dygraph = True
+        self._find_unused_parameters = True
 
     def test_mnist(self):
         if fluid.core.is_compiled_with_cuda():
......